"""Monte Carlo simulation of cash-flow metrics (NPV, IRR and payback period)."""

from __future__ import annotations

import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Mapping, Sequence

import numpy as np
from numpy.random import Generator, default_rng

from .financial import (
    CashFlow,
    ConvergenceError,
    PaybackNotReachedError,
    internal_rate_of_return,
    net_present_value,
    payback_period,
)
from monitoring.metrics import observe_simulation


class DistributionConfigError(ValueError):
    """Raised when a distribution specification is invalid."""


class SimulationMetric(Enum):
    """Supported Monte Carlo summary metrics."""

    NPV = "npv"
    IRR = "irr"
    PAYBACK = "payback"


class DistributionType(Enum):
    """Supported probability distribution families."""

    NORMAL = "normal"
    LOGNORMAL = "lognormal"
    TRIANGULAR = "triangular"
    DISCRETE = "discrete"


class DistributionSource(Enum):
    """Origins for parameter values when sourcing dynamically."""

    STATIC = "static"
    SCENARIO_FIELD = "scenario_field"
    METADATA_KEY = "metadata_key"


@dataclass(frozen=True, slots=True)
class DistributionSpec:
    """Defines the stochastic behaviour for a single cash flow.

    Attributes:
        type: Distribution family to sample from.
        parameters: Family-specific parameters (e.g. ``std_dev``, ``sigma``,
            ``min``/``mode``/``max``, ``values``/``probabilities``).
        source: Where the default ``mean`` comes from when ``parameters``
            does not provide one.
        source_key: Key looked up in the scenario context or metadata; it is
            required for the non-static sources.
    """

    type: DistributionType
    parameters: Mapping[str, Any]
    source: DistributionSource = DistributionSource.STATIC
    source_key: str | None = None


@dataclass(frozen=True, slots=True)
class CashFlowSpec:
    """Pairs a baseline cash flow with an optional distribution.

    When ``distribution`` is ``None`` the baseline cash flow is used
    unchanged in every iteration.
    """

    cash_flow: CashFlow
    distribution: DistributionSpec | None = None


@dataclass(frozen=True, slots=True)
class SimulationConfig:
    """Controls Monte Carlo simulation behaviour.

    Attributes:
        iterations: Number of simulation runs; must be greater than zero.
        discount_rate: Rate forwarded to ``net_present_value``.
        seed: Seed for the default random generator when no generator is
            supplied to ``run_monte_carlo``.
        metrics: Metrics to compute for every iteration.
        percentiles: Percentiles to report, each within ``[0, 100]``.
        compounds_per_year: Forwarded to the financial functions; must be
            greater than zero.
        return_samples: Whether the raw per-iteration arrays are returned.
        residual_value: Forwarded to ``net_present_value``.
        residual_periods: Forwarded to ``net_present_value``.
    """

    iterations: int
    discount_rate: float
    seed: int | None = None
    metrics: Sequence[SimulationMetric] = (
        SimulationMetric.NPV,
        SimulationMetric.IRR,
        SimulationMetric.PAYBACK,
    )
    percentiles: Sequence[float] = (5.0, 50.0, 95.0)
    compounds_per_year: int = 1
    return_samples: bool = False
    residual_value: float | None = None
    residual_periods: float | None = None


@dataclass(frozen=True, slots=True)
class MetricSummary:
    """Aggregated statistics for a simulated metric.

    Statistics are computed over the non-NaN samples only. ``std_dev`` is
    the sample standard deviation (``ddof=1``), or ``0.0`` when there is a
    single sample. ``failed_runs`` counts the NaN samples that were dropped.
    """

    mean: float
    std_dev: float
    minimum: float
    maximum: float
    percentiles: Mapping[float, float]
    sample_size: int
    failed_runs: int


@dataclass(frozen=True, slots=True)
class SimulationResult:
    """Monte Carlo output including per-metric summaries.

    ``samples`` holds the raw per-iteration arrays only when the
    configuration requested them; otherwise it is ``None``.
    """

    iterations: int
    summaries: Mapping[SimulationMetric, MetricSummary]
    samples: Mapping[SimulationMetric, np.ndarray] | None = None


