Files
calminer/services/reporting.py
zwitschi acf6f50bbd
Some checks failed
CI / lint (push) Successful in 15s
CI / build (push) Has been skipped
CI / test (push) Failing after 17s
CI / deploy (push) Has been skipped
feat: Add NPV comparison and distribution charts to reporting
- Implemented NPV comparison chart generation using Plotly in ReportingService.
- Added distribution histogram for Monte Carlo results.
- Updated reporting templates to include new charts and improved layout.
- Created new settings and currencies management pages.
- Enhanced sidebar navigation with dynamic URL handling.
- Improved CSS styles for chart containers and overall layout.
- Added new simulation and theme settings pages with placeholders for future features.
2025-11-12 19:39:27 +01:00

876 lines
28 KiB
Python

"""Reporting service layer aggregating deterministic and simulation metrics."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import date
import math
from typing import Mapping, Sequence
from urllib.parse import urlencode
import plotly.graph_objects as go
import plotly.io as pio
from fastapi import Request
from models import FinancialCategory, Project, Scenario
from services.financial import (
CashFlow,
ConvergenceError,
PaybackNotReachedError,
internal_rate_of_return,
net_present_value,
payback_period,
)
from services.simulation import (
CashFlowSpec,
SimulationConfig,
SimulationMetric,
SimulationResult,
run_monte_carlo,
)
from services.unit_of_work import UnitOfWork
# Discount rate (as a fraction) used when a scenario does not define one.
DEFAULT_DISCOUNT_RATE = 0.1

# Default Monte Carlo iteration count; not referenced in this part of the
# module, presumably consumed by callers -- TODO confirm.
DEFAULT_ITERATIONS = 500

# Percentiles reported when the caller supplies none.
DEFAULT_PERCENTILES: tuple[float, float, float] = (5.0, 50.0, 95.0)

# Sign applied to a financial input's amount: revenue counts as a positive
# cash flow, every other listed category as a negative one.
_COST_CATEGORY_SIGNS: Mapping[FinancialCategory, float] = {
    FinancialCategory.REVENUE: 1.0,
    **dict.fromkeys(
        (
            FinancialCategory.CAPITAL_EXPENDITURE,
            FinancialCategory.OPERATING_EXPENDITURE,
            FinancialCategory.CONTINGENCY,
            FinancialCategory.OTHER,
        ),
        -1.0,
    ),
}
@dataclass(frozen=True)
class IncludeOptions:
    """Immutable switches selecting the optional sections of a report payload."""

    # When true, Monte Carlo distribution results are requested for the report.
    distribution: bool = False
    # When true, raw simulation samples are requested alongside the summaries.
    samples: bool = False
@dataclass(slots=True)
class ReportFilters:
"""Filter parameters applied when selecting scenarios for a report."""
scenario_ids: set[int] | None = None
start_date: date | None = None
end_date: date | None = None
def matches(self, scenario: Scenario) -> bool:
if self.scenario_ids is not None and scenario.id not in self.scenario_ids:
return False
if self.start_date and scenario.start_date and scenario.start_date < self.start_date:
return False
if self.end_date and scenario.end_date and scenario.end_date > self.end_date:
return False
return True
def to_dict(self) -> dict[str, object]:
payload: dict[str, object] = {}
if self.scenario_ids is not None:
payload["scenario_ids"] = sorted(self.scenario_ids)
if self.start_date is not None:
payload["start_date"] = self.start_date
if self.end_date is not None:
payload["end_date"] = self.end_date
return payload
@dataclass(slots=True)
class ScenarioFinancialTotals:
    """Summed cash-flow figures for a single scenario."""

    currency: str | None
    inflows: float
    outflows: float
    net: float
    # Signed total per category, keyed by the category's string value.
    by_category: dict[str, float]

    def to_dict(self) -> dict[str, object]:
        """Return the totals rounded for output, with categories in key order."""
        rounded_categories = {
            name: _round_optional(self.by_category[name])
            for name in sorted(self.by_category)
        }
        return {
            "currency": self.currency,
            "inflows": _round_optional(self.inflows),
            "outflows": _round_optional(self.outflows),
            "net": _round_optional(self.net),
            "by_category": rounded_categories,
        }
@dataclass(slots=True)
class ScenarioDeterministicMetrics:
    """Deterministic valuation metrics computed from a scenario's cash flows."""

    currency: str | None
    discount_rate: float
    compounds_per_year: int
    # Each metric is ``None`` when it could not be computed; ``notes`` says why.
    npv: float | None
    irr: float | None
    payback_period: float | None
    notes: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, object]:
        """Return the metrics as a dictionary with per-field rounding applied."""
        rounded = _round_optional
        payload: dict[str, object] = {"currency": self.currency}
        payload["discount_rate"] = rounded(self.discount_rate, digits=4)
        payload["compounds_per_year"] = self.compounds_per_year
        payload["npv"] = rounded(self.npv)
        payload["irr"] = rounded(self.irr, digits=6)
        payload["payback_period"] = rounded(self.payback_period, digits=4)
        payload["notes"] = self.notes
        return payload
