Files
calminer/services/simulation.py
zwitschi 795a9f99f4 feat: Enhance currency handling and validation across scenarios
- Updated form template to prefill currency input with default value and added help text for clarity.
- Modified integration tests to assert more descriptive error messages for invalid currency codes.
- Introduced new tests for currency normalization and validation in various scenarios, including imports and exports.
- Added comprehensive tests for pricing calculations, ensuring defaults are respected and overrides function correctly.
- Implemented unit tests for pricing settings repository, ensuring CRUD operations and default settings are handled properly.
- Enhanced scenario pricing evaluation tests to validate currency handling and metadata defaults.
- Added simulation tests to ensure Monte Carlo runs are accurate and handle various distribution scenarios.
2025-11-11 18:29:59 +01:00

353 lines
11 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Iterable, Mapping, Sequence
import numpy as np
from numpy.random import Generator, default_rng
from .financial import (
CashFlow,
ConvergenceError,
PaybackNotReachedError,
internal_rate_of_return,
net_present_value,
payback_period,
)
class DistributionConfigError(ValueError):
    """Error signalling that a distribution specification is invalid.

    Derives from ``ValueError``, so handlers written for ``ValueError``
    also catch it.
    """
class SimulationMetric(Enum):
    """Financial metrics that a Monte Carlo run can summarise."""

    NPV = "npv"  # net present value
    IRR = "irr"  # internal rate of return
    PAYBACK = "payback"  # payback period
class DistributionType(Enum):
    """Probability distribution families a cash flow can be drawn from."""

    NORMAL = "normal"
    LOGNORMAL = "lognormal"
    TRIANGULAR = "triangular"
    DISCRETE = "discrete"
class DistributionSource(Enum):
    """Places a distribution's default ``mean`` parameter can come from."""

    STATIC = "static"  # fall back to the baseline cash flow amount
    SCENARIO_FIELD = "scenario_field"  # look the value up in the scenario context
    METADATA_KEY = "metadata_key"  # look the value up in the metadata mapping
@dataclass(frozen=True, slots=True)
class DistributionSpec:
    """Immutable description of how one cash flow amount is randomised."""

    # Distribution family to draw from.
    type: DistributionType
    # Family-specific settings, e.g. "std_dev", "sigma", "min"/"mode"/"max"
    # or "values"/"probabilities"; an explicit "mean" overrides any sourced one.
    parameters: Mapping[str, Any]
    # Where the default "mean" is taken from when `parameters` has none.
    source: DistributionSource = DistributionSource.STATIC
    # Lookup key; required for the scenario-field and metadata-key sources.
    source_key: str | None = None
@dataclass(frozen=True, slots=True)
class CashFlowSpec:
"""Pairs a baseline cash flow with an optional distribution."""
cash_flow: CashFlow
distribution: DistributionSpec | None = None
@dataclass(frozen=True, slots=True)
class SimulationConfig:
    """Immutable settings that steer a Monte Carlo simulation run."""

    # Number of simulated runs; run_monte_carlo rejects values <= 0.
    iterations: int
    # Discount rate handed to the net present value calculation.
    discount_rate: float
    # Seed for the default random generator (ignored when a generator is
    # passed to run_monte_carlo explicitly).
    seed: int | None = None
    # Metrics to compute for every run.
    metrics: Sequence[SimulationMetric] = (
        SimulationMetric.NPV,
        SimulationMetric.IRR,
        SimulationMetric.PAYBACK,
    )
    # Percentiles to report per metric; each must lie within [0, 100].
    percentiles: Sequence[float] = (5.0, 50.0, 95.0)
    # Forwarded to the metric calculations; run_monte_carlo rejects values <= 0.
    compounds_per_year: int = 1
    # When True the raw per-run metric arrays are included in the result.
    return_samples: bool = False
    # Both residual settings are forwarded to the net present value calculation.
    residual_value: float | None = None
    residual_periods: float | None = None