def run_monte_carlo(
    cash_flows: Sequence[CashFlowSpec],
    config: SimulationConfig,
    *,
    scenario_context: Mapping[str, Any] | None = None,
    metadata: Mapping[str, Any] | None = None,
    rng: Generator | None = None,
) -> SimulationResult:
    """Execute Monte Carlo simulation for the provided cash flows.

    Args:
        cash_flows: Cash flow specifications realised once per iteration.
        config: Simulation settings (iterations, metrics, percentiles, ...).
        scenario_context: Values available to ``scenario_field`` sourcing.
        metadata: Values available to ``metadata_key`` sourcing.
        rng: Random generator to use; when ``None`` a new one is created
            from ``config.seed``.

    Returns:
        The per-metric summaries and, if requested, the raw samples.

    Raises:
        ValueError: If ``iterations`` or ``compounds_per_year`` is not
            positive, or a percentile lies outside ``[0, 100]``.
        DistributionConfigError: If a distribution specification is invalid.

    The outcome and duration of the simulation itself are reported through
    ``observe_simulation``; configuration validation errors are raised
    before timing starts and are not reported.
    """
    _validate_config(config)

    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments while the simulation is running.
    start_time = time.perf_counter()
    try:
        result = _simulate(
            cash_flows,
            config,
            scenario_context=scenario_context,
            metadata=metadata,
            rng=rng,
        )
    except Exception:
        # Record failed simulation, then let the original error propagate.
        observe_simulation(
            status="error",
            duration_seconds=time.perf_counter() - start_time,
        )
        raise

    # Record successful simulation. This sits outside the try block so a
    # failure of the metrics call is not additionally reported as an
    # "error" simulation.
    observe_simulation(
        status="success",
        duration_seconds=time.perf_counter() - start_time,
    )
    return result


def _validate_config(config: SimulationConfig) -> None:
    """Reject configurations that cannot produce a meaningful simulation."""
    if config.iterations <= 0:
        raise ValueError("iterations must be greater than zero")
    if config.compounds_per_year <= 0:
        raise ValueError("compounds_per_year must be greater than zero")
    for pct in config.percentiles:
        # The chained comparison also rejects NaN, which would otherwise
        # only fail inside np.percentile after every iteration has run.
        if not 0.0 <= pct <= 100.0:
            raise ValueError("percentiles must be within [0, 100]")


def _simulate(
    cash_flows: Sequence[CashFlowSpec],
    config: SimulationConfig,
    *,
    scenario_context: Mapping[str, Any] | None,
    metadata: Mapping[str, Any] | None,
    rng: Generator | None,
) -> SimulationResult:
    """Run every iteration and aggregate the requested metrics."""
    generator = rng if rng is not None else default_rng(config.seed)
    metric_arrays: Dict[SimulationMetric, np.ndarray] = {
        metric: np.empty(config.iterations, dtype=float)
        for metric in config.metrics
    }

    # The set of requested metrics does not change between iterations.
    want_npv = SimulationMetric.NPV in metric_arrays
    want_irr = SimulationMetric.IRR in metric_arrays
    want_payback = SimulationMetric.PAYBACK in metric_arrays

    for idx in range(config.iterations):
        iteration_flows = [
            _realise_cash_flow(
                spec,
                generator,
                scenario_context=scenario_context,
                metadata=metadata,
            )
            for spec in cash_flows
        ]

        if want_npv:
            metric_arrays[SimulationMetric.NPV][idx] = net_present_value(
                config.discount_rate,
                iteration_flows,
                residual_value=config.residual_value,
                residual_periods=config.residual_periods,
                compounds_per_year=config.compounds_per_year,
            )

        if want_irr:
            # A run without a usable IRR is stored as NaN and later counted
            # as a failed run instead of aborting the simulation.
            try:
                metric_arrays[SimulationMetric.IRR][idx] = internal_rate_of_return(
                    iteration_flows,
                    compounds_per_year=config.compounds_per_year,
                )
            except (ValueError, ConvergenceError):
                metric_arrays[SimulationMetric.IRR][idx] = np.nan

        if want_payback:
            # Same NaN convention as for IRR.
            try:
                metric_arrays[SimulationMetric.PAYBACK][idx] = payback_period(
                    iteration_flows,
                    compounds_per_year=config.compounds_per_year,
                )
            except (ValueError, PaybackNotReachedError):
                metric_arrays[SimulationMetric.PAYBACK][idx] = np.nan

    summaries = {
        metric: _summarise(values, config.percentiles)
        for metric, values in metric_arrays.items()
    }
    samples = metric_arrays if config.return_samples else None
    return SimulationResult(
        iterations=config.iterations,
        summaries=summaries,
        samples=samples,
    )


