Files
calminer/services/financial.py
zwitschi 795a9f99f4 feat: Enhance currency handling and validation across scenarios
- Updated form template to prefill currency input with default value and added help text for clarity.
- Modified integration tests to assert more descriptive error messages for invalid currency codes.
- Introduced new tests for currency normalization and validation in various scenarios, including imports and exports.
- Added comprehensive tests for pricing calculations, ensuring defaults are respected and overrides function correctly.
- Implemented unit tests for pricing settings repository, ensuring CRUD operations and default settings are handled properly.
- Enhanced scenario pricing evaluation tests to validate currency handling and metadata defaults.
- Added simulation tests to ensure Monte Carlo runs are accurate and handle various distribution scenarios.
2025-11-11 18:29:59 +01:00

249 lines
8.0 KiB
Python

"""Financial calculation helpers for project evaluation metrics."""

from __future__ import annotations
from dataclasses import dataclass
from datetime import date, datetime
from math import isclose, isfinite
from typing import Iterable, List, Sequence, Tuple
# Alias used in this module's annotations for amounts and rates.
Number = float
@dataclass(frozen=True, slots=True)
class CashFlow:
"""Represents a dated cash flow in scenario currency."""
amount: Number
period_index: int | None = None
date: date | datetime | None = None
class ConvergenceError(RuntimeError):
    """Signals that an iterative solver did not converge on a solution."""
class PaybackNotReachedError(RuntimeError):
    """Signals that the running cash-flow total never gets back to zero or above."""
def _coerce_date(value: date | datetime) -> date:
    """Reduce ``value`` to a plain ``date``, dropping any time-of-day part."""
    return value.date() if isinstance(value, datetime) else value
def normalize_cash_flows(
    cash_flows: Iterable[CashFlow],
    *,
    compounds_per_year: int = 1,
) -> List[Tuple[Number, float]]:
    """Convert cash flows into ``(amount, periods)`` pairs, in input order.

    The period of each flow is resolved with the following priority:

    1. an explicit ``period_index`` is used as-is;
    2. otherwise, a ``date`` is turned into a (possibly fractional) number of
       periods elapsed since the first dated flow in the input, using a
       365-day year divided into ``compounds_per_year`` periods;
    3. otherwise, the flow's position in the input sequence is used.

    An empty input yields an empty list. ``ValueError`` is raised for a
    non-positive ``compounds_per_year`` when the input is not empty.
    """
    entries: Sequence[CashFlow] = list(cash_flows)
    if len(entries) == 0:
        return []

    if compounds_per_year <= 0:
        raise ValueError("compounds_per_year must be a positive integer")

    # The first flow carrying a date anchors the calendar timeline.
    anchor: date | None = next(
        (_coerce_date(entry.date) for entry in entries if entry.date is not None),
        None,
    )
    days_per_period = 365.0 / float(compounds_per_year)

    def _position(order: int, entry: CashFlow) -> float:
        # Resolve the period for one flow following the documented priority.
        if entry.period_index is not None:
            return float(entry.period_index)
        if entry.date is None or anchor is None:
            return float(order)
        elapsed_days = (_coerce_date(entry.date) - anchor).days
        return elapsed_days / days_per_period

    return [
        (float(entry.amount), _position(order, entry))
        for order, entry in enumerate(entries)
    ]
def discount_factor(rate: Number, periods: float, *, compounds_per_year: int = 1) -> float:
    """Return the factor used to discount a value ``periods`` steps in the future.

    The annual ``rate`` is divided by ``compounds_per_year`` to obtain the
    periodic rate, and the factor is ``(1 + periodic_rate) ** -periods``.

    Raises:
        ValueError: If ``compounds_per_year`` is not positive, or if the
            periodic rate is -100% or lower. In that range the base of the
            power is zero or negative, which would otherwise produce a
            division by zero or a complex number for fractional ``periods``.
    """
    if compounds_per_year <= 0:
        raise ValueError("compounds_per_year must be a positive integer")
    growth = 1.0 + rate / float(compounds_per_year)
    if growth <= 0.0:
        raise ValueError("rate must be greater than -100% per compounding period")
    return growth ** (-periods)
def net_present_value(
    rate: Number,
    cash_flows: Iterable[CashFlow],
    *,
    residual_value: Number | None = None,
    residual_periods: float | None = None,
    compounds_per_year: int = 1,
) -> float:
    """Calculate Net Present Value for ``cash_flows``.

    ``rate`` is a decimal (``0.1`` for 10%). Each cash flow is normalised to a
    period offset and discounted using the given compounding frequency.

    When ``residual_value`` is provided it is discounted at
    ``residual_periods`` periods. If ``residual_periods`` is omitted, the
    residual is placed one period after the latest cash flow on the timeline
    (regardless of input order), or at period ``1.0`` when there are no cash
    flows.

    Returns ``0.0`` when there are no cash flows and no residual value.
    """
    normalised = normalize_cash_flows(
        cash_flows,
        compounds_per_year=compounds_per_year,
    )

    if not normalised and residual_value is None:
        return 0.0

    total = 0.0
    for amount, periods in normalised:
        factor = discount_factor(
            rate, periods, compounds_per_year=compounds_per_year)
        total += amount * factor

    if residual_value is not None:
        if residual_periods is None:
            # Use the latest period rather than the last item in input order,
            # so unsorted input cannot place the residual before later flows.
            last_period = max(
                (periods for _, periods in normalised), default=0.0)
            residual_periods = last_period + 1.0
        factor = discount_factor(
            rate, residual_periods, compounds_per_year=compounds_per_year)
        total += float(residual_value) * factor

    return total
