Files
calminer/routes/calculations.py
zwitschi 1feae7ff85 feat: Add Processing Opex functionality
- Introduced OpexValidationError for handling validation errors in processing opex calculations.
- Implemented ProjectProcessingOpexRepository and ScenarioProcessingOpexRepository for managing project and scenario-level processing opex snapshots.
- Enhanced UnitOfWork to include repositories for processing opex.
- Updated sidebar navigation and scenario detail templates to include links to the new Processing Opex Planner.
- Created a new template for the Processing Opex Planner with form handling for input components and parameters.
- Developed integration tests for processing opex calculations, covering HTML and JSON flows, including validation for currency mismatches and unsupported frequencies.
- Added unit tests for the calculation logic, ensuring correct handling of various scenarios and edge cases.
2025-11-13 09:26:57 +01:00

1589 lines
50 KiB
Python

"""Routes handling financial calculation workflows."""
from __future__ import annotations
from decimal import Decimal
from typing import Any, Sequence
from fastapi import APIRouter, Depends, Query, Request, status
from fastapi.responses import HTMLResponse, JSONResponse, Response
from fastapi.templating import Jinja2Templates
from pydantic import ValidationError
from starlette.datastructures import FormData
from dependencies import get_pricing_metadata, get_unit_of_work, require_authenticated_user
from models import (
Project,
ProjectCapexSnapshot,
ProjectProcessingOpexSnapshot,
ProjectProfitability,
Scenario,
ScenarioCapexSnapshot,
ScenarioProcessingOpexSnapshot,
ScenarioProfitability,
User,
)
from schemas.calculations import (
CapexCalculationOptions,
CapexCalculationRequest,
CapexCalculationResult,
CapexComponentInput,
CapexParameters,
ProcessingOpexCalculationRequest,
ProcessingOpexCalculationResult,
ProcessingOpexComponentInput,
ProcessingOpexOptions,
ProcessingOpexParameters,
ProfitabilityCalculationRequest,
ProfitabilityCalculationResult,
)
from services.calculations import (
calculate_initial_capex,
calculate_processing_opex,
calculate_profitability,
)
from services.exceptions import (
CapexValidationError,
EntityNotFoundError,
OpexValidationError,
ProfitabilityValidationError,
)
from services.pricing import PricingMetadata
from services.unit_of_work import UnitOfWork
from routes.template_filters import register_common_filters
router = APIRouter(prefix="/calculations", tags=["Calculations"])
templates = Jinja2Templates(directory="templates")
register_common_filters(templates)
_SUPPORTED_METALS: tuple[dict[str, str], ...] = (
{"value": "copper", "label": "Copper"},
{"value": "gold", "label": "Gold"},
{"value": "lithium", "label": "Lithium"},
)
_SUPPORTED_METAL_VALUES = {entry["value"] for entry in _SUPPORTED_METALS}
_DEFAULT_EVALUATION_PERIODS = 10
_CAPEX_CATEGORY_OPTIONS: tuple[dict[str, str], ...] = (
{"value": "equipment", "label": "Equipment"},
{"value": "infrastructure", "label": "Infrastructure"},
{"value": "land", "label": "Land & Property"},
{"value": "miscellaneous", "label": "Miscellaneous"},
)
_DEFAULT_CAPEX_HORIZON_YEARS = 5
_OPEX_CATEGORY_OPTIONS: tuple[dict[str, str], ...] = (
{"value": "labor", "label": "Labor"},
{"value": "materials", "label": "Materials"},
{"value": "energy", "label": "Energy"},
{"value": "maintenance", "label": "Maintenance"},
{"value": "other", "label": "Other"},
)
_OPEX_FREQUENCY_OPTIONS: tuple[dict[str, str], ...] = (
{"value": "daily", "label": "Daily"},
{"value": "weekly", "label": "Weekly"},
{"value": "monthly", "label": "Monthly"},
{"value": "quarterly", "label": "Quarterly"},
{"value": "annually", "label": "Annually"},
)
_DEFAULT_OPEX_HORIZON_YEARS = 5
_PROCESSING_OPEX_TEMPLATE = "scenarios/opex.html"
def _combine_impurity_metadata(metadata: PricingMetadata) -> list[dict[str, object]]:
"""Build impurity rows combining thresholds and penalties."""
thresholds = getattr(metadata, "impurity_thresholds", {}) or {}
penalties = getattr(metadata, "impurity_penalty_per_ppm", {}) or {}
impurity_codes = sorted({*thresholds.keys(), *penalties.keys()})
combined: list[dict[str, object]] = []
for code in impurity_codes:
combined.append(
{
"name": code,
"threshold": float(thresholds.get(code, 0.0)),
"penalty": float(penalties.get(code, 0.0)),
"value": None,
}
)
return combined
def _value_or_blank(value: Any) -> Any:
if value is None:
return ""
if isinstance(value, Decimal):
return float(value)
return value
def _normalise_impurity_entries(entries: Any) -> list[dict[str, Any]]:
if not entries:
return []
normalised: list[dict[str, Any]] = []
for entry in entries:
if isinstance(entry, dict):
getter = entry.get # type: ignore[assignment]
else:
def getter(key, default=None, _entry=entry): return getattr(
_entry, key, default)
normalised.append(
{
"name": getter("name", "") or "",
"value": _value_or_blank(getter("value")),
"threshold": _value_or_blank(getter("threshold")),
"penalty": _value_or_blank(getter("penalty")),
}
)
return normalised
def _build_default_form_data(
*,
metadata: PricingMetadata,
project: Project | None,
scenario: Scenario | None,
) -> dict[str, Any]:
payable_default = (
float(metadata.default_payable_pct)
if getattr(metadata, "default_payable_pct", None) is not None
else 100.0
)
moisture_threshold_default = (
float(metadata.moisture_threshold_pct)
if getattr(metadata, "moisture_threshold_pct", None) is not None
else 0.0
)
moisture_penalty_default = (
float(metadata.moisture_penalty_per_pct)
if getattr(metadata, "moisture_penalty_per_pct", None) is not None
else 0.0
)
base_metal_entry = next(iter(_SUPPORTED_METALS), None)
metal = base_metal_entry["value"] if base_metal_entry else ""
scenario_resource = getattr(scenario, "primary_resource", None)
if scenario_resource is not None:
candidate = getattr(scenario_resource, "value", str(scenario_resource))
if candidate in _SUPPORTED_METAL_VALUES:
metal = candidate
currency = ""
scenario_currency = getattr(scenario, "currency", None)
metadata_currency = getattr(metadata, "default_currency", None)
if scenario_currency:
currency = str(scenario_currency).upper()
elif metadata_currency:
currency = str(metadata_currency).upper()
discount_rate = ""
scenario_discount = getattr(scenario, "discount_rate", None)
if scenario_discount is not None:
discount_rate = float(scenario_discount) # type: ignore[arg-type]
return {
"metal": metal,
"ore_tonnage": "",
"head_grade_pct": "",
"recovery_pct": "",
"payable_pct": payable_default,
"reference_price": "",
"treatment_charge": "",
"smelting_charge": "",
"processing_opex": "",
"moisture_pct": "",
"moisture_threshold_pct": moisture_threshold_default,
"moisture_penalty_per_pct": moisture_penalty_default,
"premiums": "",
"fx_rate": 1.0,
"currency_code": currency,
"impurities": None,
"initial_capex": "",
"sustaining_capex": "",
"discount_rate": discount_rate,
"periods": _DEFAULT_EVALUATION_PERIODS,
}
def _prepare_form_data_for_display(
*,
defaults: dict[str, Any],
overrides: dict[str, Any] | None = None,
allow_empty_override: bool = False,
) -> dict[str, Any]:
data = dict(defaults)
if overrides:
for key, value in overrides.items():
if key == "csrf_token":
continue
if key == "impurities":
data["impurities"] = _normalise_impurity_entries(value)
continue
if value is None and not allow_empty_override:
continue
data[key] = _value_or_blank(value)
# Normalise defaults and ensure strings for None.
for key, value in list(data.items()):
if key == "impurities":
if value is None:
data[key] = None
else:
data[key] = _normalise_impurity_entries(value)
continue
data[key] = _value_or_blank(value)
return data
def _coerce_bool(value: Any) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, str):
lowered = value.strip().lower()
return lowered in {"1", "true", "yes", "on"}
return bool(value)
def _serialise_capex_component_entry(component: Any) -> dict[str, Any]:
if isinstance(component, CapexComponentInput):
raw = component.model_dump()
elif isinstance(component, dict):
raw = dict(component)
else:
raw = {
"id": getattr(component, "id", None),
"name": getattr(component, "name", None),
"category": getattr(component, "category", None),
"amount": getattr(component, "amount", None),
"currency": getattr(component, "currency", None),
"spend_year": getattr(component, "spend_year", None),
"notes": getattr(component, "notes", None),
}
return {
"id": raw.get("id"),
"name": _value_or_blank(raw.get("name")),
"category": raw.get("category") or "equipment",
"amount": _value_or_blank(raw.get("amount")),
"currency": _value_or_blank(raw.get("currency")),
"spend_year": _value_or_blank(raw.get("spend_year")),
"notes": _value_or_blank(raw.get("notes")),
}
def _serialise_capex_parameters(parameters: Any) -> dict[str, Any]:
if isinstance(parameters, CapexParameters):
raw = parameters.model_dump()
elif isinstance(parameters, dict):
raw = dict(parameters)
else:
raw = {}
return {
"currency_code": _value_or_blank(raw.get("currency_code")),
"contingency_pct": _value_or_blank(raw.get("contingency_pct")),
"discount_rate_pct": _value_or_blank(raw.get("discount_rate_pct")),
"evaluation_horizon_years": _value_or_blank(
raw.get("evaluation_horizon_years")
),
}
def _serialise_capex_options(options: Any) -> dict[str, Any]:
if isinstance(options, CapexCalculationOptions):
raw = options.model_dump()
elif isinstance(options, dict):
raw = dict(options)
else:
raw = {}
return {"persist": _coerce_bool(raw.get("persist", False))}
def _build_capex_defaults(
*,
project: Project | None,
scenario: Scenario | None,
) -> dict[str, Any]:
currency = ""
if scenario and getattr(scenario, "currency", None):
currency = str(scenario.currency).upper()
elif project and getattr(project, "currency", None):
currency = str(project.currency).upper()
discount_rate = ""
scenario_discount = getattr(scenario, "discount_rate", None)
if scenario_discount is not None:
discount_rate = float(scenario_discount)
return {
"components": [],
"parameters": {
"currency_code": currency or None,
"contingency_pct": None,
"discount_rate_pct": discount_rate,
"evaluation_horizon_years": _DEFAULT_CAPEX_HORIZON_YEARS,
},
"options": {
"persist": bool(scenario or project),
},
"currency_code": currency or None,
"default_horizon": _DEFAULT_CAPEX_HORIZON_YEARS,
"last_updated_at": getattr(scenario, "capex_updated_at", None),
}
def _prepare_capex_context(
request: Request,
*,
project: Project | None,
scenario: Scenario | None,
form_data: dict[str, Any] | None = None,
result: CapexCalculationResult | None = None,
errors: list[str] | None = None,
notices: list[str] | None = None,
component_errors: list[str] | None = None,
component_notices: list[str] | None = None,
) -> dict[str, Any]:
if form_data is not None and hasattr(form_data, "model_dump"):
form_data = form_data.model_dump() # type: ignore[assignment]
defaults = _build_capex_defaults(project=project, scenario=scenario)
raw_components: list[Any] = []
if form_data and "components" in form_data:
raw_components = list(form_data.get("components") or [])
components = [
_serialise_capex_component_entry(component) for component in raw_components
]
raw_parameters = defaults["parameters"].copy()
if form_data and form_data.get("parameters"):
raw_parameters.update(
_serialise_capex_parameters(form_data.get("parameters"))
)
parameters = _serialise_capex_parameters(raw_parameters)
raw_options = defaults["options"].copy()
if form_data and form_data.get("options"):
raw_options.update(_serialise_capex_options(form_data.get("options")))
options = _serialise_capex_options(raw_options)
currency_code = parameters.get(
"currency_code") or defaults["currency_code"]
return {
"request": request,
"project": project,
"scenario": scenario,
"components": components,
"parameters": parameters,
"options": options,
"currency_code": currency_code,
"category_options": _CAPEX_CATEGORY_OPTIONS,
"default_horizon": defaults["default_horizon"],
"last_updated_at": defaults["last_updated_at"],
"result": result,
"errors": errors or [],
"notices": notices or [],
"component_errors": component_errors or [],
"component_notices": component_notices or [],
"cancel_url": request.headers.get("Referer"),
"form_action": request.url.path,
"csrf_token": None,
}
def _serialise_opex_component_entry(component: Any) -> dict[str, Any]:
if isinstance(component, ProcessingOpexComponentInput):
raw = component.model_dump()
elif isinstance(component, dict):
raw = dict(component)
else:
raw = {
"id": getattr(component, "id", None),
"name": getattr(component, "name", None),
"category": getattr(component, "category", None),
"unit_cost": getattr(component, "unit_cost", None),
"quantity": getattr(component, "quantity", None),
"frequency": getattr(component, "frequency", None),
"currency": getattr(component, "currency", None),
"period_start": getattr(component, "period_start", None),
"period_end": getattr(component, "period_end", None),
"notes": getattr(component, "notes", None),
}
return {
"id": raw.get("id"),
"name": _value_or_blank(raw.get("name")),
"category": raw.get("category") or "labor",
"unit_cost": _value_or_blank(raw.get("unit_cost")),
"quantity": _value_or_blank(raw.get("quantity")),
"frequency": raw.get("frequency") or "monthly",
"currency": _value_or_blank(raw.get("currency")),
"period_start": _value_or_blank(raw.get("period_start")),
"period_end": _value_or_blank(raw.get("period_end")),
"notes": _value_or_blank(raw.get("notes")),
}
def _serialise_opex_parameters(parameters: Any) -> dict[str, Any]:
if isinstance(parameters, ProcessingOpexParameters):
raw = parameters.model_dump()
elif isinstance(parameters, dict):
raw = dict(parameters)
else:
raw = {}
return {
"currency_code": _value_or_blank(raw.get("currency_code")),
"escalation_pct": _value_or_blank(raw.get("escalation_pct")),
"discount_rate_pct": _value_or_blank(raw.get("discount_rate_pct")),
"evaluation_horizon_years": _value_or_blank(
raw.get("evaluation_horizon_years")
),
"apply_escalation": _coerce_bool(raw.get("apply_escalation", True)),
}
def _serialise_opex_options(options: Any) -> dict[str, Any]:
if isinstance(options, ProcessingOpexOptions):
raw = options.model_dump()
elif isinstance(options, dict):
raw = dict(options)
else:
raw = {}
return {
"persist": _coerce_bool(raw.get("persist", False)),
"snapshot_notes": _value_or_blank(raw.get("snapshot_notes")),
}
def _build_opex_defaults(
*,
project: Project | None,
scenario: Scenario | None,
) -> dict[str, Any]:
currency = ""
if scenario and getattr(scenario, "currency", None):
currency = str(scenario.currency).upper()
elif project and getattr(project, "currency", None):
currency = str(project.currency).upper()
discount_rate = ""
scenario_discount = getattr(scenario, "discount_rate", None)
if scenario_discount is not None:
discount_rate = float(scenario_discount)
last_updated_at = getattr(scenario, "opex_updated_at", None)
return {
"components": [],
"parameters": {
"currency_code": currency or None,
"escalation_pct": None,
"discount_rate_pct": discount_rate,
"evaluation_horizon_years": _DEFAULT_OPEX_HORIZON_YEARS,
"apply_escalation": True,
},
"options": {
"persist": bool(scenario or project),
"snapshot_notes": None,
},
"currency_code": currency or None,
"default_horizon": _DEFAULT_OPEX_HORIZON_YEARS,
"last_updated_at": last_updated_at,
}
def _prepare_opex_context(
request: Request,
*,
project: Project | None,
scenario: Scenario | None,
form_data: dict[str, Any] | None = None,
result: ProcessingOpexCalculationResult | None = None,
errors: list[str] | None = None,
notices: list[str] | None = None,
component_errors: list[str] | None = None,
component_notices: list[str] | None = None,
) -> dict[str, Any]:
if form_data is not None and hasattr(form_data, "model_dump"):
form_data = form_data.model_dump() # type: ignore[assignment]
defaults = _build_opex_defaults(project=project, scenario=scenario)
raw_components: list[Any] = []
if form_data and "components" in form_data:
raw_components = list(form_data.get("components") or [])
components = [
_serialise_opex_component_entry(component) for component in raw_components
]
raw_parameters = defaults["parameters"].copy()
if form_data and form_data.get("parameters"):
raw_parameters.update(
_serialise_opex_parameters(form_data.get("parameters"))
)
parameters = _serialise_opex_parameters(raw_parameters)
raw_options = defaults["options"].copy()
if form_data and form_data.get("options"):
raw_options.update(_serialise_opex_options(form_data.get("options")))
options = _serialise_opex_options(raw_options)
currency_code = parameters.get(
"currency_code") or defaults["currency_code"]
return {
"request": request,
"project": project,
"scenario": scenario,
"components": components,
"parameters": parameters,
"options": options,
"currency_code": currency_code,
"category_options": _OPEX_CATEGORY_OPTIONS,
"frequency_options": _OPEX_FREQUENCY_OPTIONS,
"default_horizon": defaults["default_horizon"],
"last_updated_at": defaults["last_updated_at"],
"result": result,
"errors": errors or [],
"notices": notices or [],
"component_errors": component_errors or [],
"component_notices": component_notices or [],
"cancel_url": request.headers.get("Referer"),
"form_action": request.url.path,
"csrf_token": None,
}
def _format_error_location(location: tuple[Any, ...]) -> str:
path = ""
for part in location:
if isinstance(part, int):
path += f"[{part}]"
else:
if path:
path += f".{part}"
else:
path = str(part)
return path or "(input)"
def _partition_capex_error_messages(
errors: Sequence[Any],
) -> tuple[list[str], list[str]]:
general: list[str] = []
component_specific: list[str] = []
for error in errors:
if isinstance(error, dict):
mapping = error
else:
try:
mapping = dict(error)
except TypeError:
mapping = {}
location = tuple(mapping.get("loc", ()))
message = mapping.get("msg", "Invalid value")
formatted_location = _format_error_location(location)
entry = f"{formatted_location} - {message}"
if location and location[0] == "components":
component_specific.append(entry)
else:
general.append(entry)
return general, component_specific
def _partition_opex_error_messages(
errors: Sequence[Any],
) -> tuple[list[str], list[str]]:
general: list[str] = []
component_specific: list[str] = []
for error in errors:
if isinstance(error, dict):
mapping = error
else:
try:
mapping = dict(error)
except TypeError:
mapping = {}
location = tuple(mapping.get("loc", ()))
message = mapping.get("msg", "Invalid value")
formatted_location = _format_error_location(location)
entry = f"{formatted_location} - {message}"
if location and location[0] == "components":
component_specific.append(entry)
else:
general.append(entry)
return general, component_specific
def _opex_form_to_payload(form: FormData) -> dict[str, Any]:
data: dict[str, Any] = {}
components: dict[int, dict[str, Any]] = {}
parameters: dict[str, Any] = {}
options: dict[str, Any] = {}
for key, value in form.multi_items():
normalised_value = _normalise_form_value(value)
if key.startswith("components["):
try:
index_part = key[len("components["):]
index_str, remainder = index_part.split("]", 1)
field = remainder.strip()[1:-1]
index = int(index_str)
except (ValueError, IndexError):
continue
entry = components.setdefault(index, {})
entry[field] = normalised_value
continue
if key.startswith("parameters["):
field = key[len("parameters["):-1]
if field == "apply_escalation":
parameters[field] = _coerce_bool(normalised_value)
else:
parameters[field] = normalised_value
continue
if key.startswith("options["):
field = key[len("options["):-1]
options[field] = normalised_value
continue
if key == "csrf_token":
continue
data[key] = normalised_value
if components:
ordered = [components[index] for index in sorted(components.keys())]
data["components"] = ordered
if parameters:
data["parameters"] = parameters
if options:
if "persist" in options:
options["persist"] = _coerce_bool(options.get("persist"))
data["options"] = options
return data
def _capex_form_to_payload(form: FormData) -> dict[str, Any]:
data: dict[str, Any] = {}
components: dict[int, dict[str, Any]] = {}
parameters: dict[str, Any] = {}
options: dict[str, Any] = {}
for key, value in form.multi_items():
normalised_value = _normalise_form_value(value)
if key.startswith("components["):
try:
index_part = key[len("components["):]
index_str, remainder = index_part.split("]", 1)
field = remainder.strip()[1:-1]
index = int(index_str)
except (ValueError, IndexError):
continue
entry = components.setdefault(index, {})
entry[field] = normalised_value
continue
if key.startswith("parameters["):
field = key[len("parameters["):-1]
parameters[field] = normalised_value
continue
if key.startswith("options["):
field = key[len("options["):-1]
options[field] = normalised_value
continue
if key == "csrf_token":
continue
data[key] = normalised_value
if components:
ordered = [
components[index] for index in sorted(components.keys())
]
data["components"] = ordered
if parameters:
data["parameters"] = parameters
if options:
options["persist"] = _coerce_bool(options.get("persist"))
data["options"] = options
return data
async def _extract_opex_payload(request: Request) -> dict[str, Any]:
content_type = request.headers.get("content-type", "").lower()
if content_type.startswith("application/json"):
body = await request.json()
return body if isinstance(body, dict) else {}
form = await request.form()
return _opex_form_to_payload(form)
async def _extract_capex_payload(request: Request) -> dict[str, Any]:
content_type = request.headers.get("content-type", "").lower()
if content_type.startswith("application/json"):
body = await request.json()
return body if isinstance(body, dict) else {}
form = await request.form()
return _capex_form_to_payload(form)
def _prepare_default_context(
request: Request,
*,
project: Project | None = None,
scenario: Scenario | None = None,
metadata: PricingMetadata,
form_data: dict[str, Any] | None = None,
allow_empty_override: bool = False,
result: ProfitabilityCalculationResult | None = None,
) -> dict[str, object]:
"""Assemble template context shared across calculation endpoints."""
defaults = _build_default_form_data(
metadata=metadata,
project=project,
scenario=scenario,
)
data = _prepare_form_data_for_display(
defaults=defaults,
overrides=form_data,
allow_empty_override=allow_empty_override,
)
return {
"request": request,
"project": project,
"scenario": scenario,
"metadata": metadata,
"metadata_impurities": _combine_impurity_metadata(metadata),
"supported_metals": _SUPPORTED_METALS,
"data": data,
"result": result,
"errors": [],
"notices": [],
"cancel_url": request.headers.get("Referer"),
"form_action": request.url.path,
"csrf_token": None,
"default_periods": _DEFAULT_EVALUATION_PERIODS,
}
def _load_project_and_scenario(
*,
uow: UnitOfWork,
project_id: int | None,
scenario_id: int | None,
) -> tuple[Project | None, Scenario | None]:
project: Project | None = None
scenario: Scenario | None = None
if project_id is not None and uow.projects is not None:
try:
project = uow.projects.get(project_id, with_children=False)
except EntityNotFoundError:
project = None
if scenario_id is not None and uow.scenarios is not None:
try:
scenario = uow.scenarios.get(scenario_id, with_children=False)
except EntityNotFoundError:
scenario = None
if scenario is not None and project is None:
project = scenario.project
return project, scenario
def _is_json_request(request: Request) -> bool:
content_type = request.headers.get("content-type", "").lower()
accept = request.headers.get("accept", "").lower()
return "application/json" in content_type or "application/json" in accept
def _normalise_form_value(value: Any) -> Any:
if isinstance(value, str):
stripped = value.strip()
return stripped if stripped != "" else None
return value
def _form_to_payload(form: FormData) -> dict[str, Any]:
data: dict[str, Any] = {}
impurities: dict[int, dict[str, Any]] = {}
for key, value in form.multi_items():
normalised_value = _normalise_form_value(value)
if key.startswith("impurities[") and "]" in key:
try:
index_part = key.split("[", 1)[1]
index_str, remainder = index_part.split("]", 1)
field = remainder.strip("[]")
if not field:
continue
index = int(index_str)
except (ValueError, IndexError):
continue
entry = impurities.setdefault(index, {})
entry[field] = normalised_value
continue
if key == "csrf_token":
continue
data[key] = normalised_value
if impurities:
ordered = []
for _, entry in sorted(impurities.items()):
if not entry.get("name"):
continue
ordered.append(entry)
if ordered:
data["impurities"] = ordered
return data
async def _extract_payload(request: Request) -> dict[str, Any]:
if request.headers.get("content-type", "").lower().startswith("application/json"):
return await request.json()
form = await request.form()
return _form_to_payload(form)
def _list_from_context(context: dict[str, Any], key: str) -> list:
value = context.get(key)
if isinstance(value, list):
return value
new_list: list = []
context[key] = new_list
return new_list
def _should_persist_snapshot(
*,
project: Project | None,
scenario: Scenario | None,
payload: ProfitabilityCalculationRequest,
) -> bool:
"""Determine whether to persist the profitability result.
Current strategy persists automatically when a scenario or project context
is provided. This can be refined later to honour explicit user choices.
"""
return bool(scenario or project)
def _persist_profitability_snapshots(
*,
uow: UnitOfWork,
project: Project | None,
scenario: Scenario | None,
user: User | None,
request_model: ProfitabilityCalculationRequest,
result: ProfitabilityCalculationResult,
) -> None:
if not _should_persist_snapshot(project=project, scenario=scenario, payload=request_model):
return
created_by_id = getattr(user, "id", None)
revenue_total = float(result.pricing.net_revenue)
processing_total = float(result.costs.processing_opex_total)
sustaining_total = float(result.costs.sustaining_capex_total)
initial_capex = float(result.costs.initial_capex)
net_cash_flow_total = revenue_total - (
processing_total + sustaining_total + initial_capex
)
npv_value = (
float(result.metrics.npv)
if result.metrics.npv is not None
else None
)
irr_value = (
float(result.metrics.irr)
if result.metrics.irr is not None
else None
)
payback_value = (
float(result.metrics.payback_period)
if result.metrics.payback_period is not None
else None
)
margin_value = (
float(result.metrics.margin)
if result.metrics.margin is not None
else None
)
payload = {
"request": request_model.model_dump(mode="json"),
"result": result.model_dump(),
}
if scenario and uow.scenario_profitability:
scenario_snapshot = ScenarioProfitability(
scenario_id=scenario.id,
created_by_id=created_by_id,
calculation_source="calculations.profitability",
currency_code=result.currency,
npv=npv_value,
irr_pct=irr_value,
payback_period_years=payback_value,
margin_pct=margin_value,
revenue_total=revenue_total,
processing_opex_total=processing_total,
sustaining_capex_total=sustaining_total,
initial_capex=initial_capex,
net_cash_flow_total=net_cash_flow_total,
payload=payload,
)
uow.scenario_profitability.create(scenario_snapshot)
if project and uow.project_profitability:
project_snapshot = ProjectProfitability(
project_id=project.id,
created_by_id=created_by_id,
calculation_source="calculations.profitability",
currency_code=result.currency,
npv=npv_value,
irr_pct=irr_value,
payback_period_years=payback_value,
margin_pct=margin_value,
revenue_total=revenue_total,
processing_opex_total=processing_total,
sustaining_capex_total=sustaining_total,
initial_capex=initial_capex,
net_cash_flow_total=net_cash_flow_total,
payload=payload,
)
uow.project_profitability.create(project_snapshot)
def _should_persist_capex(
*,
project: Project | None,
scenario: Scenario | None,
request_model: CapexCalculationRequest,
) -> bool:
"""Determine whether capex snapshots should be stored."""
persist_requested = bool(
getattr(request_model, "options", None)
and request_model.options.persist
)
return persist_requested and bool(project or scenario)
def _persist_capex_snapshots(
*,
uow: UnitOfWork,
project: Project | None,
scenario: Scenario | None,
user: User | None,
request_model: CapexCalculationRequest,
result: CapexCalculationResult,
) -> None:
if not _should_persist_capex(
project=project,
scenario=scenario,
request_model=request_model,
):
return
created_by_id = getattr(user, "id", None)
totals = result.totals
component_count = len(result.components)
payload = {
"request": request_model.model_dump(mode="json"),
"result": result.model_dump(),
}
if scenario and uow.scenario_capex:
scenario_snapshot = ScenarioCapexSnapshot(
scenario_id=scenario.id,
created_by_id=created_by_id,
calculation_source="calculations.capex",
currency_code=result.currency,
total_capex=float(totals.overall),
contingency_pct=float(totals.contingency_pct),
contingency_amount=float(totals.contingency_amount),
total_with_contingency=float(totals.with_contingency),
component_count=component_count,
payload=payload,
)
uow.scenario_capex.create(scenario_snapshot)
if project and uow.project_capex:
project_snapshot = ProjectCapexSnapshot(
project_id=project.id,
created_by_id=created_by_id,
calculation_source="calculations.capex",
currency_code=result.currency,
total_capex=float(totals.overall),
contingency_pct=float(totals.contingency_pct),
contingency_amount=float(totals.contingency_amount),
total_with_contingency=float(totals.with_contingency),
component_count=component_count,
payload=payload,
)
uow.project_capex.create(project_snapshot)
def _should_persist_opex(
*,
project: Project | None,
scenario: Scenario | None,
request_model: ProcessingOpexCalculationRequest,
) -> bool:
persist_requested = bool(
getattr(request_model, "options", None)
and request_model.options.persist
)
return persist_requested and bool(project or scenario)
def _persist_opex_snapshots(
*,
uow: UnitOfWork,
project: Project | None,
scenario: Scenario | None,
user: User | None,
request_model: ProcessingOpexCalculationRequest,
result: ProcessingOpexCalculationResult,
) -> None:
if not _should_persist_opex(
project=project,
scenario=scenario,
request_model=request_model,
):
return
created_by_id = getattr(user, "id", None)
totals = result.totals
metrics = result.metrics
parameters = result.parameters
overall_annual = float(totals.overall_annual)
escalated_total = (
float(totals.escalated_total)
if totals.escalated_total is not None
else None
)
annual_average = (
float(metrics.annual_average)
if metrics.annual_average is not None
else None
)
evaluation_horizon = (
int(parameters.evaluation_horizon_years)
if parameters.evaluation_horizon_years is not None
else None
)
escalation_pct = (
float(totals.escalation_pct)
if totals.escalation_pct is not None
else (
float(parameters.escalation_pct)
if parameters.escalation_pct is not None and parameters.apply_escalation
else None
)
)
apply_escalation = bool(parameters.apply_escalation)
component_count = len(result.components)
payload = {
"request": request_model.model_dump(mode="json"),
"result": result.model_dump(),
}
if scenario and uow.scenario_processing_opex:
scenario_snapshot = ScenarioProcessingOpexSnapshot(
scenario_id=scenario.id,
created_by_id=created_by_id,
calculation_source="calculations.processing_opex",
currency_code=result.currency,
overall_annual=overall_annual,
escalated_total=escalated_total,
annual_average=annual_average,
evaluation_horizon_years=evaluation_horizon,
escalation_pct=escalation_pct,
apply_escalation=apply_escalation,
component_count=component_count,
payload=payload,
)
uow.scenario_processing_opex.create(scenario_snapshot)
if project and uow.project_processing_opex:
project_snapshot = ProjectProcessingOpexSnapshot(
project_id=project.id,
created_by_id=created_by_id,
calculation_source="calculations.processing_opex",
currency_code=result.currency,
overall_annual=overall_annual,
escalated_total=escalated_total,
annual_average=annual_average,
evaluation_horizon_years=evaluation_horizon,
escalation_pct=escalation_pct,
apply_escalation=apply_escalation,
component_count=component_count,
payload=payload,
)
uow.project_processing_opex.create(project_snapshot)
@router.get(
"/processing-opex",
response_class=HTMLResponse,
name="calculations.processing_opex_form",
)
def processing_opex_form(
request: Request,
_: User = Depends(require_authenticated_user),
uow: UnitOfWork = Depends(get_unit_of_work),
project_id: int | None = Query(
None, description="Optional project identifier"),
scenario_id: int | None = Query(
None, description="Optional scenario identifier"),
) -> HTMLResponse:
"""Render the processing opex planner with default context."""
project, scenario = _load_project_and_scenario(
uow=uow, project_id=project_id, scenario_id=scenario_id
)
context = _prepare_opex_context(
request,
project=project,
scenario=scenario,
)
return templates.TemplateResponse(_PROCESSING_OPEX_TEMPLATE, context)
@router.post(
"/processing-opex",
name="calculations.processing_opex_submit",
)
async def processing_opex_submit(
request: Request,
current_user: User = Depends(require_authenticated_user),
uow: UnitOfWork = Depends(get_unit_of_work),
project_id: int | None = Query(
None, description="Optional project identifier"),
scenario_id: int | None = Query(
None, description="Optional scenario identifier"),
) -> Response:
"""Handle processing opex submissions and respond with HTML or JSON."""
wants_json = _is_json_request(request)
payload_data = await _extract_opex_payload(request)
try:
request_model = ProcessingOpexCalculationRequest.model_validate(
payload_data
)
result = calculate_processing_opex(request_model)
except ValidationError as exc:
if wants_json:
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={"errors": exc.errors()},
)
project, scenario = _load_project_and_scenario(
uow=uow, project_id=project_id, scenario_id=scenario_id
)
general_errors, component_errors = _partition_opex_error_messages(
exc.errors()
)
context = _prepare_opex_context(
request,
project=project,
scenario=scenario,
form_data=payload_data,
errors=general_errors,
component_errors=component_errors,
)
return templates.TemplateResponse(
_PROCESSING_OPEX_TEMPLATE,
context,
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
)
except OpexValidationError as exc:
if wants_json:
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={
"errors": list(exc.field_errors or []),
"message": exc.message,
},
)
project, scenario = _load_project_and_scenario(
uow=uow, project_id=project_id, scenario_id=scenario_id
)
errors = list(exc.field_errors or []) or [exc.message]
context = _prepare_opex_context(
request,
project=project,
scenario=scenario,
form_data=payload_data,
errors=errors,
)
return templates.TemplateResponse(
_PROCESSING_OPEX_TEMPLATE,
context,
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
)
project, scenario = _load_project_and_scenario(
uow=uow, project_id=project_id, scenario_id=scenario_id
)
_persist_opex_snapshots(
uow=uow,
project=project,
scenario=scenario,
user=current_user,
request_model=request_model,
result=result,
)
if wants_json:
return JSONResponse(
status_code=status.HTTP_200_OK,
content=result.model_dump(),
)
context = _prepare_opex_context(
request,
project=project,
scenario=scenario,
form_data=request_model.model_dump(mode="json"),
result=result,
)
notices = _list_from_context(context, "notices")
notices.append("Processing opex calculation completed successfully.")
return templates.TemplateResponse(
_PROCESSING_OPEX_TEMPLATE,
context,
status_code=status.HTTP_200_OK,
)
@router.get(
"/capex",
response_class=HTMLResponse,
name="calculations.capex_form",
)
def capex_form(
request: Request,
_: User = Depends(require_authenticated_user),
uow: UnitOfWork = Depends(get_unit_of_work),
project_id: int | None = Query(
None, description="Optional project identifier"),
scenario_id: int | None = Query(
None, description="Optional scenario identifier"),
) -> HTMLResponse:
"""Render the initial capex planner template with defaults."""
project, scenario = _load_project_and_scenario(
uow=uow, project_id=project_id, scenario_id=scenario_id
)
context = _prepare_capex_context(
request,
project=project,
scenario=scenario,
)
return templates.TemplateResponse("scenarios/capex.html", context)
@router.post(
"/capex",
name="calculations.capex_submit",
)
async def capex_submit(
request: Request,
current_user: User = Depends(require_authenticated_user),
uow: UnitOfWork = Depends(get_unit_of_work),
project_id: int | None = Query(
None, description="Optional project identifier"),
scenario_id: int | None = Query(
None, description="Optional scenario identifier"),
) -> Response:
"""Process capex submissions and return aggregated results."""
wants_json = _is_json_request(request)
payload_data = await _extract_capex_payload(request)
try:
request_model = CapexCalculationRequest.model_validate(payload_data)
result = calculate_initial_capex(request_model)
except ValidationError as exc:
if wants_json:
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={"errors": exc.errors()},
)
project, scenario = _load_project_and_scenario(
uow=uow, project_id=project_id, scenario_id=scenario_id
)
general_errors, component_errors = _partition_capex_error_messages(
exc.errors()
)
context = _prepare_capex_context(
request,
project=project,
scenario=scenario,
form_data=payload_data,
errors=general_errors,
component_errors=component_errors,
)
return templates.TemplateResponse(
"scenarios/capex.html",
context,
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
)
except CapexValidationError as exc:
if wants_json:
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={
"errors": list(exc.field_errors or []),
"message": exc.message,
},
)
project, scenario = _load_project_and_scenario(
uow=uow, project_id=project_id, scenario_id=scenario_id
)
errors = list(exc.field_errors or []) or [exc.message]
context = _prepare_capex_context(
request,
project=project,
scenario=scenario,
form_data=payload_data,
errors=errors,
)
return templates.TemplateResponse(
"scenarios/capex.html",
context,
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
)
project, scenario = _load_project_and_scenario(
uow=uow, project_id=project_id, scenario_id=scenario_id
)
_persist_capex_snapshots(
uow=uow,
project=project,
scenario=scenario,
user=current_user,
request_model=request_model,
result=result,
)
if wants_json:
return JSONResponse(
status_code=status.HTTP_200_OK,
content=result.model_dump(),
)
context = _prepare_capex_context(
request,
project=project,
scenario=scenario,
form_data=request_model.model_dump(mode="json"),
result=result,
)
notices = _list_from_context(context, "notices")
notices.append("Initial capex calculation completed successfully.")
return templates.TemplateResponse(
"scenarios/capex.html",
context,
status_code=status.HTTP_200_OK,
)
@router.get(
"/profitability",
response_class=HTMLResponse,
name="calculations.profitability_form",
)
def profitability_form(
request: Request,
_: User = Depends(require_authenticated_user),
metadata: PricingMetadata = Depends(get_pricing_metadata),
uow: UnitOfWork = Depends(get_unit_of_work),
project_id: int | None = Query(
None, description="Optional project identifier"),
scenario_id: int | None = Query(
None, description="Optional scenario identifier"),
) -> HTMLResponse:
"""Render the profitability calculation form with default metadata."""
project, scenario = _load_project_and_scenario(
uow=uow, project_id=project_id, scenario_id=scenario_id
)
context = _prepare_default_context(
request,
project=project,
scenario=scenario,
metadata=metadata,
)
return templates.TemplateResponse("scenarios/profitability.html", context)
@router.post(
"/profitability",
name="calculations.profitability_submit",
)
async def profitability_submit(
request: Request,
current_user: User = Depends(require_authenticated_user),
metadata: PricingMetadata = Depends(get_pricing_metadata),
uow: UnitOfWork = Depends(get_unit_of_work),
project_id: int | None = Query(
None, description="Optional project identifier"),
scenario_id: int | None = Query(
None, description="Optional scenario identifier"),
) -> Response:
"""Handle profitability calculations and return HTML or JSON."""
wants_json = _is_json_request(request)
payload_data = await _extract_payload(request)
try:
request_model = ProfitabilityCalculationRequest.model_validate(
payload_data)
result = calculate_profitability(request_model, metadata=metadata)
except ValidationError as exc:
if wants_json:
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={"errors": exc.errors()},
)
project, scenario = _load_project_and_scenario(
uow=uow, project_id=project_id, scenario_id=scenario_id
)
context = _prepare_default_context(
request,
project=project,
scenario=scenario,
metadata=metadata,
form_data=payload_data,
allow_empty_override=True,
)
errors = _list_from_context(context, "errors")
errors.extend(
[f"{err['loc']} - {err['msg']}" for err in exc.errors()]
)
return templates.TemplateResponse(
"scenarios/profitability.html",
context,
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
)
except ProfitabilityValidationError as exc:
if wants_json:
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={
"errors": exc.field_errors or [],
"message": exc.message,
},
)
project, scenario = _load_project_and_scenario(
uow=uow, project_id=project_id, scenario_id=scenario_id
)
context = _prepare_default_context(
request,
project=project,
scenario=scenario,
metadata=metadata,
form_data=payload_data,
allow_empty_override=True,
)
messages = list(exc.field_errors or []) or [exc.message]
errors = _list_from_context(context, "errors")
errors.extend(messages)
return templates.TemplateResponse(
"scenarios/profitability.html",
context,
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
)
project, scenario = _load_project_and_scenario(
uow=uow, project_id=project_id, scenario_id=scenario_id
)
_persist_profitability_snapshots(
uow=uow,
project=project,
scenario=scenario,
user=current_user,
request_model=request_model,
result=result,
)
if wants_json:
return JSONResponse(
status_code=status.HTTP_200_OK,
content=result.model_dump(),
)
context = _prepare_default_context(
request,
project=project,
scenario=scenario,
metadata=metadata,
form_data=request_model.model_dump(mode="json"),
result=result,
)
notices = _list_from_context(context, "notices")
notices.append("Profitability calculation completed successfully.")
return templates.TemplateResponse(
"scenarios/profitability.html",
context,
status_code=status.HTTP_200_OK,
)