@dataclass(frozen=True, slots=True)
class MetricSummary:
"""Aggregated statistics for a simulated metric."""
mean: float
std_dev: float
minimum: float
maximum: float
percentiles: Mapping[float, float]
sample_size: int
failed_runs: int
@dataclass(frozen=True, slots=True)
class SimulationResult:
"""Monte Carlo output including per-metric summaries."""
iterations: int
summaries: Mapping[SimulationMetric, MetricSummary]
samples: Mapping[SimulationMetric, np.ndarray] | None = None
def run_monte_carlo(
    cash_flows: Sequence[CashFlowSpec],
    config: SimulationConfig,
    *,
    scenario_context: Mapping[str, Any] | None = None,
    metadata: Mapping[str, Any] | None = None,
    rng: Generator | None = None,
) -> SimulationResult:
    """Run a Monte Carlo simulation over the given cash flow specifications.

    Every iteration realises each cash flow (sampling those that carry a
    distribution) and evaluates the metrics requested in ``config.metrics``.

    Args:
        cash_flows: Specifications realised once per iteration, in order.
        config: Iteration count, metrics, percentiles and financial settings.
        scenario_context: Values available to scenario-field sourced
            distributions.
        metadata: Values available to metadata-key sourced distributions.
        rng: Random generator to draw from; when omitted a new one is created
            from ``config.seed``.

    Returns:
        The per-metric summaries, plus the raw per-run arrays when
        ``config.return_samples`` is set.

    Raises:
        ValueError: If ``iterations`` or ``compounds_per_year`` is not
            positive, or a percentile lies outside [0, 100].
    """
    if config.iterations <= 0:
        raise ValueError("iterations must be greater than zero")
    if config.compounds_per_year <= 0:
        raise ValueError("compounds_per_year must be greater than zero")
    if any(pct < 0.0 or pct > 100.0 for pct in config.percentiles):
        raise ValueError("percentiles must be within [0, 100]")

    generator = rng if rng else default_rng(config.seed)

    # One result slot per iteration for every requested metric.
    results: Dict[SimulationMetric, np.ndarray] = {}
    for metric in config.metrics:
        results[metric] = np.empty(config.iterations, dtype=float)

    # Resolve the target arrays once; None means the metric was not requested.
    npv_out = results.get(SimulationMetric.NPV)
    irr_out = results.get(SimulationMetric.IRR)
    payback_out = results.get(SimulationMetric.PAYBACK)

    for run in range(config.iterations):
        realised = []
        for spec in cash_flows:
            realised.append(
                _realise_cash_flow(
                    spec,
                    generator,
                    scenario_context=scenario_context,
                    metadata=metadata,
                )
            )

        # NPV errors are not intercepted: they abort the whole simulation.
        if npv_out is not None:
            npv_out[run] = net_present_value(
                config.discount_rate,
                realised,
                residual_value=config.residual_value,
                residual_periods=config.residual_periods,
                compounds_per_year=config.compounds_per_year,
            )

        # A run without a computable IRR / payback is stored as NaN so the
        # summary step can count it as failed instead of aborting.
        if irr_out is not None:
            try:
                irr_out[run] = internal_rate_of_return(
                    realised,
                    compounds_per_year=config.compounds_per_year,
                )
            except (ValueError, ConvergenceError):
                irr_out[run] = np.nan

        if payback_out is not None:
            try:
                payback_out[run] = payback_period(
                    realised,
                    compounds_per_year=config.compounds_per_year,
                )
            except (ValueError, PaybackNotReachedError):
                payback_out[run] = np.nan

    summaries = {
        metric: _summarise(observed, config.percentiles)
        for metric, observed in results.items()
    }
    return SimulationResult(
        iterations=config.iterations,
        summaries=summaries,
        samples=results if config.return_samples else None,
    )
def _realise_cash_flow(
    spec: CashFlowSpec,
    generator: Generator,
    *,
    scenario_context: Mapping[str, Any] | None,
    metadata: Mapping[str, Any] | None,
) -> CashFlow:
    """Produce the concrete cash flow for one simulation run.

    A spec without a distribution yields its baseline cash flow unchanged.
    Otherwise a new cash flow is built whose amount is a fresh sample and
    whose ``period_index`` and ``date`` are copied from the baseline.
    """
    dist = spec.distribution
    if dist is None:
        return spec.cash_flow

    baseline = spec.cash_flow
    resolved = _resolve_parameters(
        dist,
        baseline.amount,
        scenario_context=scenario_context,
        metadata=metadata,
    )
    drawn = _sample_distribution(dist.type, resolved, generator)
    return CashFlow(
        amount=float(drawn),
        period_index=baseline.period_index,
        date=baseline.date,
    )
