"""Reporting service layer aggregating deterministic and simulation metrics.""" from __future__ import annotations from dataclasses import dataclass, field from datetime import date import math from typing import Mapping, Sequence from urllib.parse import urlencode from fastapi import Request from models import FinancialCategory, Project, Scenario from services.financial import ( CashFlow, ConvergenceError, PaybackNotReachedError, internal_rate_of_return, net_present_value, payback_period, ) from services.simulation import ( CashFlowSpec, SimulationConfig, SimulationMetric, SimulationResult, run_monte_carlo, ) from services.unit_of_work import UnitOfWork DEFAULT_DISCOUNT_RATE = 0.1 DEFAULT_ITERATIONS = 500 DEFAULT_PERCENTILES: tuple[float, float, float] = (5.0, 50.0, 95.0) _COST_CATEGORY_SIGNS: Mapping[FinancialCategory, float] = { FinancialCategory.REVENUE: 1.0, FinancialCategory.CAPITAL_EXPENDITURE: -1.0, FinancialCategory.OPERATING_EXPENDITURE: -1.0, FinancialCategory.CONTINGENCY: -1.0, FinancialCategory.OTHER: -1.0, } @dataclass(frozen=True) class IncludeOptions: """Flags controlling optional sections in report payloads.""" distribution: bool = False samples: bool = False @dataclass(slots=True) class ReportFilters: """Filter parameters applied when selecting scenarios for a report.""" scenario_ids: set[int] | None = None start_date: date | None = None end_date: date | None = None def matches(self, scenario: Scenario) -> bool: if self.scenario_ids is not None and scenario.id not in self.scenario_ids: return False if self.start_date and scenario.start_date and scenario.start_date < self.start_date: return False if self.end_date and scenario.end_date and scenario.end_date > self.end_date: return False return True def to_dict(self) -> dict[str, object]: payload: dict[str, object] = {} if self.scenario_ids is not None: payload["scenario_ids"] = sorted(self.scenario_ids) if self.start_date is not None: payload["start_date"] = self.start_date if self.end_date is not None: payload["end_date"] = self.end_date return payload @dataclass(slots=True) class ScenarioFinancialTotals: currency: str | None inflows: float outflows: float net: float by_category: dict[str, float] def to_dict(self) -> dict[str, object]: return { "currency": self.currency, "inflows": _round_optional(self.inflows), "outflows": _round_optional(self.outflows), "net": _round_optional(self.net), "by_category": { key: _round_optional(value) for key, value in sorted(self.by_category.items()) }, } @dataclass(slots=True) class ScenarioDeterministicMetrics: currency: str | None discount_rate: float compounds_per_year: int npv: float | None irr: float | None payback_period: float | None notes: list[str] = field(default_factory=list) def to_dict(self) -> dict[str, object]: return { "currency": self.currency, "discount_rate": _round_optional(self.discount_rate, digits=4), "compounds_per_year": self.compounds_per_year, "npv": _round_optional(self.npv), "irr": _round_optional(self.irr, digits=6), "payback_period": _round_optional(self.payback_period, digits=4), "notes": self.notes, } @dataclass(slots=True) class ScenarioMonteCarloResult: available: bool notes: list[str] = field(default_factory=list) result: SimulationResult | None = None include_samples: bool = False def to_dict(self) -> dict[str, object]: if not self.available or self.result is None: return { "available": False, "notes": self.notes, } metrics: dict[str, dict[str, object]] = {} for metric, summary in self.result.summaries.items(): metrics[metric.value] = { "mean": _round_optional(summary.mean), "std_dev": _round_optional(summary.std_dev), "minimum": _round_optional(summary.minimum), "maximum": _round_optional(summary.maximum), "percentiles": { f"{percentile:g}": _round_optional(value) for percentile, value in sorted(summary.percentiles.items()) }, "sample_size": summary.sample_size, "failed_runs": summary.failed_runs, } samples_payload: dict[str, list[float | None]] | None = None if self.include_samples and self.result.samples: samples_payload = {} for metric, samples in self.result.samples.items(): samples_payload[metric.value] = [ _sanitize_float(sample) for sample in samples.tolist() ] payload: dict[str, object] = { "available": True, "iterations": self.result.iterations, "metrics": metrics, "notes": self.notes, } if samples_payload: payload["samples"] = samples_payload return payload @dataclass(slots=True) class ScenarioReport: scenario: Scenario totals: ScenarioFinancialTotals deterministic: ScenarioDeterministicMetrics monte_carlo: ScenarioMonteCarloResult | None def to_dict(self) -> dict[str, object]: scenario_info = { "id": self.scenario.id, "project_id": self.scenario.project_id, "name": self.scenario.name, "description": self.scenario.description, "status": self.scenario.status.value if hasattr(self.scenario.status, 'value') else self.scenario.status, "start_date": self.scenario.start_date, "end_date": self.scenario.end_date, "currency": self.scenario.currency, "primary_resource": self.scenario.primary_resource.value if self.scenario.primary_resource and hasattr(self.scenario.primary_resource, 'value') else self.scenario.primary_resource, "discount_rate": _round_optional(self.deterministic.discount_rate, digits=4), "created_at": self.scenario.created_at, "updated_at": self.scenario.updated_at, "simulation_parameter_count": len(self.scenario.simulation_parameters or []), } payload: dict[str, object] = { "scenario": scenario_info, "financials": self.totals.to_dict(), "metrics": self.deterministic.to_dict(), } if self.monte_carlo is not None: payload["monte_carlo"] = self.monte_carlo.to_dict() return payload @dataclass(slots=True) class AggregatedMetric: average: float | None minimum: float | None maximum: float | None def to_dict(self) -> dict[str, object]: return { "average": _round_optional(self.average), "minimum": _round_optional(self.minimum), "maximum": _round_optional(self.maximum), } @dataclass(slots=True) class ProjectAggregates: total_inflows: float total_outflows: float total_net: float deterministic_metrics: dict[str, AggregatedMetric] def to_dict(self) -> dict[str, object]: return { "financials": { "total_inflows": _round_optional(self.total_inflows), "total_outflows": _round_optional(self.total_outflows), "total_net": _round_optional(self.total_net), }, "deterministic_metrics": { metric: data.to_dict() for metric, data in sorted(self.deterministic_metrics.items()) }, } @dataclass(slots=True) class MetricComparison: metric: str direction: str best: tuple[int, str, float] | None worst: tuple[int, str, float] | None average: float | None def to_dict(self) -> dict[str, object]: return { "metric": self.metric, "direction": self.direction, "best": _comparison_entry(self.best), "worst": _comparison_entry(self.worst), "average": _round_optional(self.average), } def parse_include_tokens(raw: str | None) -> IncludeOptions: tokens: set[str] = set() if raw: for part in raw.split(","): token = part.strip().lower() if token: tokens.add(token) if "all" in tokens: return IncludeOptions(distribution=True, samples=True) return IncludeOptions( distribution=bool({"distribution", "monte_carlo", "mc"} & tokens), samples="samples" in tokens, ) def validate_percentiles(values: Sequence[float] | None) -> tuple[float, ...]: if not values: return DEFAULT_PERCENTILES seen: set[float] = set() cleaned: list[float] = [] for value in values: percentile = float(value) if percentile < 0.0 or percentile > 100.0: raise ValueError("Percentiles must be between 0 and 100.") if percentile not in seen: seen.add(percentile) cleaned.append(percentile) if not cleaned: return DEFAULT_PERCENTILES return tuple(cleaned) class ReportingService: """Coordinates project and scenario reporting aggregation.""" def __init__(self, uow: UnitOfWork) -> None: self._uow = uow def project_summary( self, project: Project, *, filters: ReportFilters, include: IncludeOptions, iterations: int, percentiles: tuple[float, ...], ) -> dict[str, object]: scenarios = self._load_scenarios(project.id, filters) reports = [ self._build_scenario_report( scenario, include_distribution=include.distribution, include_samples=include.samples, iterations=iterations, percentiles=percentiles, ) for scenario in scenarios ] aggregates = self._aggregate_project(reports) return { "project": _project_payload(project), "scenario_count": len(reports), "filters": filters.to_dict(), "aggregates": aggregates.to_dict(), "scenarios": [report.to_dict() for report in reports], } def scenario_comparison( self, project: Project, scenarios: Sequence[Scenario], *, include: IncludeOptions, iterations: int, percentiles: tuple[float, ...], ) -> dict[str, object]: reports = [ self._build_scenario_report( self._reload_scenario(scenario.id), include_distribution=include.distribution, include_samples=include.samples, iterations=iterations, percentiles=percentiles, ) for scenario in scenarios ] comparison = { metric: data.to_dict() for metric, data in self._build_comparisons(reports).items() } return { "project": _project_payload(project), "scenarios": [report.to_dict() for report in reports], "comparison": comparison, } def scenario_distribution( self, scenario: Scenario, *, include: IncludeOptions, iterations: int, percentiles: tuple[float, ...], ) -> dict[str, object]: report = self._build_scenario_report( self._reload_scenario(scenario.id), include_distribution=True, include_samples=include.samples, iterations=iterations, percentiles=percentiles, ) return { "scenario": report.to_dict()["scenario"], "summary": report.totals.to_dict(), "metrics": report.deterministic.to_dict(), "monte_carlo": ( report.monte_carlo.to_dict() if report.monte_carlo else { "available": False} ), } def _load_scenarios(self, project_id: int, filters: ReportFilters) -> list[Scenario]: scenarios = self._uow.scenarios.list_for_project( project_id, with_children=True) return [scenario for scenario in scenarios if filters.matches(scenario)] def _reload_scenario(self, scenario_id: int) -> Scenario: return self._uow.scenarios.get(scenario_id, with_children=True) def _build_scenario_report( self, scenario: Scenario, *, include_distribution: bool, include_samples: bool, iterations: int, percentiles: tuple[float, ...], ) -> ScenarioReport: cash_flows, totals = _build_cash_flows(scenario) deterministic = _calculate_deterministic_metrics( scenario, cash_flows, totals) monte_carlo: ScenarioMonteCarloResult | None = None if include_distribution: monte_carlo = _run_monte_carlo( scenario, cash_flows, include_samples=include_samples, iterations=iterations, percentiles=percentiles, ) return ScenarioReport( scenario=scenario, totals=totals, deterministic=deterministic, monte_carlo=monte_carlo, ) def _aggregate_project(self, reports: Sequence[ScenarioReport]) -> ProjectAggregates: total_inflows = sum(report.totals.inflows for report in reports) total_outflows = sum(report.totals.outflows for report in reports) total_net = sum(report.totals.net for report in reports) metrics: dict[str, AggregatedMetric] = {} for metric_name in ("npv", "irr", "payback_period"): values = [ getattr(report.deterministic, metric_name) for report in reports if getattr(report.deterministic, metric_name) is not None ] if values: metrics[metric_name] = AggregatedMetric( average=sum(values) / len(values), minimum=min(values), maximum=max(values), ) return ProjectAggregates( total_inflows=total_inflows, total_outflows=total_outflows, total_net=total_net, deterministic_metrics=metrics, ) def _build_comparisons( self, reports: Sequence[ScenarioReport] ) -> Mapping[str, MetricComparison]: comparisons: dict[str, MetricComparison] = {} for metric_name, direction in ( ("npv", "higher_is_better"), ("irr", "higher_is_better"), ("payback_period", "lower_is_better"), ): entries: list[tuple[int, str, float]] = [] for report in reports: value = getattr(report.deterministic, metric_name) if value is None: continue entries.append( (report.scenario.id, report.scenario.name, value)) if not entries: continue if direction == "higher_is_better": best = max(entries, key=lambda item: item[2]) worst = min(entries, key=lambda item: item[2]) else: best = min(entries, key=lambda item: item[2]) worst = max(entries, key=lambda item: item[2]) average = sum(item[2] for item in entries) / len(entries) comparisons[metric_name] = MetricComparison( metric=metric_name, direction=direction, best=best, worst=worst, average=average, ) return comparisons def build_project_summary_context( self, project: Project, filters: ReportFilters, include: IncludeOptions, iterations: int, percentiles: tuple[float, ...], request: Request, ) -> dict[str, object]: """Build template context for project summary page.""" scenarios = self._load_scenarios(project.id, filters) reports = [ self._build_scenario_report( scenario, include_distribution=include.distribution, include_samples=include.samples, iterations=iterations, percentiles=percentiles, ) for scenario in scenarios ] aggregates = self._aggregate_project(reports) return { "request": request, "project": _project_payload(project), "scenario_count": len(reports), "aggregates": aggregates.to_dict(), "scenarios": [report.to_dict() for report in reports], "filters": filters.to_dict(), "include_options": include, "iterations": iterations, "percentiles": percentiles, "title": f"Project Summary · {project.name}", "subtitle": "Aggregated financial and simulation insights across scenarios.", "actions": [ { "href": request.url_for( "reports.project_summary", project_id=project.id, ), "label": "Download JSON", } ], } def build_scenario_comparison_context( self, project: Project, scenarios: Sequence[Scenario], include: IncludeOptions, iterations: int, percentiles: tuple[float, ...], request: Request, ) -> dict[str, object]: """Build template context for scenario comparison page.""" reports = [ self._build_scenario_report( self._reload_scenario(scenario.id), include_distribution=include.distribution, include_samples=include.samples, iterations=iterations, percentiles=percentiles, ) for scenario in scenarios ] comparison = { metric: data.to_dict() for metric, data in self._build_comparisons(reports).items() } comparison_json_url = request.url_for( "reports.project_scenario_comparison", project_id=project.id, ) scenario_ids = [str(s.id) for s in scenarios] comparison_query = urlencode( [("scenario_ids", str(identifier)) for identifier in scenario_ids] ) if comparison_query: comparison_json_url = f"{comparison_json_url}?{comparison_query}" return { "request": request, "project": _project_payload(project), "scenarios": [report.to_dict() for report in reports], "comparison": comparison, "include_options": include, "iterations": iterations, "percentiles": percentiles, "title": f"Scenario Comparison · {project.name}", "subtitle": "Evaluate deterministic metrics and Monte Carlo trends side by side.", "actions": [ { "href": comparison_json_url, "label": "Download JSON", } ], } def build_scenario_distribution_context( self, scenario: Scenario, include: IncludeOptions, iterations: int, percentiles: tuple[float, ...], request: Request, ) -> dict[str, object]: """Build template context for scenario distribution page.""" report = self._build_scenario_report( self._reload_scenario(scenario.id), include_distribution=True, include_samples=include.samples, iterations=iterations, percentiles=percentiles, ) return { "request": request, "scenario": report.to_dict()["scenario"], "summary": report.totals.to_dict(), "metrics": report.deterministic.to_dict(), "monte_carlo": ( report.monte_carlo.to_dict() if report.monte_carlo else { "available": False} ), "include_options": include, "iterations": iterations, "percentiles": percentiles, "title": f"Scenario Distribution · {scenario.name}", "subtitle": "Deterministic and simulated distributions for a single scenario.", "actions": [ { "href": request.url_for( "reports.scenario_distribution", scenario_id=scenario.id, ), "label": "Download JSON", } ], } def _build_cash_flows(scenario: Scenario) -> tuple[list[CashFlow], ScenarioFinancialTotals]: cash_flows: list[CashFlow] = [] by_category: dict[str, float] = {} inflows = 0.0 outflows = 0.0 net = 0.0 period_index = 0 for financial_input in scenario.financial_inputs or []: sign = _COST_CATEGORY_SIGNS.get(financial_input.category, -1.0) amount = float(financial_input.amount) * sign net += amount if amount >= 0: inflows += amount else: outflows += -amount by_category.setdefault(financial_input.category.value, 0.0) by_category[financial_input.category.value] += amount if financial_input.effective_date is not None: cash_flows.append( CashFlow(amount=amount, date=financial_input.effective_date) ) else: cash_flows.append( CashFlow(amount=amount, period_index=period_index)) period_index += 1 currency = scenario.currency if currency is None and scenario.financial_inputs: currency = scenario.financial_inputs[0].currency totals = ScenarioFinancialTotals( currency=currency, inflows=inflows, outflows=outflows, net=net, by_category=by_category, ) return cash_flows, totals def _calculate_deterministic_metrics( scenario: Scenario, cash_flows: Sequence[CashFlow], totals: ScenarioFinancialTotals, ) -> ScenarioDeterministicMetrics: notes: list[str] = [] discount_rate = _normalise_discount_rate(scenario.discount_rate) if scenario.discount_rate is None: notes.append( f"Discount rate not set; defaulted to {discount_rate:.2%}." ) if not cash_flows: notes.append( "No financial inputs available for deterministic metrics.") return ScenarioDeterministicMetrics( currency=totals.currency, discount_rate=discount_rate, compounds_per_year=1, npv=None, irr=None, payback_period=None, notes=notes, ) npv_value: float | None try: npv_value = net_present_value( discount_rate, cash_flows, compounds_per_year=1, ) except ValueError as exc: npv_value = None notes.append(f"NPV unavailable: {exc}.") irr_value: float | None try: irr_value = internal_rate_of_return( cash_flows, compounds_per_year=1, ) except (ValueError, ConvergenceError) as exc: irr_value = None notes.append(f"IRR unavailable: {exc}.") payback_value: float | None try: payback_value = payback_period( cash_flows, compounds_per_year=1, ) except (ValueError, PaybackNotReachedError) as exc: payback_value = None notes.append(f"Payback period unavailable: {exc}.") return ScenarioDeterministicMetrics( currency=totals.currency, discount_rate=discount_rate, compounds_per_year=1, npv=npv_value, irr=irr_value, payback_period=payback_value, notes=notes, ) def _run_monte_carlo( scenario: Scenario, cash_flows: Sequence[CashFlow], *, include_samples: bool, iterations: int, percentiles: tuple[float, ...], ) -> ScenarioMonteCarloResult: if not cash_flows: return ScenarioMonteCarloResult( available=False, notes=["No financial inputs available for Monte Carlo simulation."], ) discount_rate = _normalise_discount_rate(scenario.discount_rate) specs = [CashFlowSpec(cash_flow=flow) for flow in cash_flows] notes: list[str] = [] if not scenario.simulation_parameters: notes.append( "Scenario has no stochastic parameters; simulation mirrors deterministic cash flows." ) config = SimulationConfig( iterations=iterations, discount_rate=discount_rate, metrics=( SimulationMetric.NPV, SimulationMetric.IRR, SimulationMetric.PAYBACK, ), percentiles=percentiles, return_samples=include_samples, ) try: result = run_monte_carlo(specs, config) except Exception as exc: # pragma: no cover - safeguard for unexpected failures notes.append(f"Simulation failed: {exc}.") return ScenarioMonteCarloResult(available=False, notes=notes) return ScenarioMonteCarloResult( available=True, notes=notes, result=result, include_samples=include_samples, ) def _normalise_discount_rate(value: float | None) -> float: if value is None: return DEFAULT_DISCOUNT_RATE rate = float(value) if rate > 1.0: return rate / 100.0 return rate def _sanitize_float(value: float | None) -> float | None: if value is None: return None if math.isnan(value) or math.isinf(value): return None return float(value) def _round_optional(value: float | None, *, digits: int = 2) -> float | None: clean = _sanitize_float(value) if clean is None: return None return round(clean, digits) def _comparison_entry(entry: tuple[int, str, float] | None) -> dict[str, object] | None: if entry is None: return None scenario_id, name, value = entry return { "scenario_id": scenario_id, "name": name, "value": _round_optional(value), } def _project_payload(project: Project) -> dict[str, object]: return { "id": project.id, "name": project.name, "location": project.location, "operation_type": project.operation_type.value, "description": project.description, "created_at": project.created_at, "updated_at": project.updated_at, }