@dataclass(slots=True)
class ScenarioMonteCarloResult:
available: bool
notes: list[str] = field(default_factory=list)
result: SimulationResult | None = None
include_samples: bool = False
def to_dict(self) -> dict[str, object]:
if not self.available or self.result is None:
return {
"available": False,
"notes": self.notes,
}
metrics: dict[str, dict[str, object]] = {}
for metric, summary in self.result.summaries.items():
metrics[metric.value] = {
"mean": _round_optional(summary.mean),
"std_dev": _round_optional(summary.std_dev),
"minimum": _round_optional(summary.minimum),
"maximum": _round_optional(summary.maximum),
"percentiles": {
f"{percentile:g}": _round_optional(value)
for percentile, value in sorted(summary.percentiles.items())
},
"sample_size": summary.sample_size,
"failed_runs": summary.failed_runs,
}
samples_payload: dict[str, list[float | None]] | None = None
if self.include_samples and self.result.samples:
samples_payload = {}
for metric, samples in self.result.samples.items():
samples_payload[metric.value] = [
_sanitize_float(sample) for sample in samples.tolist()
]
payload: dict[str, object] = {
"available": True,
"iterations": self.result.iterations,
"metrics": metrics,
"notes": self.notes,
}
if samples_payload:
payload["samples"] = samples_payload
return payload
@dataclass(slots=True)
class ScenarioReport:
    """Bundle of a scenario with its totals, metrics and optional simulation."""

    scenario: Scenario
    totals: ScenarioFinancialTotals
    deterministic: ScenarioDeterministicMetrics
    # ``None`` when no Monte Carlo section was requested.
    monte_carlo: ScenarioMonteCarloResult | None

    def to_dict(self) -> dict[str, object]:
        """Return the report as a nested dictionary.

        The ``"monte_carlo"`` key is present only when a simulation result is
        attached to the report.
        """
        scenario = self.scenario

        # Status and primary resource may be enum members or plain values;
        # enum members are reduced to their ``value``.
        status = scenario.status
        if hasattr(status, 'value'):
            status = status.value
        primary_resource = scenario.primary_resource
        if primary_resource and hasattr(primary_resource, 'value'):
            primary_resource = primary_resource.value

        payload: dict[str, object] = {
            "scenario": {
                "id": scenario.id,
                "project_id": scenario.project_id,
                "name": scenario.name,
                "description": scenario.description,
                "status": status,
                "start_date": scenario.start_date,
                "end_date": scenario.end_date,
                "currency": scenario.currency,
                "primary_resource": primary_resource,
                "discount_rate": _round_optional(
                    self.deterministic.discount_rate, digits=4
                ),
                "created_at": scenario.created_at,
                "updated_at": scenario.updated_at,
                "simulation_parameter_count": len(
                    scenario.simulation_parameters or []
                ),
            },
            "financials": self.totals.to_dict(),
            "metrics": self.deterministic.to_dict(),
        }
        if self.monte_carlo is not None:
            payload["monte_carlo"] = self.monte_carlo.to_dict()
        return payload
