"""Reporting service layer aggregating deterministic and simulation metrics.""" from __future__ import annotations from dataclasses import dataclass, field from datetime import date import math from typing import Mapping, Sequence from urllib.parse import urlencode import plotly.graph_objects as go import plotly.io as pio from fastapi import Request from models import FinancialCategory, Project, Scenario from services.financial import ( CashFlow, ConvergenceError, PaybackNotReachedError, internal_rate_of_return, net_present_value, payback_period, ) from services.simulation import ( CashFlowSpec, SimulationConfig, SimulationMetric, SimulationResult, run_monte_carlo, ) from services.unit_of_work import UnitOfWork DEFAULT_DISCOUNT_RATE = 0.1 DEFAULT_ITERATIONS = 500 DEFAULT_PERCENTILES: tuple[float, float, float] = (5.0, 50.0, 95.0) _COST_CATEGORY_SIGNS: Mapping[FinancialCategory, float] = { FinancialCategory.REVENUE: 1.0, FinancialCategory.CAPITAL_EXPENDITURE: -1.0, FinancialCategory.OPERATING_EXPENDITURE: -1.0, FinancialCategory.CONTINGENCY: -1.0, FinancialCategory.OTHER: -1.0, } @dataclass(frozen=True) class IncludeOptions: """Flags controlling optional sections in report payloads.""" distribution: bool = False samples: bool = False @dataclass(slots=True) class ReportFilters: """Filter parameters applied when selecting scenarios for a report.""" scenario_ids: set[int] | None = None start_date: date | None = None end_date: date | None = None def matches(self, scenario: Scenario) -> bool: if self.scenario_ids is not None and scenario.id not in self.scenario_ids: return False if self.start_date and scenario.start_date and scenario.start_date < self.start_date: return False if self.end_date and scenario.end_date and scenario.end_date > self.end_date: return False return True def to_dict(self) -> dict[str, object]: payload: dict[str, object] = {} if self.scenario_ids is not None: payload["scenario_ids"] = sorted(self.scenario_ids) if self.start_date is not None: payload["start_date"] = self.start_date if self.end_date is not None: payload["end_date"] = self.end_date return payload @dataclass(slots=True) class ScenarioFinancialTotals: currency: str | None inflows: float outflows: float net: float by_category: dict[str, float] def to_dict(self) -> dict[str, object]: return { "currency": self.currency, "inflows": _round_optional(self.inflows), "outflows": _round_optional(self.outflows), "net": _round_optional(self.net), "by_category": { key: _round_optional(value) for key, value in sorted(self.by_category.items()) }, } @dataclass(slots=True) class ScenarioDeterministicMetrics: currency: str | None discount_rate: float compounds_per_year: int npv: float | None irr: float | None payback_period: float | None notes: list[str] = field(default_factory=list) def to_dict(self) -> dict[str, object]: return { "currency": self.currency, "discount_rate": _round_optional(self.discount_rate, digits=4), "compounds_per_year": self.compounds_per_year, "npv": _round_optional(self.npv), "irr": _round_optional(self.irr, digits=6), "payback_period": _round_optional(self.payback_period, digits=4), "notes": self.notes, } @dataclass(slots=True) class ScenarioMonteCarloResult: available: bool notes: list[str] = field(default_factory=list) result: SimulationResult | None = None include_samples: bool = False def to_dict(self) -> dict[str, object]: if not self.available or self.result is None: return { "available": False, "notes": self.notes, } metrics: dict[str, dict[str, object]] = {} for metric, summary in self.result.summaries.items(): metrics[metric.value] = { "mean": _round_optional(summary.mean), "std_dev": _round_optional(summary.std_dev), "minimum": _round_optional(summary.minimum), "maximum": _round_optional(summary.maximum), "percentiles": { f"{percentile:g}": _round_optional(value) for percentile, value in sorted(summary.percentiles.items()) }, "sample_size": summary.sample_size, "failed_runs": summary.failed_runs, } samples_payload: dict[str, list[float | None]] | None = None if self.include_samples and self.result.samples: samples_payload = {} for metric, samples in self.result.samples.items(): samples_payload[metric.value] = [ _sanitize_float(sample) for sample in samples.tolist() ] payload: dict[str, object] = { "available": True, "iterations": self.result.iterations, "metrics": metrics, "notes": self.notes, } if samples_payload: payload["samples"] = samples_payload return payload @dataclass(slots=True) class ScenarioReport: scenario: Scenario totals: ScenarioFinancialTotals deterministic: ScenarioDeterministicMetrics monte_carlo: ScenarioMonteCarloResult | None def to_dict(self) -> dict[str, object]: scenario_info = { "id": self.scenario.id, "project_id": self.scenario.project_id, "name": self.scenario.name, "description": self.scenario.description, "status": self.scenario.status.value if hasattr(self.scenario.status, 'value') else self.scenario.status, "start_date": self.scenario.start_date, "end_date": self.scenario.end_date, "currency": self.scenario.currency, "primary_resource": self.scenario.primary_resource.value if self.scenario.primary_resource and hasattr(self.scenario.primary_resource, 'value') else self.scenario.primary_resource, "discount_rate": _round_optional(self.deterministic.discount_rate, digits=4), "created_at": self.scenario.created_at, "updated_at": self.scenario.updated_at, "simulation_parameter_count": len(self.scenario.simulation_parameters or []), } payload: dict[str, object] = { "scenario": scenario_info, "financials": self.totals.to_dict(), "metrics": self.deterministic.to_dict(), } if self.monte_carlo is not None: payload["monte_carlo"] = self.monte_carlo.to_dict() return payload @dataclass(slots=True) class AggregatedMetric: average: float | None minimum: float | None maximum: float | None def to_dict(self) -> dict[str, object]: return { "average": _round_optional(self.average), "minimum": _round_optional(self.minimum), "maximum": _round_optional(self.maximum), } @dataclass(slots=True) class ProjectAggregates: total_inflows: float total_outflows: float total_net: float deterministic_metrics: dict[str, AggregatedMetric] def to_dict(self) -> dict[str, object]: return { "financials": { "total_inflows": _round_optional(self.total_inflows), "total_outflows": _round_optional(self.total_outflows), "total_net": _round_optional(self.total_net), }, "deterministic_metrics": { metric: data.to_dict() for metric, data in sorted(self.deterministic_metrics.items()) }, } @dataclass(slots=True) class MetricComparison: metric: str direction: str best: tuple[int, str, float] | None worst: tuple[int, str, float] | None average: float | None def to_dict(self) -> dict[str, object]: return { "metric": self.metric, "direction": self.direction, "best": _comparison_entry(self.best), "worst": _comparison_entry(self.worst), "average": _round_optional(self.average), } def parse_include_tokens(raw: str | None) -> IncludeOptions: tokens: set[str] = set() if raw: for part in raw.split(","): token = part.strip().lower() if token: tokens.add(token) if "all" in tokens: return IncludeOptions(distribution=True, samples=True) return IncludeOptions( distribution=bool({"distribution", "monte_carlo", "mc"} & tokens), samples="samples" in tokens, ) def validate_percentiles(values: Sequence[float] | None) -> tuple[float, ...]: if not values: return DEFAULT_PERCENTILES seen: set[float] = set() cleaned: list[float] = [] for value in values: percentile = float(value) if percentile < 0.0 or percentile > 100.0: raise ValueError("Percentiles must be between 0 and 100.") if percentile not in seen: seen.add(percentile) cleaned.append(percentile) if not cleaned: return DEFAULT_PERCENTILES return tuple(cleaned) class ReportingService: """Coordinates project and scenario reporting aggregation.""" def __init__(self, uow: UnitOfWork) -> None: self._uow = uow def project_summary( self, project: Project, *, filters: ReportFilters, include: IncludeOptions, iterations: int, percentiles: tuple[float, ...], ) -> dict[str, object]: scenarios = self._load_scenarios(project.id, filters) reports = [ self._build_scenario_report( scenario, include_distribution=include.distribution, include_samples=include.samples, iterations=iterations, percentiles=percentiles, ) for scenario in scenarios ] aggregates = self._aggregate_project(reports) return { "project": _project_payload(project), "scenario_count": len(reports), "filters": filters.to_dict(), "aggregates": aggregates.to_dict(), "scenarios": [report.to_dict() for report in reports], } def scenario_comparison( self, project: Project, scenarios: Sequence[Scenario], *, include: IncludeOptions, iterations: int, percentiles: tuple[float, ...], ) -> dict[str, object]: reports = [ self._build_scenario_report( self._reload_scenario(scenario.id), include_distribution=include.distribution, include_samples=include.samples, iterations=iterations, percentiles=percentiles, ) for scenario in scenarios ] comparison = { metric: data.to_dict() for metric, data in self._build_comparisons(reports).items() } return { "project": _project_payload(project), "scenarios": [report.to_dict() for report in reports], "comparison": comparison, } def scenario_distribution( self, scenario: Scenario, *, include: IncludeOptions, iterations: int, percentiles: tuple[float, ...], ) -> dict[str, object]: report = self._build_scenario_report( self._reload_scenario(scenario.id), include_distribution=True, include_samples=include.samples, iterations=iterations, percentiles=percentiles, ) return { "scenario": report.to_dict()["scenario"], "summary": report.totals.to_dict(), "metrics": report.deterministic.to_dict(), "monte_carlo": ( report.monte_carlo.to_dict() if report.monte_carlo else { "available": False} ), } def _load_scenarios(self, project_id: int, filters: ReportFilters) -> list[Scenario]: scenarios = self._uow.scenarios.list_for_project( project_id, with_children=True) return [scenario for scenario in scenarios if filters.matches(scenario)] def _reload_scenario(self, scenario_id: int) -> Scenario: return self._uow.scenarios.get(scenario_id, with_children=True) def _build_scenario_report( self, scenario: Scenario, *, include_distribution: bool, include_samples: bool, iterations: int, percentiles: tuple[float, ...], ) -> ScenarioReport: cash_flows, totals = _build_cash_flows(scenario) deterministic = _calculate_deterministic_metrics( scenario, cash_flows, totals) monte_carlo: ScenarioMonteCarloResult | None = None if include_distribution: monte_carlo = _run_monte_carlo( scenario, cash_flows, include_samples=include_samples, iterations=iterations, percentiles=percentiles, ) return ScenarioReport( scenario=scenario, totals=totals, deterministic=deterministic, monte_carlo=monte_carlo, ) def _aggregate_project(self, reports: Sequence[ScenarioReport]) -> ProjectAggregates: total_inflows = sum(report.totals.inflows for report in reports) total_outflows = sum(report.totals.outflows for report in reports) total_net = sum(report.totals.net for report in reports) metrics: dict[str, AggregatedMetric] = {} for metric_name in ("npv", "irr", "payback_period"): values = [ getattr(report.deterministic, metric_name) for report in reports if getattr(report.deterministic, metric_name) is not None ] if values: metrics[metric_name] = AggregatedMetric( average=sum(values) / len(values), minimum=min(values), maximum=max(values), ) return ProjectAggregates( total_inflows=total_inflows, total_outflows=total_outflows, total_net=total_net, deterministic_metrics=metrics, ) def _build_comparisons( self, reports: Sequence[ScenarioReport] ) -> Mapping[str, MetricComparison]: comparisons: dict[str, MetricComparison] = {} for metric_name, direction in ( ("npv", "higher_is_better"), ("irr", "higher_is_better"), ("payback_period", "lower_is_better"), ): entries: list[tuple[int, str, float]] = [] for report in reports: value = getattr(report.deterministic, metric_name) if value is None: continue entries.append( (report.scenario.id, report.scenario.name, value)) if not entries: continue if direction == "higher_is_better": best = max(entries, key=lambda item: item[2]) worst = min(entries, key=lambda item: item[2]) else: best = min(entries, key=lambda item: item[2]) worst = max(entries, key=lambda item: item[2]) average = sum(item[2] for item in entries) / len(entries) comparisons[metric_name] = MetricComparison( metric=metric_name, direction=direction, best=best, worst=worst, average=average, ) return comparisons def build_project_summary_context( self, project: Project, filters: ReportFilters, include: IncludeOptions, iterations: int, percentiles: tuple[float, ...], request: Request, ) -> dict[str, object]: """Build template context for project summary page.""" scenarios = self._load_scenarios(project.id, filters) reports = [ self._build_scenario_report( scenario, include_distribution=include.distribution, include_samples=include.samples, iterations=iterations, percentiles=percentiles, ) for scenario in scenarios ] aggregates = self._aggregate_project(reports) return { "request": request, "project": _project_payload(project), "scenario_count": len(reports), "aggregates": aggregates.to_dict(), "scenarios": [report.to_dict() for report in reports], "filters": filters.to_dict(), "include_options": include, "iterations": iterations, "percentiles": percentiles, "title": f"Project Summary · {project.name}", "subtitle": "Aggregated financial and simulation insights across scenarios.", "actions": [ { "href": request.url_for( "reports.project_summary", project_id=project.id, ), "label": "Download JSON", } ], "chart_data": self._generate_npv_comparison_chart(reports), } def build_scenario_comparison_context( self, project: Project, scenarios: Sequence[Scenario], include: IncludeOptions, iterations: int, percentiles: tuple[float, ...], request: Request, ) -> dict[str, object]: """Build template context for scenario comparison page.""" reports = [ self._build_scenario_report( self._reload_scenario(scenario.id), include_distribution=include.distribution, include_samples=include.samples, iterations=iterations, percentiles=percentiles, ) for scenario in scenarios ] comparison = { metric: data.to_dict() for metric, data in self._build_comparisons(reports).items() } comparison_json_url = request.url_for( "reports.project_scenario_comparison", project_id=project.id, ) scenario_ids = [str(s.id) for s in scenarios] comparison_query = urlencode( [("scenario_ids", str(identifier)) for identifier in scenario_ids] ) if comparison_query: comparison_json_url = f"{comparison_json_url}?{comparison_query}" return { "request": request, "project": _project_payload(project), "scenarios": [report.to_dict() for report in reports], "comparison": comparison, "include_options": include, "iterations": iterations, "percentiles": percentiles, "title": f"Scenario Comparison · {project.name}", "subtitle": "Evaluate deterministic metrics and Monte Carlo trends side by side.", "actions": [ { "href": comparison_json_url, "label": "Download JSON", } ], } def build_scenario_distribution_context( self, scenario: Scenario, include: IncludeOptions, iterations: int, percentiles: tuple[float, ...], request: Request, ) -> dict[str, object]: """Build template context for scenario distribution page.""" report = self._build_scenario_report( self._reload_scenario(scenario.id), include_distribution=True, include_samples=include.samples, iterations=iterations, percentiles=percentiles, ) return { "request": request, "scenario": report.to_dict()["scenario"], "summary": report.totals.to_dict(), "metrics": report.deterministic.to_dict(), "monte_carlo": ( report.monte_carlo.to_dict() if report.monte_carlo else { "available": False} ), "include_options": include, "iterations": iterations, "percentiles": percentiles, "title": f"Scenario Distribution · {scenario.name}", "subtitle": "Deterministic and simulated distributions for a single scenario.", "actions": [ { "href": request.url_for( "reports.scenario_distribution", scenario_id=scenario.id, ), "label": "Download JSON", } ], "chart_data": self._generate_distribution_histogram(report.monte_carlo) if report.monte_carlo else "{}", } def _generate_npv_comparison_chart(self, reports: Sequence[ScenarioReport]) -> str: """Generate Plotly chart JSON for NPV comparison across scenarios.""" scenario_names = [] npv_values = [] for report in reports: scenario_names.append(report.scenario.name) npv_values.append(report.deterministic.npv or 0) fig = go.Figure(data=[ go.Bar( x=scenario_names, y=npv_values, name='NPV', marker_color='lightblue' ) ]) fig.update_layout( title="NPV Comparison Across Scenarios", xaxis_title="Scenario", yaxis_title="NPV", showlegend=False ) return pio.to_json(fig) or "{}" def _generate_distribution_histogram(self, monte_carlo: ScenarioMonteCarloResult) -> str: """Generate Plotly histogram for Monte Carlo distribution.""" if not monte_carlo.available or not monte_carlo.result or not monte_carlo.result.samples: return "{}" # Get NPV samples npv_samples = monte_carlo.result.samples.get(SimulationMetric.NPV, []) if len(npv_samples) == 0: return "{}" fig = go.Figure(data=[ go.Histogram( x=npv_samples, nbinsx=50, name='NPV Distribution', marker_color='lightgreen' ) ]) fig.update_layout( title="Monte Carlo NPV Distribution", xaxis_title="NPV", yaxis_title="Frequency", showlegend=False ) return pio.to_json(fig) or "{}" def _build_cash_flows(scenario: Scenario) -> tuple[list[CashFlow], ScenarioFinancialTotals]: cash_flows: list[CashFlow] = [] by_category: dict[str, float] = {} inflows = 0.0 outflows = 0.0 net = 0.0 period_index = 0 for financial_input in scenario.financial_inputs or []: sign = _COST_CATEGORY_SIGNS.get(financial_input.category, -1.0) amount = float(financial_input.amount) * sign net += amount if amount >= 0: inflows += amount else: outflows += -amount by_category.setdefault(financial_input.category.value, 0.0) by_category[financial_input.category.value] += amount if financial_input.effective_date is not None: cash_flows.append( CashFlow(amount=amount, date=financial_input.effective_date) ) else: cash_flows.append( CashFlow(amount=amount, period_index=period_index)) period_index += 1 currency = scenario.currency if currency is None and scenario.financial_inputs: currency = scenario.financial_inputs[0].currency totals = ScenarioFinancialTotals( currency=currency, inflows=inflows, outflows=outflows, net=net, by_category=by_category, ) return cash_flows, totals def _calculate_deterministic_metrics( scenario: Scenario, cash_flows: Sequence[CashFlow], totals: ScenarioFinancialTotals, ) -> ScenarioDeterministicMetrics: notes: list[str] = [] discount_rate = _normalise_discount_rate(scenario.discount_rate) if scenario.discount_rate is None: notes.append( f"Discount rate not set; defaulted to {discount_rate:.2%}." ) if not cash_flows: notes.append( "No financial inputs available for deterministic metrics.") return ScenarioDeterministicMetrics( currency=totals.currency, discount_rate=discount_rate, compounds_per_year=1, npv=None, irr=None, payback_period=None, notes=notes, ) npv_value: float | None try: npv_value = net_present_value( discount_rate, cash_flows, compounds_per_year=1, ) except ValueError as exc: npv_value = None notes.append(f"NPV unavailable: {exc}.") irr_value: float | None try: irr_value = internal_rate_of_return( cash_flows, compounds_per_year=1, ) except (ValueError, ConvergenceError) as exc: irr_value = None notes.append(f"IRR unavailable: {exc}.") payback_value: float | None try: payback_value = payback_period( cash_flows, compounds_per_year=1, ) except (ValueError, PaybackNotReachedError) as exc: payback_value = None notes.append(f"Payback period unavailable: {exc}.") return ScenarioDeterministicMetrics( currency=totals.currency, discount_rate=discount_rate, compounds_per_year=1, npv=npv_value, irr=irr_value, payback_period=payback_value, notes=notes, ) def _run_monte_carlo( scenario: Scenario, cash_flows: Sequence[CashFlow], *, include_samples: bool, iterations: int, percentiles: tuple[float, ...], ) -> ScenarioMonteCarloResult: if not cash_flows: return ScenarioMonteCarloResult( available=False, notes=["No financial inputs available for Monte Carlo simulation."], ) discount_rate = _normalise_discount_rate(scenario.discount_rate) specs = [CashFlowSpec(cash_flow=flow) for flow in cash_flows] notes: list[str] = [] if not scenario.simulation_parameters: notes.append( "Scenario has no stochastic parameters; simulation mirrors deterministic cash flows." ) config = SimulationConfig( iterations=iterations, discount_rate=discount_rate, metrics=( SimulationMetric.NPV, SimulationMetric.IRR, SimulationMetric.PAYBACK, ), percentiles=percentiles, return_samples=include_samples, ) try: result = run_monte_carlo(specs, config) except Exception as exc: # pragma: no cover - safeguard for unexpected failures notes.append(f"Simulation failed: {exc}.") return ScenarioMonteCarloResult(available=False, notes=notes) return ScenarioMonteCarloResult( available=True, notes=notes, result=result, include_samples=include_samples, ) def _normalise_discount_rate(value: float | None) -> float: if value is None: return DEFAULT_DISCOUNT_RATE rate = float(value) if rate > 1.0: return rate / 100.0 return rate def _sanitize_float(value: float | None) -> float | None: if value is None: return None if math.isnan(value) or math.isinf(value): return None return float(value) def _round_optional(value: float | None, *, digits: int = 2) -> float | None: clean = _sanitize_float(value) if clean is None: return None return round(clean, digits) def _comparison_entry(entry: tuple[int, str, float] | None) -> dict[str, object] | None: if entry is None: return None scenario_id, name, value = entry return { "scenario_id": scenario_id, "name": name, "value": _round_optional(value), } def _project_payload(project: Project) -> dict[str, object]: return { "id": project.id, "name": project.name, "location": project.location, "operation_type": project.operation_type.value, "description": project.description, "created_at": project.created_at, "updated_at": project.updated_at, }