feat: Enhance currency handling and validation across scenarios
- Updated form template to prefill currency input with default value and added help text for clarity. - Modified integration tests to assert more descriptive error messages for invalid currency codes. - Introduced new tests for currency normalization and validation in various scenarios, including imports and exports. - Added comprehensive tests for pricing calculations, ensuring defaults are respected and overrides function correctly. - Implemented unit tests for pricing settings repository, ensuring CRUD operations and default settings are handled properly. - Enhanced scenario pricing evaluation tests to validate currency handling and metadata defaults. - Added simulation tests to ensure Monte Carlo runs are accurate and handle various distribution scenarios.
This commit is contained in:
248
services/financial.py
Normal file
248
services/financial.py
Normal file
@@ -0,0 +1,248 @@
|
||||
from __future__ import annotations
|
||||
|
||||
"""Financial calculation helpers for project evaluation metrics."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import date, datetime
|
||||
from math import isclose, isfinite
|
||||
from typing import Iterable, List, Sequence, Tuple
|
||||
|
||||
Number = float
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class CashFlow:
|
||||
"""Represents a dated cash flow in scenario currency."""
|
||||
|
||||
amount: Number
|
||||
period_index: int | None = None
|
||||
date: date | datetime | None = None
|
||||
|
||||
|
||||
class ConvergenceError(RuntimeError):
|
||||
"""Raised when an iterative solver fails to converge."""
|
||||
|
||||
|
||||
class PaybackNotReachedError(RuntimeError):
|
||||
"""Raised when cumulative cash flows never reach a non-negative total."""
|
||||
|
||||
|
||||
def _coerce_date(value: date | datetime) -> date:
|
||||
if isinstance(value, datetime):
|
||||
return value.date()
|
||||
return value
|
||||
|
||||
|
||||
def normalize_cash_flows(
|
||||
cash_flows: Iterable[CashFlow],
|
||||
*,
|
||||
compounds_per_year: int = 1,
|
||||
) -> List[Tuple[Number, float]]:
|
||||
"""Normalise cash flows to ``(amount, periods)`` tuples.
|
||||
|
||||
When explicit ``period_index`` values are provided they take precedence. If
|
||||
only dates are supplied, the first dated cash flow anchors the timeline and
|
||||
subsequent cash flows convert their day offsets into fractional periods
|
||||
based on ``compounds_per_year``. When neither a period index nor a date is
|
||||
present, cash flows are treated as sequential periods in input order.
|
||||
"""
|
||||
|
||||
flows: Sequence[CashFlow] = list(cash_flows)
|
||||
if not flows:
|
||||
return []
|
||||
|
||||
if compounds_per_year <= 0:
|
||||
raise ValueError("compounds_per_year must be a positive integer")
|
||||
|
||||
base_date: date | None = None
|
||||
for flow in flows:
|
||||
if flow.date is not None:
|
||||
base_date = _coerce_date(flow.date)
|
||||
break
|
||||
|
||||
normalised: List[Tuple[Number, float]] = []
|
||||
for idx, flow in enumerate(flows):
|
||||
amount = float(flow.amount)
|
||||
if flow.period_index is not None:
|
||||
periods = float(flow.period_index)
|
||||
elif flow.date is not None and base_date is not None:
|
||||
current_date = _coerce_date(flow.date)
|
||||
delta_days = (current_date - base_date).days
|
||||
period_length_days = 365.0 / float(compounds_per_year)
|
||||
periods = delta_days / period_length_days
|
||||
else:
|
||||
periods = float(idx)
|
||||
normalised.append((amount, periods))
|
||||
|
||||
return normalised
|
||||
|
||||
|
||||
def discount_factor(rate: Number, periods: float, *, compounds_per_year: int = 1) -> float:
|
||||
"""Return the factor used to discount a value ``periods`` steps in the future."""
|
||||
|
||||
if compounds_per_year <= 0:
|
||||
raise ValueError("compounds_per_year must be a positive integer")
|
||||
|
||||
periodic_rate = rate / float(compounds_per_year)
|
||||
return (1.0 + periodic_rate) ** (-periods)
|
||||
|
||||
|
||||
def net_present_value(
|
||||
rate: Number,
|
||||
cash_flows: Iterable[CashFlow],
|
||||
*,
|
||||
residual_value: Number | None = None,
|
||||
residual_periods: float | None = None,
|
||||
compounds_per_year: int = 1,
|
||||
) -> float:
|
||||
"""Calculate Net Present Value for ``cash_flows``.
|
||||
|
||||
``rate`` is a decimal (``0.1`` for 10%). Cash flows are discounted using the
|
||||
given compounding frequency. When ``residual_value`` is provided it is
|
||||
discounted at ``residual_periods`` periods; by default the value occurs one
|
||||
period after the final cash flow.
|
||||
"""
|
||||
|
||||
normalised = normalize_cash_flows(
|
||||
cash_flows,
|
||||
compounds_per_year=compounds_per_year,
|
||||
)
|
||||
|
||||
if not normalised and residual_value is None:
|
||||
return 0.0
|
||||
|
||||
total = 0.0
|
||||
for amount, periods in normalised:
|
||||
factor = discount_factor(
|
||||
rate, periods, compounds_per_year=compounds_per_year)
|
||||
total += amount * factor
|
||||
|
||||
if residual_value is not None:
|
||||
if residual_periods is None:
|
||||
last_period = normalised[-1][1] if normalised else 0.0
|
||||
residual_periods = last_period + 1.0
|
||||
factor = discount_factor(
|
||||
rate, residual_periods, compounds_per_year=compounds_per_year)
|
||||
total += float(residual_value) * factor
|
||||
|
||||
return total
|
||||
|
||||
|
||||
def internal_rate_of_return(
|
||||
cash_flows: Iterable[CashFlow],
|
||||
*,
|
||||
guess: Number = 0.1,
|
||||
max_iterations: int = 100,
|
||||
tolerance: float = 1e-6,
|
||||
compounds_per_year: int = 1,
|
||||
) -> float:
|
||||
"""Return the internal rate of return for ``cash_flows``.
|
||||
|
||||
Uses Newton-Raphson iteration with a bracketed fallback when the derivative
|
||||
becomes unstable. Raises :class:`ConvergenceError` if no root is found.
|
||||
"""
|
||||
|
||||
flows = normalize_cash_flows(
|
||||
cash_flows,
|
||||
compounds_per_year=compounds_per_year,
|
||||
)
|
||||
if not flows:
|
||||
raise ValueError("cash_flows must contain at least one item")
|
||||
|
||||
amounts = [amount for amount, _ in flows]
|
||||
if not any(amount < 0 for amount in amounts) or not any(amount > 0 for amount in amounts):
|
||||
raise ValueError("cash_flows must include both negative and positive values")
|
||||
|
||||
def _npv_with_flows(rate: float) -> float:
|
||||
periodic_rate = rate / float(compounds_per_year)
|
||||
if periodic_rate <= -1.0:
|
||||
return float("inf")
|
||||
total = 0.0
|
||||
for amount, periods in flows:
|
||||
factor = (1.0 + periodic_rate) ** (-periods)
|
||||
total += amount * factor
|
||||
return total
|
||||
|
||||
def _derivative(rate: float) -> float:
|
||||
periodic_rate = rate / float(compounds_per_year)
|
||||
if periodic_rate <= -1.0:
|
||||
return float("inf")
|
||||
derivative = 0.0
|
||||
for amount, periods in flows:
|
||||
factor = (1.0 + periodic_rate) ** (-periods - 1.0)
|
||||
derivative += -amount * periods * factor / float(compounds_per_year)
|
||||
return derivative
|
||||
|
||||
rate = float(guess)
|
||||
for _ in range(max_iterations):
|
||||
value = _npv_with_flows(rate)
|
||||
if isclose(value, 0.0, abs_tol=tolerance):
|
||||
return rate
|
||||
derivative = _derivative(rate)
|
||||
if derivative == 0.0 or not isfinite(derivative):
|
||||
break
|
||||
next_rate = rate - value / derivative
|
||||
if abs(next_rate - rate) < tolerance:
|
||||
return next_rate
|
||||
rate = next_rate
|
||||
|
||||
# Fallback to bracketed bisection between sensible bounds.
|
||||
lower_bound = -0.99 * float(compounds_per_year)
|
||||
upper_bound = 10.0
|
||||
lower_value = _npv_with_flows(lower_bound)
|
||||
upper_value = _npv_with_flows(upper_bound)
|
||||
|
||||
attempts = 0
|
||||
while lower_value * upper_value > 0 and attempts < 12:
|
||||
upper_bound *= 2.0
|
||||
upper_value = _npv_with_flows(upper_bound)
|
||||
attempts += 1
|
||||
|
||||
if lower_value * upper_value > 0:
|
||||
raise ConvergenceError("IRR could not be bracketed within default bounds")
|
||||
|
||||
for _ in range(max_iterations * 2):
|
||||
midpoint = (lower_bound + upper_bound) / 2.0
|
||||
mid_value = _npv_with_flows(midpoint)
|
||||
if isclose(mid_value, 0.0, abs_tol=tolerance):
|
||||
return midpoint
|
||||
if lower_value * mid_value < 0:
|
||||
upper_bound = midpoint
|
||||
upper_value = mid_value
|
||||
else:
|
||||
lower_bound = midpoint
|
||||
lower_value = mid_value
|
||||
raise ConvergenceError("IRR solver failed to converge")
|
||||
|
||||
|
||||
def payback_period(
|
||||
cash_flows: Iterable[CashFlow],
|
||||
*,
|
||||
allow_fractional: bool = True,
|
||||
compounds_per_year: int = 1,
|
||||
) -> float:
|
||||
"""Return the period index where cumulative cash flow becomes non-negative."""
|
||||
|
||||
flows = normalize_cash_flows(
|
||||
cash_flows,
|
||||
compounds_per_year=compounds_per_year,
|
||||
)
|
||||
if not flows:
|
||||
raise ValueError("cash_flows must contain at least one item")
|
||||
|
||||
flows = sorted(flows, key=lambda item: item[1])
|
||||
cumulative = 0.0
|
||||
previous_period = flows[0][1]
|
||||
|
||||
for index, (amount, periods) in enumerate(flows):
|
||||
next_cumulative = cumulative + amount
|
||||
if next_cumulative >= 0.0:
|
||||
if not allow_fractional or isclose(amount, 0.0):
|
||||
return periods
|
||||
prev_period = previous_period if index > 0 else periods
|
||||
fraction = -cumulative / amount
|
||||
return prev_period + fraction * (periods - prev_period)
|
||||
cumulative = next_cumulative
|
||||
previous_period = periods
|
||||
|
||||
raise PaybackNotReachedError("Cumulative cash flow never becomes non-negative")
|
||||
Reference in New Issue
Block a user