253 lines
8.1 KiB
Python
253 lines
8.1 KiB
Python
"""Financial calculation helpers for project evaluation metrics."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import date, datetime
|
|
from math import isclose, isfinite
|
|
from typing import Iterable, List, Sequence, Tuple
|
|
|
|
# Alias used in this module's annotations for monetary amounts and rates.
Number = float
|
|
|
|
|
|
@dataclass(frozen=True, slots=True)
|
|
class CashFlow:
|
|
"""Represents a dated cash flow in scenario currency."""
|
|
|
|
amount: Number
|
|
period_index: int | None = None
|
|
date: date | datetime | None = None
|
|
|
|
|
|
class ConvergenceError(RuntimeError):
    """Signals that an iterative solver stopped without finding a solution."""
|
|
|
|
|
|
class PaybackNotReachedError(RuntimeError):
    """Signals that the running cash-flow total never gets back to zero or above."""
|
|
|
|
|
|
def _coerce_date(value: date | datetime) -> date:
    """Return ``value`` as a plain ``date``, dropping any time-of-day part."""
    # ``datetime`` is a subclass of ``date``, so it has to be detected
    # explicitly; plain dates are handed back untouched.
    return value.date() if isinstance(value, datetime) else value
|
|
|
|
|
|
def normalize_cash_flows(
    cash_flows: Iterable[CashFlow],
    *,
    compounds_per_year: int = 1,
) -> List[Tuple[Number, float]]:
    """Convert cash flows into ``(amount, periods)`` pairs, in input order.

    The period offset of each flow is resolved as follows:

    * an explicit ``period_index`` always wins;
    * otherwise, a dated flow is measured in days from the first dated flow
      (in input order) and expressed in periods of
      ``365 / compounds_per_year`` days;
    * otherwise, the flow's position in the input is used.

    An empty input yields an empty list before ``compounds_per_year`` is
    checked.

    Raises:
        ValueError: if the input is non-empty and ``compounds_per_year`` is
            not positive.
    """
    entries: Sequence[CashFlow] = list(cash_flows)
    if not entries:
        return []

    if compounds_per_year <= 0:
        raise ValueError("compounds_per_year must be a positive integer")

    # The first flow that carries a date anchors the timeline.
    anchor: date | None = next(
        (_coerce_date(entry.date) for entry in entries if entry.date is not None),
        None,
    )
    days_per_period = 365.0 / float(compounds_per_year)

    def _resolve_periods(position: int, entry: CashFlow) -> float:
        """Pick the period offset for one flow using the precedence above."""
        if entry.period_index is not None:
            return float(entry.period_index)
        if entry.date is None or anchor is None:
            return float(position)
        elapsed = _coerce_date(entry.date) - anchor
        return elapsed.days / days_per_period

    return [
        (float(entry.amount), _resolve_periods(position, entry))
        for position, entry in enumerate(entries)
    ]
|
|
|
|
|
|
def discount_factor(rate: Number, periods: float, *, compounds_per_year: int = 1) -> float:
    """Return the factor that discounts a value ``periods`` steps in the future.

    The annual ``rate`` (a decimal, ``0.1`` for 10%) is split evenly across
    ``compounds_per_year`` compounding periods, and the factor is
    ``(1 + rate / compounds_per_year) ** -periods``.

    Raises:
        ValueError: if ``compounds_per_year`` is not positive, or if the
            per-period rate is -100% or lower.
    """
    if compounds_per_year <= 0:
        raise ValueError("compounds_per_year must be a positive integer")

    growth_base = 1.0 + rate / float(compounds_per_year)
    if growth_base <= 0.0:
        # A non-positive base has no meaningful discount factor: a zero base
        # raises ZeroDivisionError and a negative base with a fractional
        # exponent silently produces a complex number.
        raise ValueError("rate must be greater than -100% per compounding period")
    return growth_base ** (-periods)
|
|
|
|
|
|
def net_present_value(
    rate: Number,
    cash_flows: Iterable[CashFlow],
    *,
    residual_value: Number | None = None,
    residual_periods: float | None = None,
    compounds_per_year: int = 1,
) -> float:
    """Calculate Net Present Value for ``cash_flows``.

    ``rate`` is a decimal (``0.1`` for 10%). Each flow is normalised to a
    period offset and discounted with the given compounding frequency.

    When ``residual_value`` is provided it is discounted at
    ``residual_periods`` periods. If ``residual_periods`` is omitted, the
    residual is placed one period after the latest cash flow (period ``1.0``
    when there are no cash flows).

    Returns ``0.0`` when there are no cash flows and no residual value.
    """
    normalised = normalize_cash_flows(
        cash_flows,
        compounds_per_year=compounds_per_year,
    )

    if not normalised and residual_value is None:
        return 0.0

    total = 0.0
    for amount, periods in normalised:
        total += amount * discount_factor(
            rate, periods, compounds_per_year=compounds_per_year)

    if residual_value is not None:
        if residual_periods is None:
            # Use the latest period rather than the last item in input order,
            # so flows supplied out of order still place the residual after
            # the final cash flow.
            last_period = max((periods for _, periods in normalised), default=0.0)
            residual_periods = last_period + 1.0
        total += float(residual_value) * discount_factor(
            rate, residual_periods, compounds_per_year=compounds_per_year)

    return total
|
|
|
|
|
|
def internal_rate_of_return(
    cash_flows: Iterable[CashFlow],
    *,
    guess: Number = 0.1,
    max_iterations: int = 100,
    tolerance: float = 1e-6,
    compounds_per_year: int = 1,
) -> float:
    """Return the internal rate of return for ``cash_flows``.

    The solver first runs Newton-Raphson from ``guess`` for at most
    ``max_iterations`` steps, accepting a rate when the NPV is within
    ``tolerance`` of zero or when the step size drops below ``tolerance``.
    If the derivative is zero or non-finite, an evaluation overflows, or the
    iterations run out, it falls back to bisection on a bracket that starts
    at a periodic rate of -99% and an annual rate of 1000% (the upper bound
    is doubled up to 12 times to find a sign change).

    Raises:
        ValueError: if there are no cash flows, or they do not contain both
            a negative and a positive amount.
        ConvergenceError: if no sign change can be bracketed or the
            bisection does not reach a root.
    """
    flows = normalize_cash_flows(
        cash_flows,
        compounds_per_year=compounds_per_year,
    )
    if not flows:
        raise ValueError("cash_flows must contain at least one item")

    amounts = [amount for amount, _ in flows]
    if not any(amount < 0 for amount in amounts) or not any(amount > 0 for amount in amounts):
        raise ValueError(
            "cash_flows must include both negative and positive values")

    frequency = float(compounds_per_year)

    def _npv_with_flows(rate: float) -> float:
        """NPV of the normalised flows at an annual ``rate``."""
        periodic_rate = rate / frequency
        if periodic_rate <= -1.0:
            # Outside the domain of the discount factor.
            return float("inf")
        base = 1.0 + periodic_rate
        total = 0.0
        for amount, periods in flows:
            total += amount * base ** (-periods)
        return total

    def _derivative(rate: float) -> float:
        """Derivative of the NPV with respect to the annual ``rate``."""
        periodic_rate = rate / frequency
        if periodic_rate <= -1.0:
            return float("inf")
        base = 1.0 + periodic_rate
        derivative = 0.0
        for amount, periods in flows:
            derivative += -amount * periods * base ** (-periods - 1.0) / frequency
        return derivative

    rate = float(guess)
    for _ in range(max_iterations):
        try:
            value = _npv_with_flows(rate)
            if isclose(value, 0.0, abs_tol=tolerance):
                return rate
            derivative = _derivative(rate)
        except OverflowError:
            # The iterate landed so close to -100% that a float power
            # overflowed; let the bracketed fallback take over.
            break
        if derivative == 0.0 or not isfinite(derivative):
            break
        next_rate = rate - value / derivative
        if abs(next_rate - rate) < tolerance:
            return next_rate
        rate = next_rate

    # Fallback to bracketed bisection between sensible bounds.
    lower_bound = -0.99 * frequency
    upper_bound = 10.0
    try:
        lower_value = _npv_with_flows(lower_bound)
        upper_value = _npv_with_flows(upper_bound)

        attempts = 0
        while lower_value * upper_value > 0 and attempts < 12:
            upper_bound *= 2.0
            upper_value = _npv_with_flows(upper_bound)
            attempts += 1
    except OverflowError as exc:
        # Honour the documented contract instead of leaking OverflowError.
        raise ConvergenceError(
            "IRR could not be bracketed within default bounds") from exc

    if lower_value * upper_value > 0:
        raise ConvergenceError(
            "IRR could not be bracketed within default bounds")

    for _ in range(max_iterations * 2):
        midpoint = (lower_bound + upper_bound) / 2.0
        if midpoint <= lower_bound or midpoint >= upper_bound:
            # The bracket has collapsed to adjacent floats, so no further
            # refinement is possible. If it still straddles a sign change the
            # root is pinned to machine precision: return the better endpoint
            # instead of spinning until the iterations are exhausted.
            if lower_value * upper_value < 0:
                if abs(lower_value) <= abs(upper_value):
                    return lower_bound
                return upper_bound
            break
        mid_value = _npv_with_flows(midpoint)
        if isclose(mid_value, 0.0, abs_tol=tolerance):
            return midpoint
        if lower_value * mid_value < 0:
            upper_bound = midpoint
            upper_value = mid_value
        else:
            lower_bound = midpoint
            lower_value = mid_value
    raise ConvergenceError("IRR solver failed to converge")
|
|
|
|
|
|
def payback_period(
    cash_flows: Iterable[CashFlow],
    *,
    allow_fractional: bool = True,
    compounds_per_year: int = 1,
) -> float:
    """Return the period at which the running cash-flow total reaches zero or above.

    Flows are normalised to period offsets and walked in ascending period
    order. With ``allow_fractional`` the result is linearly interpolated
    between the previous flow's period and the period of the flow that tips
    the total non-negative; otherwise that flow's period is returned as is.

    Raises:
        ValueError: if there are no cash flows.
        PaybackNotReachedError: if the running total stays negative.
    """
    timeline = normalize_cash_flows(
        cash_flows,
        compounds_per_year=compounds_per_year,
    )
    if not timeline:
        raise ValueError("cash_flows must contain at least one item")

    # Stable in-place sort of the freshly built list by period offset.
    timeline.sort(key=lambda entry: entry[1])

    running_total = 0.0
    last_period = timeline[0][1]
    for amount, period in timeline:
        balance_after = running_total + amount
        if balance_after >= 0.0:
            if isclose(amount, 0.0) or not allow_fractional:
                return period
            # Share of this flow needed to cover the outstanding deficit.
            covered_share = -running_total / amount
            return last_period + covered_share * (period - last_period)
        running_total = balance_after
        last_period = period

    raise PaybackNotReachedError(
        "Cumulative cash flow never becomes non-negative")
|