@dataclass(slots=True)
class AggregatedMetric:
    """Average, minimum and maximum of one metric across several scenarios."""

    average: float | None
    minimum: float | None
    maximum: float | None

    def to_dict(self) -> dict[str, object]:
        """Return the three statistics rounded for output."""
        return {
            label: _round_optional(getattr(self, label))
            for label in ("average", "minimum", "maximum")
        }
@dataclass(slots=True)
class ProjectAggregates:
    """Project-level sums and per-metric statistics over scenario reports."""

    total_inflows: float
    total_outflows: float
    total_net: float
    # Keyed by metric name.
    deterministic_metrics: dict[str, AggregatedMetric]

    def to_dict(self) -> dict[str, object]:
        """Return rounded financial sums plus metric statistics in name order."""
        financials = {
            "total_inflows": _round_optional(self.total_inflows),
            "total_outflows": _round_optional(self.total_outflows),
            "total_net": _round_optional(self.total_net),
        }
        metric_stats = {
            name: self.deterministic_metrics[name].to_dict()
            for name in sorted(self.deterministic_metrics)
        }
        return {"financials": financials, "deterministic_metrics": metric_stats}
@dataclass(slots=True)
class MetricComparison:
    """Best, worst and average value of one metric across compared scenarios."""

    metric: str
    # Label describing which end of the scale counts as "best".
    direction: str
    # ``best`` / ``worst`` are ``(scenario id, scenario name, value)`` tuples.
    best: tuple[int, str, float] | None
    worst: tuple[int, str, float] | None
    average: float | None

    def to_dict(self) -> dict[str, object]:
        """Return the comparison with best/worst expanded into dictionaries."""
        payload: dict[str, object] = {
            "metric": self.metric,
            "direction": self.direction,
        }
        payload["best"] = _comparison_entry(self.best)
        payload["worst"] = _comparison_entry(self.worst)
        payload["average"] = _round_optional(self.average)
        return payload
def parse_include_tokens(raw: str | None) -> IncludeOptions:
    """Translate a comma-separated ``include`` string into ``IncludeOptions``.

    Tokens are trimmed and compared case-insensitively; blank tokens are
    ignored. ``all`` enables every section. ``distribution``, ``monte_carlo``
    and ``mc`` each enable the distribution section, and ``samples`` enables
    raw samples.
    """
    requested = {piece.strip().lower() for piece in (raw or "").split(",")}
    requested.discard("")

    if "all" in requested:
        return IncludeOptions(distribution=True, samples=True)

    wants_distribution = not requested.isdisjoint(
        ("distribution", "monte_carlo", "mc")
    )
    return IncludeOptions(
        distribution=wants_distribution,
        samples="samples" in requested,
    )
def validate_percentiles(values: Sequence[float] | None) -> tuple[float, ...]:
    """Validate and de-duplicate requested percentiles.

    Args:
        values: Percentiles in the inclusive range 0-100, or ``None`` / an
            empty sequence to request the defaults.

    Returns:
        The percentiles converted to ``float`` in first-seen order with
        duplicates removed, or ``DEFAULT_PERCENTILES`` when *values* is empty.

    Raises:
        ValueError: If any percentile lies outside 0-100 or is NaN.
    """
    if not values:
        return DEFAULT_PERCENTILES

    seen: set[float] = set()
    cleaned: list[float] = []
    for value in values:
        percentile = float(value)
        # A chained comparison is used on purpose: NaN fails it, whereas
        # separate "< 0" / "> 100" checks would let NaN slip through.
        if not 0.0 <= percentile <= 100.0:
            raise ValueError("Percentiles must be between 0 and 100.")
        if percentile not in seen:
            seen.add(percentile)
            cleaned.append(percentile)
    return tuple(cleaned)