def _resolve_parameters(
    distribution: DistributionSpec,
    base_amount: float,
    *,
    scenario_context: Mapping[str, Any] | None,
    metadata: Mapping[str, Any] | None,
) -> Dict[str, Any]:
    """Build the sampling parameters for a distribution.

    Returns a copy of ``distribution.parameters`` in which ``"mean"`` is
    filled in when absent: from the scenario context or the metadata mapping
    for the corresponding sources, otherwise from ``base_amount``. An explicit
    ``"mean"`` in the parameters always wins, but the sourced value is still
    looked up and validated.

    Raises:
        DistributionConfigError: If a dynamic source has no ``source_key``,
            the key is missing from its mapping (an empty or ``None`` mapping
            counts as missing), or the sourced value is not numeric.
    """
    params = dict(distribution.parameters)

    source = distribution.source
    if source == DistributionSource.SCENARIO_FIELD:
        option, label, lookup = "scenario_field", "scenario field", scenario_context
    elif source == DistributionSource.METADATA_KEY:
        option, label, lookup = "metadata_key", "metadata key", metadata
    else:
        # Static sourcing: the baseline amount is the default mean.
        params.setdefault("mean", float(base_amount))
        return params

    key = distribution.source_key
    if key is None:
        raise DistributionConfigError(
            f"source_key is required for {option} sourcing")
    if not lookup or key not in lookup:
        raise DistributionConfigError(
            f"{label} '{key}' not found for distribution")

    raw_value = lookup[key]
    try:
        sourced_mean = float(raw_value)
    except (TypeError, ValueError) as exc:
        # Optional scenario/metadata entries may be None or free text; report
        # that as a configuration problem naming the offending key instead of
        # leaking a context-free float() error.
        raise DistributionConfigError(
            f"{label} '{key}' must be numeric for distribution, got {raw_value!r}"
        ) from exc

    params.setdefault("mean", sourced_mean)
    return params
def _sample_distribution(
    distribution_type: DistributionType,
    params: Mapping[str, Any],
    generator: Generator,
) -> float:
    """Draw one sample by delegating to the sampler of the given family.

    Raises:
        DistributionConfigError: If the distribution type has no sampler.
    """
    samplers = (
        (DistributionType.NORMAL, _sample_normal),
        (DistributionType.LOGNORMAL, _sample_lognormal),
        (DistributionType.TRIANGULAR, _sample_triangular),
        (DistributionType.DISCRETE, _sample_discrete),
    )
    # Matching is by identity against the enum members.
    for family, sampler in samplers:
        if distribution_type is family:
            return sampler(params, generator)
    raise DistributionConfigError(
        f"Unsupported distribution type: {distribution_type}")
def _sample_normal(params: Mapping[str, Any], generator: Generator) -> float:
    """Draw one value from a normal distribution.

    Reads ``"std_dev"`` (required, non-negative) and ``"mean"`` (defaults to
    0.0). A zero standard deviation returns the mean directly without
    consuming a random number.

    Raises:
        DistributionConfigError: If ``"std_dev"`` is missing or negative.
    """
    if "std_dev" not in params:
        raise DistributionConfigError("normal distribution requires 'std_dev'")
    spread = float(params["std_dev"])
    if spread < 0:
        raise DistributionConfigError("std_dev must be non-negative")

    centre = float(params.get("mean", 0.0))
    return centre if spread == 0 else float(generator.normal(loc=centre, scale=spread))
def _sample_lognormal(params: Mapping[str, Any], generator: Generator) -> float:
    """Draw one value from a lognormal distribution.

    Reads ``"sigma"`` (required, non-negative) and ``"mean"`` (required),
    both interpreted in log space.

    Raises:
        DistributionConfigError: If ``"sigma"`` or ``"mean"`` is missing, or
            ``"sigma"`` is negative.
    """
    if "sigma" not in params:
        raise DistributionConfigError(
            "lognormal distribution requires 'sigma'")
    log_sigma = float(params["sigma"])
    if log_sigma < 0:
        raise DistributionConfigError("sigma must be non-negative")

    if "mean" not in params:
        raise DistributionConfigError(
            "lognormal distribution requires 'mean' (mu in log space)")
    log_mu = float(params["mean"])

    draw = generator.lognormal(mean=log_mu, sigma=log_sigma)
    return float(draw)