def _realise_cash_flow(
    spec: CashFlowSpec,
    generator: Generator,
    *,
    scenario_context: Mapping[str, Any] | None,
    metadata: Mapping[str, Any] | None,
) -> CashFlow:
    """Return one realisation of ``spec`` for a single iteration.

    A spec without a distribution yields its baseline cash flow unchanged;
    otherwise a new cash flow with a sampled amount and the baseline's
    ``period_index`` and ``date`` is returned.
    """
    if spec.distribution is None:
        return spec.cash_flow

    distribution = spec.distribution
    base_amount = spec.cash_flow.amount
    params = _resolve_parameters(
        distribution,
        base_amount,
        scenario_context=scenario_context,
        metadata=metadata,
    )
    sample = _sample_distribution(
        distribution.type,
        params,
        generator,
    )
    return CashFlow(
        amount=float(sample),
        period_index=spec.cash_flow.period_index,
        date=spec.cash_flow.date,
    )


def _resolve_parameters(
    distribution: DistributionSpec,
    base_amount: float,
    *,
    scenario_context: Mapping[str, Any] | None,
    metadata: Mapping[str, Any] | None,
) -> Dict[str, Any]:
    """Return a copy of the distribution parameters with ``mean`` filled in.

    An explicit ``mean`` in the parameters always wins. Otherwise the
    default comes from the scenario context, the metadata or the baseline
    amount, depending on ``distribution.source``.

    Raises:
        DistributionConfigError: If a dynamic source has no ``source_key``
            or the key is missing from the corresponding mapping.
    """
    params = dict(distribution.parameters)

    if distribution.source == DistributionSource.SCENARIO_FIELD:
        if distribution.source_key is None:
            raise DistributionConfigError(
                "source_key is required for scenario_field sourcing")
        if not scenario_context or distribution.source_key not in scenario_context:
            raise DistributionConfigError(
                f"scenario field '{distribution.source_key}' not found for distribution"
            )
        params.setdefault(
            "mean", float(scenario_context[distribution.source_key]))
    elif distribution.source == DistributionSource.METADATA_KEY:
        if distribution.source_key is None:
            raise DistributionConfigError(
                "source_key is required for metadata_key sourcing")
        if not metadata or distribution.source_key not in metadata:
            raise DistributionConfigError(
                f"metadata key '{distribution.source_key}' not found for distribution"
            )
        params.setdefault("mean", float(metadata[distribution.source_key]))
    else:
        params.setdefault("mean", float(base_amount))

    return params


def _sample_distribution(
    distribution_type: DistributionType,
    params: Mapping[str, Any],
    generator: Generator,
) -> float:
    """Draw one sample from the distribution family named by the type.

    Raises:
        DistributionConfigError: If the type is not supported or the
            parameters are invalid for the chosen family.
    """
    if distribution_type is DistributionType.NORMAL:
        return _sample_normal(params, generator)
    if distribution_type is DistributionType.LOGNORMAL:
        return _sample_lognormal(params, generator)
    if distribution_type is DistributionType.TRIANGULAR:
        return _sample_triangular(params, generator)
    if distribution_type is DistributionType.DISCRETE:
        return _sample_discrete(params, generator)
    raise DistributionConfigError(
        f"Unsupported distribution type: {distribution_type}")


def _sample_normal(params: Mapping[str, Any], generator: Generator) -> float:
    """Sample a normal distribution given ``std_dev`` and optional ``mean``.

    ``mean`` defaults to ``0.0``; a zero ``std_dev`` returns the mean
    without consuming random numbers.
    """
    if "std_dev" not in params:
        raise DistributionConfigError("normal distribution requires 'std_dev'")
    std_dev = float(params["std_dev"])
    if std_dev < 0:
        raise DistributionConfigError("std_dev must be non-negative")
    mean = float(params.get("mean", 0.0))
    if std_dev == 0:
        return mean
    return float(generator.normal(loc=mean, scale=std_dev))