class ReportingService:
    """Coordinates project and scenario reporting aggregation.

    ``project_summary``, ``scenario_comparison`` and ``scenario_distribution``
    return plain payload dictionaries. The ``build_*_context`` methods return
    the same data extended with the request object, titles, action links and
    chart JSON.
    """

    def __init__(self, uow: UnitOfWork) -> None:
        """Keep a reference to the unit of work used to load scenarios."""
        self._uow = uow

    # ------------------------------------------------------------------
    # Payload builders
    # ------------------------------------------------------------------
    def project_summary(
        self,
        project: Project,
        *,
        filters: ReportFilters,
        include: IncludeOptions,
        iterations: int,
        percentiles: tuple[float, ...],
    ) -> dict[str, object]:
        """Return the aggregated report for the project's matching scenarios."""
        reports = self._build_reports(
            self._load_scenarios(project.id, filters),
            include=include,
            iterations=iterations,
            percentiles=percentiles,
        )
        return self._project_summary_payload(project, filters, reports)

    def scenario_comparison(
        self,
        project: Project,
        scenarios: Sequence[Scenario],
        *,
        include: IncludeOptions,
        iterations: int,
        percentiles: tuple[float, ...],
    ) -> dict[str, object]:
        """Return per-scenario reports plus best/worst/average comparisons."""
        reports = self._build_reports(
            scenarios,
            include=include,
            iterations=iterations,
            percentiles=percentiles,
            reload=True,
        )
        return {
            "project": _project_payload(project),
            "scenarios": [report.to_dict() for report in reports],
            "comparison": self._comparison_payload(reports),
        }

    def scenario_distribution(
        self,
        scenario: Scenario,
        *,
        include: IncludeOptions,
        iterations: int,
        percentiles: tuple[float, ...],
    ) -> dict[str, object]:
        """Return totals, metrics and Monte Carlo results for one scenario.

        The Monte Carlo section is always computed here, regardless of
        ``include.distribution``; only ``include.samples`` is honoured.
        """
        report = self._build_scenario_report(
            self._reload_scenario(scenario.id),
            include_distribution=True,
            include_samples=include.samples,
            iterations=iterations,
            percentiles=percentiles,
        )
        return self._distribution_payload(report)

    # ------------------------------------------------------------------
    # Loading and report assembly
    # ------------------------------------------------------------------
    def _load_scenarios(self, project_id: int, filters: ReportFilters) -> list[Scenario]:
        """Load the project's scenarios with children and apply *filters*."""
        scenarios = self._uow.scenarios.list_for_project(
            project_id, with_children=True)
        return [scenario for scenario in scenarios if filters.matches(scenario)]

    def _reload_scenario(self, scenario_id: int) -> Scenario:
        """Fetch a scenario again through the unit of work, with children."""
        return self._uow.scenarios.get(scenario_id, with_children=True)

    def _build_reports(
        self,
        scenarios: Sequence[Scenario],
        *,
        include: IncludeOptions,
        iterations: int,
        percentiles: tuple[float, ...],
        reload: bool = False,
    ) -> list[ScenarioReport]:
        """Build one report per scenario using the caller's include options.

        With ``reload=True`` each scenario is re-fetched by id (with children)
        immediately before its report is built.
        """
        return [
            self._build_scenario_report(
                self._reload_scenario(scenario.id) if reload else scenario,
                include_distribution=include.distribution,
                include_samples=include.samples,
                iterations=iterations,
                percentiles=percentiles,
            )
            for scenario in scenarios
        ]

    def _build_scenario_report(
        self,
        scenario: Scenario,
        *,
        include_distribution: bool,
        include_samples: bool,
        iterations: int,
        percentiles: tuple[float, ...],
    ) -> ScenarioReport:
        """Compute totals, deterministic metrics and optional simulation."""
        cash_flows, totals = _build_cash_flows(scenario)
        deterministic = _calculate_deterministic_metrics(
            scenario, cash_flows, totals)
        monte_carlo: ScenarioMonteCarloResult | None = None
        if include_distribution:
            monte_carlo = _run_monte_carlo(
                scenario,
                cash_flows,
                include_samples=include_samples,
                iterations=iterations,
                percentiles=percentiles,
            )
        return ScenarioReport(
            scenario=scenario,
            totals=totals,
            deterministic=deterministic,
            monte_carlo=monte_carlo,
        )

    def _aggregate_project(self, reports: Sequence[ScenarioReport]) -> ProjectAggregates:
        """Sum financial totals and summarise each metric across *reports*.

        Metrics that are ``None`` for a scenario are skipped; a metric with no
        values at all is omitted from the result.
        """
        metrics: dict[str, AggregatedMetric] = {}
        for metric_name in ("npv", "irr", "payback_period"):
            values = [
                value
                for value in (
                    getattr(report.deterministic, metric_name) for report in reports
                )
                if value is not None
            ]
            if values:
                metrics[metric_name] = AggregatedMetric(
                    average=sum(values) / len(values),
                    minimum=min(values),
                    maximum=max(values),
                )
        return ProjectAggregates(
            total_inflows=sum(report.totals.inflows for report in reports),
            total_outflows=sum(report.totals.outflows for report in reports),
            total_net=sum(report.totals.net for report in reports),
            deterministic_metrics=metrics,
        )

    def _build_comparisons(
        self, reports: Sequence[ScenarioReport]
    ) -> Mapping[str, MetricComparison]:
        """Rank scenarios per metric, skipping scenarios where it is ``None``."""
        comparisons: dict[str, MetricComparison] = {}
        for metric_name, direction in (
            ("npv", "higher_is_better"),
            ("irr", "higher_is_better"),
            ("payback_period", "lower_is_better"),
        ):
            entries: list[tuple[int, str, float]] = []
            for report in reports:
                value = getattr(report.deterministic, metric_name)
                if value is None:
                    continue
                entries.append(
                    (report.scenario.id, report.scenario.name, value))
            if not entries:
                continue
            highest = max(entries, key=lambda item: item[2])
            lowest = min(entries, key=lambda item: item[2])
            if direction == "higher_is_better":
                best, worst = highest, lowest
            else:
                best, worst = lowest, highest
            comparisons[metric_name] = MetricComparison(
                metric=metric_name,
                direction=direction,
                best=best,
                worst=worst,
                average=sum(item[2] for item in entries) / len(entries),
            )
        return comparisons

    # ------------------------------------------------------------------
    # Shared payload fragments
    # ------------------------------------------------------------------
    def _project_summary_payload(
        self,
        project: Project,
        filters: ReportFilters,
        reports: Sequence[ScenarioReport],
    ) -> dict[str, object]:
        """Assemble the project summary dictionary from prepared reports."""
        return {
            "project": _project_payload(project),
            "scenario_count": len(reports),
            "filters": filters.to_dict(),
            "aggregates": self._aggregate_project(reports).to_dict(),
            "scenarios": [report.to_dict() for report in reports],
        }

    def _comparison_payload(
        self, reports: Sequence[ScenarioReport]
    ) -> dict[str, dict[str, object]]:
        """Return the metric comparisons converted to dictionaries."""
        return {
            metric: data.to_dict()
            for metric, data in self._build_comparisons(reports).items()
        }

    @staticmethod
    def _distribution_payload(report: ScenarioReport) -> dict[str, object]:
        """Return the single-scenario distribution dictionary for *report*."""
        monte_carlo = report.monte_carlo
        return {
            "scenario": report.to_dict()["scenario"],
            "summary": report.totals.to_dict(),
            "metrics": report.deterministic.to_dict(),
            "monte_carlo": (
                monte_carlo.to_dict() if monte_carlo else {"available": False}
            ),
        }

    # ------------------------------------------------------------------
    # Template contexts
    # ------------------------------------------------------------------
    def build_project_summary_context(
        self,
        project: Project,
        filters: ReportFilters,
        include: IncludeOptions,
        iterations: int,
        percentiles: tuple[float, ...],
        request: Request,
    ) -> dict[str, object]:
        """Build template context for project summary page."""
        reports = self._build_reports(
            self._load_scenarios(project.id, filters),
            include=include,
            iterations=iterations,
            percentiles=percentiles,
        )
        return {
            "request": request,
            **self._project_summary_payload(project, filters, reports),
            "include_options": include,
            "iterations": iterations,
            "percentiles": percentiles,
            "title": f"Project Summary · {project.name}",
            "subtitle": "Aggregated financial and simulation insights across scenarios.",
            "actions": [
                {
                    "href": request.url_for(
                        "reports.project_summary",
                        project_id=project.id,
                    ),
                    "label": "Download JSON",
                }
            ],
            "chart_data": self._generate_npv_comparison_chart(reports),
        }

    def build_scenario_comparison_context(
        self,
        project: Project,
        scenarios: Sequence[Scenario],
        include: IncludeOptions,
        iterations: int,
        percentiles: tuple[float, ...],
        request: Request,
    ) -> dict[str, object]:
        """Build template context for scenario comparison page."""
        reports = self._build_reports(
            scenarios,
            include=include,
            iterations=iterations,
            percentiles=percentiles,
            reload=True,
        )
        comparison_json_url = request.url_for(
            "reports.project_scenario_comparison",
            project_id=project.id,
        )
        # Repeat ``scenario_ids`` once per scenario so the JSON link targets
        # the same selection as this page.
        comparison_query = urlencode(
            [("scenario_ids", str(scenario.id)) for scenario in scenarios]
        )
        if comparison_query:
            comparison_json_url = f"{comparison_json_url}?{comparison_query}"
        return {
            "request": request,
            "project": _project_payload(project),
            "scenarios": [report.to_dict() for report in reports],
            "comparison": self._comparison_payload(reports),
            "include_options": include,
            "iterations": iterations,
            "percentiles": percentiles,
            "title": f"Scenario Comparison · {project.name}",
            "subtitle": "Evaluate deterministic metrics and Monte Carlo trends side by side.",
            "actions": [
                {
                    "href": comparison_json_url,
                    "label": "Download JSON",
                }
            ],
        }

    def build_scenario_distribution_context(
        self,
        scenario: Scenario,
        include: IncludeOptions,
        iterations: int,
        percentiles: tuple[float, ...],
        request: Request,
    ) -> dict[str, object]:
        """Build template context for scenario distribution page.

        The simulation is always run with samples enabled because the
        histogram is drawn from raw NPV samples. Samples still appear in the
        ``monte_carlo`` payload only when ``include.samples`` is set.
        """
        report = self._build_scenario_report(
            self._reload_scenario(scenario.id),
            include_distribution=True,
            # Without samples the histogram helper can only return "{}".
            # NOTE(review): assumes the simulation omits samples when they
            # are not requested -- confirm against services.simulation.
            include_samples=True,
            iterations=iterations,
            percentiles=percentiles,
        )
        monte_carlo = report.monte_carlo
        chart_data = (
            self._generate_distribution_histogram(monte_carlo)
            if monte_carlo
            else "{}"
        )
        if monte_carlo is not None:
            # Restore the caller's preference before serialising the payload.
            monte_carlo.include_samples = include.samples
        return {
            "request": request,
            **self._distribution_payload(report),
            "include_options": include,
            "iterations": iterations,
            "percentiles": percentiles,
            "title": f"Scenario Distribution · {scenario.name}",
            "subtitle": "Deterministic and simulated distributions for a single scenario.",
            "actions": [
                {
                    "href": request.url_for(
                        "reports.scenario_distribution",
                        scenario_id=scenario.id,
                    ),
                    "label": "Download JSON",
                }
            ],
            "chart_data": chart_data,
        }

    # ------------------------------------------------------------------
    # Charts
    # ------------------------------------------------------------------
    def _generate_npv_comparison_chart(self, reports: Sequence[ScenarioReport]) -> str:
        """Generate Plotly chart JSON for NPV comparison across scenarios."""
        scenario_names = [report.scenario.name for report in reports]
        # A missing NPV is drawn as a zero-height bar.
        npv_values = [report.deterministic.npv or 0 for report in reports]
        fig = go.Figure(data=[
            go.Bar(
                x=scenario_names,
                y=npv_values,
                name='NPV',
                marker_color='lightblue'
            )
        ])
        fig.update_layout(
            title="NPV Comparison Across Scenarios",
            xaxis_title="Scenario",
            yaxis_title="NPV",
            showlegend=False
        )
        return pio.to_json(fig) or "{}"

    def _generate_distribution_histogram(self, monte_carlo: ScenarioMonteCarloResult) -> str:
        """Generate Plotly histogram for Monte Carlo distribution.

        Returns ``"{}"`` when the simulation is unavailable or carries no NPV
        samples.
        """
        if not monte_carlo.available or not monte_carlo.result or not monte_carlo.result.samples:
            return "{}"
        npv_samples = monte_carlo.result.samples.get(SimulationMetric.NPV, [])
        # ``len`` is used rather than truthiness, which is ambiguous for
        # array-like sample containers.
        if len(npv_samples) == 0:
            return "{}"
        fig = go.Figure(data=[
            go.Histogram(
                x=npv_samples,
                nbinsx=50,
                name='NPV Distribution',
                marker_color='lightgreen'
            )
        ])
        fig.update_layout(
            title="Monte Carlo NPV Distribution",
            xaxis_title="NPV",
            yaxis_title="Frequency",
            showlegend=False
        )
        return pio.to_json(fig) or "{}"