def _sample_triangular(params: Mapping[str, Any], generator: Generator) -> float:
    """Draw one value from a triangular distribution.

    Reads ``"min"``, ``"mode"`` and ``"max"`` (all required, with
    ``min <= mode <= max``). When ``min`` equals ``max`` the mode is returned
    directly without consuming a random number.

    Raises:
        DistributionConfigError: If a parameter is missing or the ordering
            constraint is violated.
    """
    names = ("min", "mode", "max")
    absent = sorted(name for name in names if name not in params)
    if absent:
        missing = ", ".join(absent)
        raise DistributionConfigError(
            f"triangular distribution missing parameters: {missing}")

    lower, peak, upper = (float(params[name]) for name in names)
    if not (lower <= peak <= upper):
        raise DistributionConfigError(
            "triangular distribution requires min <= mode <= max")

    # Degenerate interval: nothing to sample.
    if lower == upper:
        return peak
    return float(generator.triangular(left=lower, mode=peak, right=upper))
def _is_vector_like(candidate: Any) -> bool:
    """Return True for sequences and for one-dimensional NumPy arrays."""
    if isinstance(candidate, np.ndarray):
        return candidate.ndim == 1
    return isinstance(candidate, Sequence)


def _sample_discrete(params: Mapping[str, Any], generator: Generator) -> float:
    """Draw one value from a discrete distribution.

    Reads ``"values"`` and ``"probabilities"``: two non-empty, equally long
    sequences or one-dimensional NumPy arrays. Probabilities must be
    non-negative and sum to 1.0 within floating-point tolerance.

    Raises:
        DistributionConfigError: If either entry is missing or not
            sequence-like, the lengths differ or are zero, a probability is
            negative, or the probabilities do not sum to 1.0.
    """
    values = params.get("values")
    probabilities = params.get("probabilities")
    # np.ndarray is not registered as a Sequence, so arrays are accepted
    # explicitly alongside lists and tuples.
    if not (_is_vector_like(values) and _is_vector_like(probabilities)):
        raise DistributionConfigError(
            "discrete distribution requires 'values' and 'probabilities' sequences")
    # len() instead of truthiness: a NumPy array has no unambiguous truth value.
    if len(values) == 0 or len(values) != len(probabilities):
        raise DistributionConfigError(
            "values and probabilities must be non-empty and of equal length")

    probs = np.array(probabilities, dtype=float)
    if np.any(probs < 0):
        raise DistributionConfigError("probabilities must be non-negative")
    total = probs.sum()
    if not np.isclose(total, 1.0):
        raise DistributionConfigError("probabilities must sum to 1.0")
    # Remove the residual rounding error tolerated by isclose, since the
    # generator applies a stricter sum check of its own.
    probs = probs / total

    choices = np.array(values, dtype=float)
    return float(generator.choice(choices, p=probs))
def _summarise(values: np.ndarray, percentiles: Sequence[float]) -> MetricSummary:
    """Condense the per-run values of one metric into summary statistics.

    NaN entries are treated as failed runs: they are counted in
    ``failed_runs`` and excluded from every statistic. With no valid entry
    left, all statistics (including each percentile) are NaN. The standard
    deviation is the sample standard deviation (``ddof=1``), or 0.0 when only
    one valid entry exists.
    """
    valid = values[~np.isnan(values)]
    sample_size = valid.size
    failed_runs = values.size - sample_size

    if sample_size == 0:
        pct_map: Dict[float, float] = {pct: float("nan") for pct in percentiles}
        mean, spread, low, high = (float("nan") for _ in range(4))
    else:
        pct_map = {pct: float(np.percentile(valid, pct)) for pct in percentiles}
        mean = float(np.mean(valid))
        # ddof=1 is undefined for a single observation, so report no spread.
        spread = float(np.std(valid, ddof=1)) if sample_size > 1 else 0.0
        low = float(np.min(valid))
        high = float(np.max(valid))

    return MetricSummary(
        mean=mean,
        std_dev=spread,
        minimum=low,
        maximum=high,
        percentiles=pct_map,
        sample_size=sample_size,
        failed_runs=failed_runs,
    )