def _sample_lognormal(params: Mapping[str, Any], generator: Generator) -> float:
    """Sample a lognormal distribution given ``sigma`` and ``mean``.

    ``mean`` is passed to the generator as the mean of the underlying
    normal distribution (mu in log space).
    """
    # NOTE(review): _resolve_parameters defaults 'mean' to the baseline
    # amount (or a sourced value), which is then used as the log-space mu
    # here; confirm that callers intend this.
    if "sigma" not in params:
        raise DistributionConfigError(
            "lognormal distribution requires 'sigma'")
    sigma = float(params["sigma"])
    if sigma < 0:
        raise DistributionConfigError("sigma must be non-negative")
    if "mean" not in params:
        raise DistributionConfigError(
            "lognormal distribution requires 'mean' (mu in log space)")
    mean = float(params["mean"])
    return float(generator.lognormal(mean=mean, sigma=sigma))


def _sample_triangular(params: Mapping[str, Any], generator: Generator) -> float:
    """Sample a triangular distribution given ``min``, ``mode`` and ``max``.

    A degenerate range (``min == max``) returns the mode directly.
    """
    required = {"min", "mode", "max"}
    if not required.issubset(params):
        missing = ", ".join(sorted(required - params.keys()))
        raise DistributionConfigError(
            f"triangular distribution missing parameters: {missing}")
    left = float(params["min"])
    mode = float(params["mode"])
    right = float(params["max"])
    if not (left <= mode <= right):
        raise DistributionConfigError(
            "triangular distribution requires min <= mode <= max")
    if left == right:
        return mode
    return float(generator.triangular(left=left, mode=mode, right=right))


def _sample_discrete(params: Mapping[str, Any], generator: Generator) -> float:
    """Sample one of ``values`` according to the matching ``probabilities``.

    Raises:
        DistributionConfigError: If either parameter is not a sequence, the
            lengths differ or are zero, a probability is negative, or the
            probabilities do not sum to 1.0.
    """
    values = params.get("values")
    probabilities = params.get("probabilities")
    # str and bytes are Sequences too, but never a valid list of numbers;
    # reject them here instead of failing later inside NumPy.
    for candidate in (values, probabilities):
        if not isinstance(candidate, Sequence) or isinstance(candidate, (str, bytes)):
            raise DistributionConfigError(
                "discrete distribution requires 'values' and 'probabilities' sequences")
    if len(values) != len(probabilities) or not values:
        raise DistributionConfigError(
            "values and probabilities must be non-empty and of equal length")
    probs = np.array(probabilities, dtype=float)
    if np.any(probs < 0):
        raise DistributionConfigError("probabilities must be non-negative")
    total = probs.sum()
    if not np.isclose(total, 1.0):
        raise DistributionConfigError("probabilities must sum to 1.0")
    # Remove the rounding error that np.isclose tolerated above.
    probs = probs / total
    choices = np.array(values, dtype=float)
    return float(generator.choice(choices, p=probs))


def _summarise(values: np.ndarray, percentiles: Sequence[float]) -> MetricSummary:
    """Aggregate the samples of one metric, ignoring NaN entries.

    NaN entries are counted as failed runs. When no valid sample remains,
    every statistic is NaN and ``sample_size`` is zero.
    """
    clean = values[~np.isnan(values)]
    sample_size = clean.size
    failed_runs = values.size - sample_size

    if sample_size == 0:
        percentile_map: Dict[float, float] = {
            pct: float("nan") for pct in percentiles}
        return MetricSummary(
            mean=float("nan"),
            std_dev=float("nan"),
            minimum=float("nan"),
            maximum=float("nan"),
            percentiles=percentile_map,
            sample_size=0,
            failed_runs=failed_runs,
        )

    percentile_map = {
        pct: float(np.percentile(clean, pct)) for pct in percentiles
    }
    return MetricSummary(
        mean=float(np.mean(clean)),
        # The sample standard deviation is undefined for a single value.
        std_dev=float(np.std(clean, ddof=1)) if sample_size > 1 else 0.0,
        minimum=float(np.min(clean)),
        maximum=float(np.max(clean)),
        percentiles=percentile_map,
        sample_size=sample_size,
        failed_runs=failed_runs,
    )