def _build_cash_flows(scenario: Scenario) -> tuple[list[CashFlow], ScenarioFinancialTotals]:
    """Convert a scenario's financial inputs into signed cash flows and totals.

    Each amount is multiplied by the sign of its category (unknown categories
    count as outflows). Inputs with an effective date become dated cash
    flows; the rest receive consecutive period indices starting at 0.

    Returns:
        The list of cash flows and the accumulated financial totals.
    """
    flows: list[CashFlow] = []
    category_totals: dict[str, float] = {}
    total_in = 0.0
    total_out = 0.0
    running_net = 0.0
    next_period = 0

    for entry in scenario.financial_inputs or []:
        signed = float(entry.amount) * _COST_CATEGORY_SIGNS.get(entry.category, -1.0)
        running_net += signed
        if signed >= 0:
            total_in += signed
        else:
            # Outflows are tracked as a positive magnitude.
            total_out += -signed

        category_key = entry.category.value
        category_totals[category_key] = category_totals.get(category_key, 0.0) + signed

        if entry.effective_date is None:
            flows.append(CashFlow(amount=signed, period_index=next_period))
            next_period += 1
        else:
            flows.append(CashFlow(amount=signed, date=entry.effective_date))

    # Fall back to the first input's currency when the scenario has none.
    currency = scenario.currency
    if currency is None and scenario.financial_inputs:
        currency = scenario.financial_inputs[0].currency

    return flows, ScenarioFinancialTotals(
        currency=currency,
        inflows=total_in,
        outflows=total_out,
        net=running_net,
        by_category=category_totals,
    )