def internal_rate_of_return(
    cash_flows: Iterable[CashFlow],
    *,
    guess: Number = 0.1,
    max_iterations: int = 100,
    tolerance: float = 1e-6,
    compounds_per_year: int = 1,
) -> float:
    """Return the internal rate of return for ``cash_flows``.

    The solver first runs Newton-Raphson from ``guess`` for up to
    ``max_iterations`` steps, accepting a rate when the NPV is within
    ``tolerance`` of zero or when successive rates differ by less than
    ``tolerance``. If the derivative is zero or non-finite, or the iteration
    budget is exhausted, it falls back to bisection on a bracket that starts
    at ``[-0.99 * compounds_per_year, 10.0]`` and whose upper bound is doubled
    up to 12 times until the NPV changes sign.

    Raises:
        ValueError: If there are no cash flows, or they do not contain both a
            negative and a positive amount.
        ConvergenceError: If no sign change can be bracketed, or bisection
            runs out of iterations before the bracket narrows to
            ``tolerance``.
    """
    flows = normalize_cash_flows(
        cash_flows,
        compounds_per_year=compounds_per_year,
    )
    if not flows:
        raise ValueError("cash_flows must contain at least one item")

    amounts = [amount for amount, _ in flows]
    if not any(amount < 0 for amount in amounts) or not any(amount > 0 for amount in amounts):
        raise ValueError("cash_flows must include both negative and positive values")

    def _npv_with_flows(rate: float) -> float:
        # NPV of the normalised flows at ``rate``; infinite at or below a
        # periodic rate of -100%, where the discount base is not positive.
        periodic_rate = rate / float(compounds_per_year)
        if periodic_rate <= -1.0:
            return float("inf")
        total = 0.0
        for amount, periods in flows:
            factor = (1.0 + periodic_rate) ** (-periods)
            total += amount * factor
        return total

    def _derivative(rate: float) -> float:
        # Derivative of the NPV with respect to the annual ``rate``.
        periodic_rate = rate / float(compounds_per_year)
        if periodic_rate <= -1.0:
            return float("inf")
        derivative = 0.0
        for amount, periods in flows:
            factor = (1.0 + periodic_rate) ** (-periods - 1.0)
            derivative += -amount * periods * factor / float(compounds_per_year)
        return derivative

    rate = float(guess)
    for _ in range(max_iterations):
        value = _npv_with_flows(rate)
        if isclose(value, 0.0, abs_tol=tolerance):
            return rate
        derivative = _derivative(rate)
        if derivative == 0.0 or not isfinite(derivative):
            # Newton step is undefined here; switch to bisection.
            break
        next_rate = rate - value / derivative
        if abs(next_rate - rate) < tolerance:
            return next_rate
        rate = next_rate

    # Fallback to bracketed bisection between sensible bounds.
    lower_bound = -0.99 * float(compounds_per_year)
    upper_bound = 10.0
    lower_value = _npv_with_flows(lower_bound)
    upper_value = _npv_with_flows(upper_bound)

    # Widen the bracket upwards until the NPV changes sign (bounded effort).
    attempts = 0
    while lower_value * upper_value > 0 and attempts < 12:
        upper_bound *= 2.0
        upper_value = _npv_with_flows(upper_bound)
        attempts += 1

    if lower_value * upper_value > 0:
        raise ConvergenceError("IRR could not be bracketed within default bounds")

    for _ in range(max_iterations * 2):
        midpoint = (lower_bound + upper_bound) / 2.0
        mid_value = _npv_with_flows(midpoint)
        if isclose(mid_value, 0.0, abs_tol=tolerance):
            return midpoint
        if lower_value * mid_value < 0:
            upper_bound = midpoint
            upper_value = mid_value
        else:
            lower_bound = midpoint
            lower_value = mid_value

    # With large cash-flow magnitudes the NPV may never fall within the
    # absolute tolerance because of floating-point resolution, even though the
    # bracket has collapsed onto the root. Accept the midpoint in that case,
    # provided the bracket still straddles a sign change.
    if lower_value * upper_value <= 0.0 and (upper_bound - lower_bound) <= tolerance:
        return (lower_bound + upper_bound) / 2.0

    raise ConvergenceError("IRR solver failed to converge")
def payback_period(
    cash_flows: Iterable[CashFlow],
    *,
    allow_fractional: bool = True,
    compounds_per_year: int = 1,
) -> float:
    """Return the period at which the running cash-flow total first reaches zero or more.

    Flows are normalised to period offsets and processed in ascending period
    order. With ``allow_fractional`` disabled (or when the crossing flow has a
    zero amount) the period of the crossing flow is returned. Otherwise the
    result is linearly interpolated between the preceding flow's period and
    the crossing flow's period, in proportion to how much of the crossing
    amount was needed to clear the deficit.

    Raises:
        ValueError: If ``cash_flows`` is empty.
        PaybackNotReachedError: If the running total stays negative throughout.
    """
    timeline = normalize_cash_flows(
        cash_flows,
        compounds_per_year=compounds_per_year,
    )
    if not timeline:
        raise ValueError("cash_flows must contain at least one item")

    timeline.sort(key=lambda entry: entry[1])

    running_total = 0.0
    anchor_period = timeline[0][1]
    for amount, period in timeline:
        balance_after = running_total + amount
        recovered = balance_after >= 0.0
        if not recovered:
            # Still in deficit: carry the balance forward and move the anchor.
            running_total = balance_after
            anchor_period = period
            continue

        if not allow_fractional or isclose(amount, 0.0):
            return period

        # Share of this flow needed to bring the running total up to zero.
        share = -running_total / amount
        return anchor_period + share * (period - anchor_period)

    raise PaybackNotReachedError("Cumulative cash flow never becomes non-negative")