def _calculate_deterministic_metrics(
    scenario: Scenario,
    cash_flows: Sequence[CashFlow],
    totals: ScenarioFinancialTotals,
) -> ScenarioDeterministicMetrics:
    """Compute NPV, IRR and payback period for a scenario's cash flows.

    A metric that cannot be computed is left as ``None`` and the reason is
    appended to the returned notes. With no cash flows at all, every metric
    is ``None`` and a single explanatory note is recorded.
    """
    rate = _normalise_discount_rate(scenario.discount_rate)
    remarks: list[str] = []
    if scenario.discount_rate is None:
        remarks.append(
            f"Discount rate not set; defaulted to {rate:.2%}."
        )

    npv_value: float | None = None
    irr_value: float | None = None
    payback_value: float | None = None

    if cash_flows:
        try:
            npv_value = net_present_value(
                rate,
                cash_flows,
                compounds_per_year=1,
            )
        except ValueError as exc:
            remarks.append(f"NPV unavailable: {exc}.")

        try:
            irr_value = internal_rate_of_return(
                cash_flows,
                compounds_per_year=1,
            )
        except (ValueError, ConvergenceError) as exc:
            remarks.append(f"IRR unavailable: {exc}.")

        try:
            payback_value = payback_period(
                cash_flows,
                compounds_per_year=1,
            )
        except (ValueError, PaybackNotReachedError) as exc:
            remarks.append(f"Payback period unavailable: {exc}.")
    else:
        remarks.append(
            "No financial inputs available for deterministic metrics.")

    return ScenarioDeterministicMetrics(
        currency=totals.currency,
        discount_rate=rate,
        compounds_per_year=1,
        npv=npv_value,
        irr=irr_value,
        payback_period=payback_value,
        notes=remarks,
    )
def _run_monte_carlo(
    scenario: Scenario,
    cash_flows: Sequence[CashFlow],
    *,
    include_samples: bool,
    iterations: int,
    percentiles: tuple[float, ...],
) -> ScenarioMonteCarloResult:
    """Run the Monte Carlo simulation for a scenario's cash flows.

    Returns an unavailable result (with an explanatory note) when there are
    no cash flows or when the simulation raises. Otherwise the result covers
    NPV, IRR and payback for the requested iterations and percentiles.
    """
    if not cash_flows:
        return ScenarioMonteCarloResult(
            available=False,
            notes=["No financial inputs available for Monte Carlo simulation."],
        )

    rate = _normalise_discount_rate(scenario.discount_rate)
    flow_specs = [CashFlowSpec(cash_flow=cash_flow) for cash_flow in cash_flows]

    remarks: list[str] = []
    if not scenario.simulation_parameters:
        remarks.append(
            "Scenario has no stochastic parameters; simulation mirrors deterministic cash flows."
        )

    tracked_metrics = (
        SimulationMetric.NPV,
        SimulationMetric.IRR,
        SimulationMetric.PAYBACK,
    )
    settings = SimulationConfig(
        iterations=iterations,
        discount_rate=rate,
        metrics=tracked_metrics,
        percentiles=percentiles,
        return_samples=include_samples,
    )

    try:
        outcome = run_monte_carlo(flow_specs, settings)
    except Exception as exc:  # pragma: no cover - safeguard for unexpected failures
        # A failed simulation is reported as a note instead of being raised.
        remarks.append(f"Simulation failed: {exc}.")
        return ScenarioMonteCarloResult(available=False, notes=remarks)

    return ScenarioMonteCarloResult(
        available=True,
        notes=remarks,
        result=outcome,
        include_samples=include_samples,
    )
def _normalise_discount_rate(value: float | None) -> float:
    """Return the discount rate as a fraction.

    ``None`` yields ``DEFAULT_DISCOUNT_RATE``. Values above 1.0 are treated
    as percentages and divided by 100; all other values pass through as-is.
    """
    if value is None:
        return DEFAULT_DISCOUNT_RATE
    as_float = float(value)
    return as_float / 100.0 if as_float > 1.0 else as_float
def _sanitize_float(value: float | None) -> float | None:
    """Return *value* as a ``float``, or ``None`` if it is missing, NaN or infinite."""
    if value is None:
        return None
    return float(value) if math.isfinite(value) else None
def _round_optional(value: float | None, *, digits: int = 2) -> float | None:
    """Round *value* to *digits* decimals, passing through ``None``.

    Non-finite values are treated like ``None``.
    """
    finite = _sanitize_float(value)
    return None if finite is None else round(finite, digits)
def _comparison_entry(entry: tuple[int, str, float] | None) -> dict[str, object] | None:
    """Expand a ``(scenario_id, name, value)`` tuple into a dictionary.

    Returns ``None`` when *entry* is ``None``; the value is rounded for output.
    """
    if entry is not None:
        scenario_id, name, value = entry
        return {
            "scenario_id": scenario_id,
            "name": name,
            "value": _round_optional(value),
        }
    return None
def _project_payload(project: Project) -> dict[str, object]:
    """Return the project fields exposed in report payloads.

    ``operation_type`` is reduced to its ``value`` when it has one (an enum
    member) and passed through unchanged otherwise, so a plain or missing
    value no longer raises ``AttributeError``.
    """
    operation_type = project.operation_type
    return {
        "id": project.id,
        "name": project.name,
        "location": project.location,
        "operation_type": getattr(operation_type, "value", operation_type),
        "description": project.description,
        "created_at": project.created_at,
        "updated_at": project.updated_at,
    }