Total Initial Capex
++ {{ result.totals.overall | currency_display(result.currency) }} +
+diff --git a/models/__init__.py b/models/__init__.py
index df529c1..bd4a9f4 100644
--- a/models/__init__.py
+++ b/models/__init__.py
@@ -28,6 +28,7 @@ from .scenario import Scenario
from .simulation_parameter import SimulationParameter
from .user import Role, User, UserRole, password_context
from .profitability_snapshot import ProjectProfitability, ScenarioProfitability
+from .capex_snapshot import ProjectCapexSnapshot, ScenarioCapexSnapshot
__all__ = [
"FinancialCategory",
@@ -35,11 +36,13 @@ __all__ = [
"MiningOperationType",
"Project",
"ProjectProfitability",
+ "ProjectCapexSnapshot",
"PricingSettings",
"PricingMetalSettings",
"PricingImpuritySettings",
"Scenario",
"ScenarioProfitability",
+ "ScenarioCapexSnapshot",
"ScenarioStatus",
"DistributionType",
"SimulationParameter",
diff --git a/models/capex_snapshot.py b/models/capex_snapshot.py
new file mode 100644
index 0000000..87b7169
--- /dev/null
+++ b/models/capex_snapshot.py
@@ -0,0 +1,111 @@
+from __future__ import annotations
+
+from datetime import datetime
+from typing import TYPE_CHECKING
+
+from sqlalchemy import JSON, DateTime, ForeignKey, Integer, Numeric, String
+from sqlalchemy.orm import Mapped, mapped_column, relationship
+from sqlalchemy.sql import func
+
+from config.database import Base
+
+if TYPE_CHECKING: # pragma: no cover
+ from .project import Project
+ from .scenario import Scenario
+ from .user import User
+
+
+class ProjectCapexSnapshot(Base):
+ """Snapshot of aggregated initial capex metrics at the project level."""
+
+ __tablename__ = "project_capex_snapshots"
+
+ id: Mapped[int] = mapped_column(Integer, primary_key=True)
+ project_id: Mapped[int] = mapped_column(
+ ForeignKey("projects.id", ondelete="CASCADE"), nullable=False, index=True
+ )
+ created_by_id: Mapped[int | None] = mapped_column(
+ ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True
+ )
+ calculation_source: Mapped[str | None] = mapped_column(
+ String(64), nullable=True)
+ calculated_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=func.now()
+ )
+ currency_code: Mapped[str | None] = mapped_column(String(3), nullable=True)
+ total_capex: Mapped[float | None] = mapped_column(
+ Numeric(18, 2), nullable=True)
+ contingency_pct: Mapped[float | None] = mapped_column(
+ Numeric(12, 6), nullable=True)
+ contingency_amount: Mapped[float | None] = mapped_column(
+ Numeric(18, 2), nullable=True)
+ total_with_contingency: Mapped[float | None] = mapped_column(
+ Numeric(18, 2), nullable=True)
+ component_count: Mapped[int | None] = mapped_column(Integer, nullable=True)
+ payload: Mapped[dict | None] = mapped_column(JSON, nullable=True)
+ created_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=func.now()
+ )
+ updated_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now()
+ )
+
+ project: Mapped[Project] = relationship(
+ "Project", back_populates="capex_snapshots"
+ )
+ created_by: Mapped[User | None] = relationship("User")
+
+ def __repr__(self) -> str: # pragma: no cover
+ return (
+ "ProjectCapexSnapshot(id={id!r}, project_id={project_id!r}, total_capex={total_capex!r})".format(
+ id=self.id, project_id=self.project_id, total_capex=self.total_capex
+ )
+ )
+
+
+class ScenarioCapexSnapshot(Base):
+ """Snapshot of initial capex metrics for an individual scenario."""
+
+ __tablename__ = "scenario_capex_snapshots"
+
+ id: Mapped[int] = mapped_column(Integer, primary_key=True)
+ scenario_id: Mapped[int] = mapped_column(
+ ForeignKey("scenarios.id", ondelete="CASCADE"), nullable=False, index=True
+ )
+ created_by_id: Mapped[int | None] = mapped_column(
+ ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True
+ )
+ calculation_source: Mapped[str | None] = mapped_column(
+ String(64), nullable=True)
+ calculated_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=func.now()
+ )
+ currency_code: Mapped[str | None] = mapped_column(String(3), nullable=True)
+ total_capex: Mapped[float | None] = mapped_column(
+ Numeric(18, 2), nullable=True)
+ contingency_pct: Mapped[float | None] = mapped_column(
+ Numeric(12, 6), nullable=True)
+ contingency_amount: Mapped[float | None] = mapped_column(
+ Numeric(18, 2), nullable=True)
+ total_with_contingency: Mapped[float | None] = mapped_column(
+ Numeric(18, 2), nullable=True)
+ component_count: Mapped[int | None] = mapped_column(Integer, nullable=True)
+ payload: Mapped[dict | None] = mapped_column(JSON, nullable=True)
+ created_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=func.now()
+ )
+ updated_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now()
+ )
+
+ scenario: Mapped[Scenario] = relationship(
+ "Scenario", back_populates="capex_snapshots"
+ )
+ created_by: Mapped[User | None] = relationship("User")
+
+ def __repr__(self) -> str: # pragma: no cover
+ return (
+ "ScenarioCapexSnapshot(id={id!r}, scenario_id={scenario_id!r}, total_capex={total_capex!r})".format(
+ id=self.id, scenario_id=self.scenario_id, total_capex=self.total_capex
+ )
+ )
diff --git a/models/project.py b/models/project.py
index 2b5c3f7..af8a680 100644
--- a/models/project.py
+++ b/models/project.py
@@ -5,6 +5,7 @@ from typing import TYPE_CHECKING, List
from .enums import MiningOperationType, sql_enum
from .profitability_snapshot import ProjectProfitability
+from .capex_snapshot import ProjectCapexSnapshot
from sqlalchemy import DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
@@ -59,6 +60,13 @@ class Project(Base):
order_by=lambda: ProjectProfitability.calculated_at.desc(),
passive_deletes=True,
)
+ capex_snapshots: Mapped[List["ProjectCapexSnapshot"]] = relationship(
+ "ProjectCapexSnapshot",
+ back_populates="project",
+ cascade="all, delete-orphan",
+ order_by=lambda: ProjectCapexSnapshot.calculated_at.desc(),
+ passive_deletes=True,
+ )
@property
def latest_profitability(self) -> "ProjectProfitability | None":
@@ -68,5 +76,13 @@ class Project(Base):
return None
return self.profitability_snapshots[0]
+ @property
+ def latest_capex(self) -> "ProjectCapexSnapshot | None":
+ """Return the most recent capex snapshot, if any."""
+
+ if not self.capex_snapshots:
+ return None
+ return self.capex_snapshots[0]
+
def __repr__(self) -> str: # pragma: no cover - helpful for debugging
return f"Project(id={self.id!r}, name={self.name!r})"
diff --git a/models/scenario.py b/models/scenario.py
index 900dc3e..7234a25 100644
--- a/models/scenario.py
+++ b/models/scenario.py
@@ -20,6 +20,7 @@ from config.database import Base
from services.currency import normalise_currency
from .enums import ResourceType, ScenarioStatus, sql_enum
from .profitability_snapshot import ScenarioProfitability
+from .capex_snapshot import ScenarioCapexSnapshot
if TYPE_CHECKING: # pragma: no cover
from .financial_input import FinancialInput
@@ -83,6 +84,13 @@ class Scenario(Base):
order_by=lambda: ScenarioProfitability.calculated_at.desc(),
passive_deletes=True,
)
+ capex_snapshots: Mapped[List["ScenarioCapexSnapshot"]] = relationship(
+ "ScenarioCapexSnapshot",
+ back_populates="scenario",
+ cascade="all, delete-orphan",
+ order_by=lambda: ScenarioCapexSnapshot.calculated_at.desc(),
+ passive_deletes=True,
+ )
@validates("currency")
def _normalise_currency(self, key: str, value: str | None) -> str | None:
@@ -99,3 +107,11 @@ class Scenario(Base):
if not self.profitability_snapshots:
return None
return self.profitability_snapshots[0]
+
+ @property
+ def latest_capex(self) -> "ScenarioCapexSnapshot | None":
+ """Return the most recent capex snapshot for this scenario."""
+
+ if not self.capex_snapshots:
+ return None
+ return self.capex_snapshots[0]
diff --git a/routes/calculations.py b/routes/calculations.py
index 5a09339..8063a21 100644
--- a/routes/calculations.py
+++ b/routes/calculations.py
@@ -3,7 +3,7 @@
from __future__ import annotations
from decimal import Decimal
-from typing import Any
+from typing import Any, Sequence
from fastapi import APIRouter, Depends, Query, Request, status
from fastapi.responses import HTMLResponse, JSONResponse, Response
@@ -14,22 +14,31 @@ from starlette.datastructures import FormData
from dependencies import get_pricing_metadata, get_unit_of_work, require_authenticated_user
from models import (
Project,
+ ProjectCapexSnapshot,
ProjectProfitability,
Scenario,
+ ScenarioCapexSnapshot,
ScenarioProfitability,
User,
)
from schemas.calculations import (
+ CapexCalculationOptions,
+ CapexCalculationRequest,
+ CapexCalculationResult,
+ CapexComponentInput,
+ CapexParameters,
ProfitabilityCalculationRequest,
ProfitabilityCalculationResult,
)
-from services.calculations import calculate_profitability
-from services.exceptions import EntityNotFoundError, ProfitabilityValidationError
+from services.calculations import calculate_initial_capex, calculate_profitability
+from services.exceptions import CapexValidationError, EntityNotFoundError, ProfitabilityValidationError
from services.pricing import PricingMetadata
from services.unit_of_work import UnitOfWork
+from routes.template_filters import register_common_filters
router = APIRouter(prefix="/calculations", tags=["Calculations"])
templates = Jinja2Templates(directory="templates")
+register_common_filters(templates)
_SUPPORTED_METALS: tuple[dict[str, str], ...] = (
{"value": "copper", "label": "Copper"},
@@ -39,6 +48,14 @@ _SUPPORTED_METALS: tuple[dict[str, str], ...] = (
_SUPPORTED_METAL_VALUES = {entry["value"] for entry in _SUPPORTED_METALS}
_DEFAULT_EVALUATION_PERIODS = 10
+_CAPEX_CATEGORY_OPTIONS: tuple[dict[str, str], ...] = (
+ {"value": "equipment", "label": "Equipment"},
+ {"value": "infrastructure", "label": "Infrastructure"},
+ {"value": "land", "label": "Land & Property"},
+ {"value": "miscellaneous", "label": "Miscellaneous"},
+)
+_DEFAULT_CAPEX_HORIZON_YEARS = 5
+
def _combine_impurity_metadata(metadata: PricingMetadata) -> list[dict[str, object]]:
"""Build impurity rows combining thresholds and penalties."""
@@ -190,6 +207,266 @@ def _prepare_form_data_for_display(
return data
+def _coerce_bool(value: Any) -> bool:
+ if isinstance(value, bool):
+ return value
+ if isinstance(value, str):
+ lowered = value.strip().lower()
+ return lowered in {"1", "true", "yes", "on"}
+ return bool(value)
+
+
+def _serialise_capex_component_entry(component: Any) -> dict[str, Any]:
+ if isinstance(component, CapexComponentInput):
+ raw = component.model_dump()
+ elif isinstance(component, dict):
+ raw = dict(component)
+ else:
+ raw = {
+ "id": getattr(component, "id", None),
+ "name": getattr(component, "name", None),
+ "category": getattr(component, "category", None),
+ "amount": getattr(component, "amount", None),
+ "currency": getattr(component, "currency", None),
+ "spend_year": getattr(component, "spend_year", None),
+ "notes": getattr(component, "notes", None),
+ }
+
+ return {
+ "id": raw.get("id"),
+ "name": _value_or_blank(raw.get("name")),
+ "category": raw.get("category") or "equipment",
+ "amount": _value_or_blank(raw.get("amount")),
+ "currency": _value_or_blank(raw.get("currency")),
+ "spend_year": _value_or_blank(raw.get("spend_year")),
+ "notes": _value_or_blank(raw.get("notes")),
+ }
+
+
+def _serialise_capex_parameters(parameters: Any) -> dict[str, Any]:
+ if isinstance(parameters, CapexParameters):
+ raw = parameters.model_dump()
+ elif isinstance(parameters, dict):
+ raw = dict(parameters)
+ else:
+ raw = {}
+
+ return {
+ "currency_code": _value_or_blank(raw.get("currency_code")),
+ "contingency_pct": _value_or_blank(raw.get("contingency_pct")),
+ "discount_rate_pct": _value_or_blank(raw.get("discount_rate_pct")),
+ "evaluation_horizon_years": _value_or_blank(
+ raw.get("evaluation_horizon_years")
+ ),
+ }
+
+
+def _serialise_capex_options(options: Any) -> dict[str, Any]:
+ if isinstance(options, CapexCalculationOptions):
+ raw = options.model_dump()
+ elif isinstance(options, dict):
+ raw = dict(options)
+ else:
+ raw = {}
+
+ return {"persist": _coerce_bool(raw.get("persist", False))}
+
+
+def _build_capex_defaults(
+ *,
+ project: Project | None,
+ scenario: Scenario | None,
+) -> dict[str, Any]:
+ currency = ""
+ if scenario and getattr(scenario, "currency", None):
+ currency = str(scenario.currency).upper()
+ elif project and getattr(project, "currency", None):
+ currency = str(project.currency).upper()
+
+ discount_rate = ""
+ scenario_discount = getattr(scenario, "discount_rate", None)
+ if scenario_discount is not None:
+ discount_rate = float(scenario_discount)
+
+ return {
+ "components": [],
+ "parameters": {
+ "currency_code": currency or None,
+ "contingency_pct": None,
+ "discount_rate_pct": discount_rate,
+ "evaluation_horizon_years": _DEFAULT_CAPEX_HORIZON_YEARS,
+ },
+ "options": {
+ "persist": bool(scenario or project),
+ },
+ "currency_code": currency or None,
+ "default_horizon": _DEFAULT_CAPEX_HORIZON_YEARS,
+ "last_updated_at": getattr(scenario, "capex_updated_at", None),
+ }
+
+
+def _prepare_capex_context(
+ request: Request,
+ *,
+ project: Project | None,
+ scenario: Scenario | None,
+ form_data: dict[str, Any] | None = None,
+ result: CapexCalculationResult | None = None,
+ errors: list[str] | None = None,
+ notices: list[str] | None = None,
+ component_errors: list[str] | None = None,
+ component_notices: list[str] | None = None,
+) -> dict[str, Any]:
+ if form_data is not None and hasattr(form_data, "model_dump"):
+ form_data = form_data.model_dump() # type: ignore[assignment]
+
+ defaults = _build_capex_defaults(project=project, scenario=scenario)
+
+ raw_components: list[Any] = []
+ if form_data and "components" in form_data:
+ raw_components = list(form_data.get("components") or [])
+ components = [
+ _serialise_capex_component_entry(component) for component in raw_components
+ ]
+
+ raw_parameters = defaults["parameters"].copy()
+ if form_data and form_data.get("parameters"):
+ raw_parameters.update(
+ _serialise_capex_parameters(form_data.get("parameters"))
+ )
+ parameters = _serialise_capex_parameters(raw_parameters)
+
+ raw_options = defaults["options"].copy()
+ if form_data and form_data.get("options"):
+ raw_options.update(_serialise_capex_options(form_data.get("options")))
+ options = _serialise_capex_options(raw_options)
+
+ currency_code = parameters.get(
+ "currency_code") or defaults["currency_code"]
+
+ return {
+ "request": request,
+ "project": project,
+ "scenario": scenario,
+ "components": components,
+ "parameters": parameters,
+ "options": options,
+ "currency_code": currency_code,
+ "category_options": _CAPEX_CATEGORY_OPTIONS,
+ "default_horizon": defaults["default_horizon"],
+ "last_updated_at": defaults["last_updated_at"],
+ "result": result,
+ "errors": errors or [],
+ "notices": notices or [],
+ "component_errors": component_errors or [],
+ "component_notices": component_notices or [],
+ "cancel_url": request.headers.get("Referer"),
+ "form_action": request.url.path,
+ "csrf_token": None,
+ }
+
+
+def _format_error_location(location: tuple[Any, ...]) -> str:
+ path = ""
+ for part in location:
+ if isinstance(part, int):
+ path += f"[{part}]"
+ else:
+ if path:
+ path += f".{part}"
+ else:
+ path = str(part)
+ return path or "(input)"
+
+
+def _partition_capex_error_messages(
+ errors: Sequence[Any],
+) -> tuple[list[str], list[str]]:
+ general: list[str] = []
+ component_specific: list[str] = []
+
+ for error in errors:
+ if isinstance(error, dict):
+ mapping = error
+ else:
+ try:
+ mapping = dict(error)
+ except TypeError:
+ mapping = {}
+
+ location = tuple(mapping.get("loc", ()))
+ message = mapping.get("msg", "Invalid value")
+ formatted_location = _format_error_location(location)
+ entry = f"{formatted_location} - {message}"
+ if location and location[0] == "components":
+ component_specific.append(entry)
+ else:
+ general.append(entry)
+
+ return general, component_specific
+
+
+def _capex_form_to_payload(form: FormData) -> dict[str, Any]:
+ data: dict[str, Any] = {}
+ components: dict[int, dict[str, Any]] = {}
+ parameters: dict[str, Any] = {}
+ options: dict[str, Any] = {}
+
+ for key, value in form.multi_items():
+ normalised_value = _normalise_form_value(value)
+
+ if key.startswith("components["):
+ try:
+ index_part = key[len("components["):]
+ index_str, remainder = index_part.split("]", 1)
+ field = remainder.strip()[1:-1]
+ index = int(index_str)
+ except (ValueError, IndexError):
+ continue
+ entry = components.setdefault(index, {})
+ entry[field] = normalised_value
+ continue
+
+ if key.startswith("parameters["):
+ field = key[len("parameters["):-1]
+ parameters[field] = normalised_value
+ continue
+
+ if key.startswith("options["):
+ field = key[len("options["):-1]
+ options[field] = normalised_value
+ continue
+
+ if key == "csrf_token":
+ continue
+
+ data[key] = normalised_value
+
+ if components:
+ ordered = [
+ components[index] for index in sorted(components.keys())
+ ]
+ data["components"] = ordered
+
+ if parameters:
+ data["parameters"] = parameters
+
+ if options:
+ options["persist"] = _coerce_bool(options.get("persist"))
+ data["options"] = options
+
+ return data
+
+
+async def _extract_capex_payload(request: Request) -> dict[str, Any]:
+ content_type = request.headers.get("content-type", "").lower()
+ if content_type.startswith("application/json"):
+ body = await request.json()
+ return body if isinstance(body, dict) else {}
+ form = await request.form()
+ return _capex_form_to_payload(form)
+
+
def _prepare_default_context(
request: Request,
*,
@@ -424,6 +701,205 @@ def _persist_profitability_snapshots(
uow.project_profitability.create(project_snapshot)
+def _should_persist_capex(
+ *,
+ project: Project | None,
+ scenario: Scenario | None,
+ request_model: CapexCalculationRequest,
+) -> bool:
+ """Determine whether capex snapshots should be stored."""
+
+ persist_requested = bool(
+ getattr(request_model, "options", None)
+ and request_model.options.persist
+ )
+ return persist_requested and bool(project or scenario)
+
+
+def _persist_capex_snapshots(
+ *,
+ uow: UnitOfWork,
+ project: Project | None,
+ scenario: Scenario | None,
+ user: User | None,
+ request_model: CapexCalculationRequest,
+ result: CapexCalculationResult,
+) -> None:
+ if not _should_persist_capex(
+ project=project,
+ scenario=scenario,
+ request_model=request_model,
+ ):
+ return
+
+ created_by_id = getattr(user, "id", None)
+ totals = result.totals
+ component_count = len(result.components)
+
+ payload = {
+ "request": request_model.model_dump(mode="json"),
+ "result": result.model_dump(),
+ }
+
+ if scenario and uow.scenario_capex:
+ scenario_snapshot = ScenarioCapexSnapshot(
+ scenario_id=scenario.id,
+ created_by_id=created_by_id,
+ calculation_source="calculations.capex",
+ currency_code=result.currency,
+ total_capex=float(totals.overall),
+ contingency_pct=float(totals.contingency_pct),
+ contingency_amount=float(totals.contingency_amount),
+ total_with_contingency=float(totals.with_contingency),
+ component_count=component_count,
+ payload=payload,
+ )
+ uow.scenario_capex.create(scenario_snapshot)
+
+ if project and uow.project_capex:
+ project_snapshot = ProjectCapexSnapshot(
+ project_id=project.id,
+ created_by_id=created_by_id,
+ calculation_source="calculations.capex",
+ currency_code=result.currency,
+ total_capex=float(totals.overall),
+ contingency_pct=float(totals.contingency_pct),
+ contingency_amount=float(totals.contingency_amount),
+ total_with_contingency=float(totals.with_contingency),
+ component_count=component_count,
+ payload=payload,
+ )
+ uow.project_capex.create(project_snapshot)
+
+
+@router.get(
+ "/capex",
+ response_class=HTMLResponse,
+ name="calculations.capex_form",
+)
+def capex_form(
+ request: Request,
+ _: User = Depends(require_authenticated_user),
+ uow: UnitOfWork = Depends(get_unit_of_work),
+ project_id: int | None = Query(
+ None, description="Optional project identifier"),
+ scenario_id: int | None = Query(
+ None, description="Optional scenario identifier"),
+) -> HTMLResponse:
+ """Render the initial capex planner template with defaults."""
+
+ project, scenario = _load_project_and_scenario(
+ uow=uow, project_id=project_id, scenario_id=scenario_id
+ )
+ context = _prepare_capex_context(
+ request,
+ project=project,
+ scenario=scenario,
+ )
+ return templates.TemplateResponse("scenarios/capex.html", context)
+
+
+@router.post(
+ "/capex",
+ name="calculations.capex_submit",
+)
+async def capex_submit(
+ request: Request,
+ _: User = Depends(require_authenticated_user),
+ uow: UnitOfWork = Depends(get_unit_of_work),
+ project_id: int | None = Query(
+ None, description="Optional project identifier"),
+ scenario_id: int | None = Query(
+ None, description="Optional scenario identifier"),
+) -> Response:
+ """Process capex submissions and return aggregated results."""
+
+ wants_json = _is_json_request(request)
+ payload_data = await _extract_capex_payload(request)
+
+ try:
+ request_model = CapexCalculationRequest.model_validate(payload_data)
+ result = calculate_initial_capex(request_model)
+ except ValidationError as exc:
+ if wants_json:
+ return JSONResponse(
+ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+ content={"errors": exc.errors()},
+ )
+
+ project, scenario = _load_project_and_scenario(
+ uow=uow, project_id=project_id, scenario_id=scenario_id
+ )
+ general_errors, component_errors = _partition_capex_error_messages(
+ exc.errors()
+ )
+ context = _prepare_capex_context(
+ request,
+ project=project,
+ scenario=scenario,
+ form_data=payload_data,
+ errors=general_errors,
+ component_errors=component_errors,
+ )
+ return templates.TemplateResponse(
+ "scenarios/capex.html",
+ context,
+ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+ )
+ except CapexValidationError as exc:
+ if wants_json:
+ return JSONResponse(
+ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+ content={
+ "errors": list(exc.field_errors or []),
+ "message": exc.message,
+ },
+ )
+
+ project, scenario = _load_project_and_scenario(
+ uow=uow, project_id=project_id, scenario_id=scenario_id
+ )
+ errors = list(exc.field_errors or []) or [exc.message]
+ context = _prepare_capex_context(
+ request,
+ project=project,
+ scenario=scenario,
+ form_data=payload_data,
+ errors=errors,
+ )
+ return templates.TemplateResponse(
+ "scenarios/capex.html",
+ context,
+ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+ )
+
+ project, scenario = _load_project_and_scenario(
+ uow=uow, project_id=project_id, scenario_id=scenario_id
+ )
+
+ if wants_json:
+ return JSONResponse(
+ status_code=status.HTTP_200_OK,
+ content=result.model_dump(),
+ )
+
+ context = _prepare_capex_context(
+ request,
+ project=project,
+ scenario=scenario,
+ form_data=request_model.model_dump(mode="json"),
+ result=result,
+ )
+ notices = _list_from_context(context, "notices")
+ notices.append("Initial capex calculation completed successfully.")
+
+ return templates.TemplateResponse(
+ "scenarios/capex.html",
+ context,
+ status_code=status.HTTP_200_OK,
+ )
+
+
@router.get(
"/profitability",
response_class=HTMLResponse,
diff --git a/routes/reports.py b/routes/reports.py
index 9a3b65a..4a82a75 100644
--- a/routes/reports.py
+++ b/routes/reports.py
@@ -1,6 +1,6 @@
from __future__ import annotations
-from datetime import date, datetime
+from datetime import date
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from fastapi.encoders import jsonable_encoder
@@ -24,96 +24,11 @@ from services.reporting import (
validate_percentiles,
)
from services.unit_of_work import UnitOfWork
+from routes.template_filters import register_common_filters
router = APIRouter(prefix="/reports", tags=["Reports"])
templates = Jinja2Templates(directory="templates")
-
-# Add custom Jinja2 filters
-
-
-def format_datetime(value):
- """Format a datetime object for display in templates."""
- if not isinstance(value, datetime):
- return ""
- if value.tzinfo is None:
- # Assume UTC if no timezone
- from datetime import timezone
- value = value.replace(tzinfo=timezone.utc)
- # Format as readable date/time
- return value.strftime("%Y-%m-%d %H:%M UTC")
-
-
-def currency_display(value, currency_code):
- """Format a numeric value with currency symbol/code."""
- if value is None:
- return "—"
-
- # Format the number
- if isinstance(value, (int, float)):
- formatted_value = f"{value:,.2f}"
- else:
- formatted_value = str(value)
-
- # Add currency code
- if currency_code:
- return f"{currency_code} {formatted_value}"
- return formatted_value
-
-
-def format_metric(value, metric_name, currency_code=None):
- """Format metric values appropriately based on metric type."""
- if value is None:
- return "—"
-
- # For currency-related metrics, use currency formatting
- currency_metrics = {'npv', 'inflows', 'outflows',
- 'net', 'total_inflows', 'total_outflows', 'total_net'}
- if metric_name in currency_metrics and currency_code:
- return currency_display(value, currency_code)
-
- # For percentage metrics
- percentage_metrics = {'irr', 'payback_period'}
- if metric_name in percentage_metrics:
- if isinstance(value, (int, float)):
- return f"{value:.2f}%"
- return f"{value}%"
-
- # Default numeric formatting
- if isinstance(value, (int, float)):
- return f"{value:,.2f}"
-
- return str(value)
-
-
-def percentage_display(value):
- """Format a value as a percentage."""
- if value is None:
- return "—"
-
- if isinstance(value, (int, float)):
- return f"{value:.2f}%"
-
- return f"{value}%"
-
-
-def period_display(value):
- """Format a period value (like payback period)."""
- if value is None:
- return "—"
-
- if isinstance(value, (int, float)):
- if value == int(value):
- return f"{int(value)} years"
- return f"{value:.1f} years"
-
- return str(value)
-
-
-templates.env.filters['format_datetime'] = format_datetime
-templates.env.filters['currency_display'] = currency_display
-templates.env.filters['format_metric'] = format_metric
-templates.env.filters['percentage_display'] = percentage_display
-templates.env.filters['period_display'] = period_display
+register_common_filters(templates)
READ_ROLES = ("viewer", "analyst", "project_manager", "admin")
MANAGE_ROLES = ("project_manager", "admin")
diff --git a/routes/template_filters.py b/routes/template_filters.py
new file mode 100644
index 0000000..2050279
--- /dev/null
+++ b/routes/template_filters.py
@@ -0,0 +1,95 @@
+from __future__ import annotations
+
+from datetime import datetime, timezone
+from typing import Any
+
+from fastapi.templating import Jinja2Templates
+
+
+def format_datetime(value: Any) -> str:
+ """Render datetime values consistently for templates."""
+ if not isinstance(value, datetime):
+ return ""
+ if value.tzinfo is None:
+ value = value.replace(tzinfo=timezone.utc)
+ return value.strftime("%Y-%m-%d %H:%M UTC")
+
+
+def currency_display(value: Any, currency_code: str | None) -> str:
+ """Format numeric values with currency context."""
+ if value is None:
+ return "—"
+ if isinstance(value, (int, float)):
+ formatted_value = f"{value:,.2f}"
+ else:
+ formatted_value = str(value)
+ if currency_code:
+ return f"{currency_code} {formatted_value}"
+ return formatted_value
+
+
+def format_metric(value: Any, metric_name: str, currency_code: str | None = None) -> str:
+ """Format metrics according to their semantic type."""
+ if value is None:
+ return "—"
+
+ currency_metrics = {
+ "npv",
+ "inflows",
+ "outflows",
+ "net",
+ "total_inflows",
+ "total_outflows",
+ "total_net",
+ }
+ if metric_name in currency_metrics and currency_code:
+ return currency_display(value, currency_code)
+
+ percentage_metrics = {"irr", "payback_period"}
+ if metric_name in percentage_metrics:
+ if isinstance(value, (int, float)):
+ return f"{value:.2f}%"
+ return f"{value}%"
+
+ if isinstance(value, (int, float)):
+ return f"{value:,.2f}"
+
+ return str(value)
+
+
+def percentage_display(value: Any) -> str:
+ """Format numeric values as percentages."""
+ if value is None:
+ return "—"
+ if isinstance(value, (int, float)):
+ return f"{value:.2f}%"
+ return f"{value}%"
+
+
+def period_display(value: Any) -> str:
+ """Format period values in years."""
+ if value is None:
+ return "—"
+ if isinstance(value, (int, float)):
+ if value == int(value):
+ return f"{int(value)} years"
+ return f"{value:.1f} years"
+ return str(value)
+
+
+def register_common_filters(templates: Jinja2Templates) -> None:
+ templates.env.filters["format_datetime"] = format_datetime
+ templates.env.filters["currency_display"] = currency_display
+ templates.env.filters["format_metric"] = format_metric
+ templates.env.filters["percentage_display"] = percentage_display
+ templates.env.filters["period_display"] = period_display
+
+
+__all__ = [
+ "format_datetime",
+ "currency_display",
+ "format_metric",
+ "percentage_display",
+ "period_display",
+ "register_common_filters",
+]
diff --git a/schemas/calculations.py b/schemas/calculations.py
index 01744f2..638b16e 100644
--- a/schemas/calculations.py
+++ b/schemas/calculations.py
@@ -97,6 +97,106 @@ class ProfitabilityCalculationResult(BaseModel):
currency: str | None
+class CapexComponentInput(BaseModel):
+ """Capex component entry supplied by the UI."""
+
+ id: int | None = Field(default=None, ge=1)
+ name: str = Field(..., min_length=1)
+ category: str = Field(..., min_length=1)
+ amount: float = Field(..., ge=0)
+ currency: str | None = Field(None, min_length=3, max_length=3)
+ spend_year: int | None = Field(None, ge=0, le=120)
+ notes: str | None = Field(None, max_length=500)
+
+ @field_validator("currency")
+ @classmethod
+ def _uppercase_currency(cls, value: str | None) -> str | None:
+ if value is None:
+ return None
+ return value.strip().upper()
+
+ @field_validator("category")
+ @classmethod
+ def _normalise_category(cls, value: str) -> str:
+ return value.strip().lower()
+
+ @field_validator("name")
+ @classmethod
+ def _trim_name(cls, value: str) -> str:
+ return value.strip()
+
+
+class CapexParameters(BaseModel):
+ """Global parameters applied to capex calculations."""
+
+ currency_code: str | None = Field(None, min_length=3, max_length=3)
+ contingency_pct: float | None = Field(0, ge=0, le=100)
+ discount_rate_pct: float | None = Field(None, ge=0, le=100)
+ evaluation_horizon_years: int | None = Field(10, ge=1, le=100)
+
+ @field_validator("currency_code")
+ @classmethod
+ def _uppercase_currency(cls, value: str | None) -> str | None:
+ if value is None:
+ return None
+ return value.strip().upper()
+
+
+class CapexCalculationOptions(BaseModel):
+ """Optional behaviour flags for capex calculations."""
+
+ persist: bool = False
+
+
+class CapexCalculationRequest(BaseModel):
+ """Request payload for capex aggregation."""
+
+ components: List[CapexComponentInput] = Field(default_factory=list)
+ parameters: CapexParameters = Field(
+ default_factory=CapexParameters, # type: ignore[arg-type]
+ )
+ options: CapexCalculationOptions = Field(
+ default_factory=CapexCalculationOptions, # type: ignore[arg-type]
+ )
+
+
+class CapexCategoryBreakdown(BaseModel):
+ """Breakdown entry describing category totals."""
+
+ category: str
+ amount: float = Field(..., ge=0)
+ share: float | None = Field(None, ge=0, le=100)
+
+
+class CapexTotals(BaseModel):
+ """Aggregated totals for capex workflows."""
+
+ overall: float = Field(..., ge=0)
+ contingency_pct: float = Field(0, ge=0, le=100)
+ contingency_amount: float = Field(..., ge=0)
+ with_contingency: float = Field(..., ge=0)
+ by_category: List[CapexCategoryBreakdown] = Field(default_factory=list)
+
+
+class CapexTimelineEntry(BaseModel):
+ """Spend profile entry grouped by year."""
+
+ year: int
+ spend: float = Field(..., ge=0)
+ cumulative: float = Field(..., ge=0)
+
+
+class CapexCalculationResult(BaseModel):
+ """Response body for capex calculations."""
+
+ totals: CapexTotals
+ timeline: List[CapexTimelineEntry] = Field(default_factory=list)
+ components: List[CapexComponentInput] = Field(default_factory=list)
+ parameters: CapexParameters
+ options: CapexCalculationOptions
+ currency: str | None
+
+
__all__ = [
"ImpurityInput",
"ProfitabilityCalculationRequest",
@@ -104,5 +204,13 @@ __all__ = [
"ProfitabilityMetrics",
"CashFlowEntry",
"ProfitabilityCalculationResult",
+ "CapexComponentInput",
+ "CapexParameters",
+ "CapexCalculationOptions",
+ "CapexCalculationRequest",
+ "CapexCategoryBreakdown",
+ "CapexTotals",
+ "CapexTimelineEntry",
+ "CapexCalculationResult",
"ValidationError",
]
diff --git a/services/calculations.py b/services/calculations.py
index 410ef5c..8d071f7 100644
--- a/services/calculations.py
+++ b/services/calculations.py
@@ -2,8 +2,10 @@
from __future__ import annotations
+from collections import defaultdict
+
from services.currency import CurrencyValidationError, normalise_currency
-from services.exceptions import ProfitabilityValidationError
+from services.exceptions import CapexValidationError, ProfitabilityValidationError
from services.financial import (
CashFlow,
ConvergenceError,
@@ -14,6 +16,13 @@ from services.financial import (
)
from services.pricing import PricingInput, PricingMetadata, PricingResult, calculate_pricing
from schemas.calculations import (
+ CapexCalculationRequest,
+ CapexCalculationResult,
+ CapexCategoryBreakdown,
+ CapexComponentInput,
+ CapexParameters,
+ CapexTotals,
+ CapexTimelineEntry,
CashFlowEntry,
ProfitabilityCalculationRequest,
ProfitabilityCalculationResult,
@@ -202,4 +211,125 @@ def calculate_profitability(
)
-__all__ = ["calculate_profitability"]
+def calculate_initial_capex(
+ request: CapexCalculationRequest,
+) -> CapexCalculationResult:
+ """Aggregate capex components into totals and timelines."""
+
+ if not request.components:
+ raise CapexValidationError(
+ "At least one capex component is required for calculation.",
+ ["components"],
+ )
+
+ parameters = request.parameters
+
+ base_currency = parameters.currency_code
+ if base_currency:
+ try:
+ base_currency = normalise_currency(base_currency)
+ except CurrencyValidationError as exc:
+ raise CapexValidationError(
+ str(exc), ["parameters.currency_code"]
+ ) from exc
+
+ overall = 0.0
+ category_totals: dict[str, float] = defaultdict(float)
+ timeline_totals: dict[int, float] = defaultdict(float)
+ normalised_components: list[CapexComponentInput] = []
+
+ for index, component in enumerate(request.components):
+ amount = float(component.amount)
+ overall += amount
+
+ category_totals[component.category] += amount
+
+ spend_year = component.spend_year or 0
+ timeline_totals[spend_year] += amount
+
+ component_currency = component.currency
+ if component_currency:
+ try:
+ component_currency = normalise_currency(component_currency)
+ except CurrencyValidationError as exc:
+ raise CapexValidationError(
+ str(exc), [f"components[{index}].currency"]
+ ) from exc
+
+ if base_currency is None and component_currency:
+ base_currency = component_currency
+ elif (
+ base_currency is not None
+ and component_currency is not None
+ and component_currency != base_currency
+ ):
+ raise CapexValidationError(
+ (
+ "Component currency does not match the global currency. "
+ f"Expected {base_currency}, got {component_currency}."
+ ),
+ [f"components[{index}].currency"],
+ )
+
+ normalised_components.append(
+ CapexComponentInput(
+ id=component.id,
+ name=component.name,
+ category=component.category,
+ amount=amount,
+ currency=component_currency,
+ spend_year=component.spend_year,
+ notes=component.notes,
+ )
+ )
+
+ contingency_pct = float(parameters.contingency_pct or 0.0)
+ contingency_amount = overall * (contingency_pct / 100.0)
+ grand_total = overall + contingency_amount
+
+ category_breakdowns: list[CapexCategoryBreakdown] = []
+ if category_totals:
+ for category, total in sorted(category_totals.items()):
+ share = (total / overall * 100.0) if overall else None
+ category_breakdowns.append(
+ CapexCategoryBreakdown(
+ category=category,
+ amount=total,
+ share=share,
+ )
+ )
+
+ cumulative = 0.0
+ timeline_entries: list[CapexTimelineEntry] = []
+ for year, spend in sorted(timeline_totals.items()):
+ cumulative += spend
+ timeline_entries.append(
+ CapexTimelineEntry(year=year, spend=spend, cumulative=cumulative)
+ )
+
+ try:
+ currency = normalise_currency(base_currency) if base_currency else None
+ except CurrencyValidationError as exc:
+ raise CapexValidationError(
+ str(exc), ["parameters.currency_code"]
+ ) from exc
+
+ totals = CapexTotals(
+ overall=overall,
+ contingency_pct=contingency_pct,
+ contingency_amount=contingency_amount,
+ with_contingency=grand_total,
+ by_category=category_breakdowns,
+ )
+
+ return CapexCalculationResult(
+ totals=totals,
+ timeline=timeline_entries,
+ components=normalised_components,
+ parameters=parameters,
+ options=request.options,
+ currency=currency,
+ )
+
+
+__all__ = ["calculate_profitability", "calculate_initial_capex"]
diff --git a/services/exceptions.py b/services/exceptions.py
index f7d592a..a22a784 100644
--- a/services/exceptions.py
+++ b/services/exceptions.py
@@ -37,3 +37,14 @@ class ProfitabilityValidationError(Exception):
def __str__(self) -> str: # pragma: no cover - mirrors message for logging
return self.message
+
+
+@dataclass(eq=False)
+class CapexValidationError(Exception):
+ """Raised when capex calculation inputs fail domain validation."""
+
+ message: str
+ field_errors: Sequence[str] | None = None
+
+ def __str__(self) -> str: # pragma: no cover - mirrors message for logging
+ return self.message
diff --git a/services/repositories.py b/services/repositories.py
index a53e31a..79ec2f1 100644
--- a/services/repositories.py
+++ b/services/repositories.py
@@ -15,9 +15,11 @@ from models import (
PricingImpuritySettings,
PricingMetalSettings,
PricingSettings,
+ ProjectCapexSnapshot,
ProjectProfitability,
Role,
Scenario,
+ ScenarioCapexSnapshot,
ScenarioProfitability,
ScenarioStatus,
SimulationParameter,
@@ -469,6 +471,106 @@ class ScenarioProfitabilityRepository:
self.session.delete(entity)
+class ProjectCapexRepository:
+ """Persistence operations for project-level capex snapshots."""
+
+ def __init__(self, session: Session) -> None:
+ self.session = session
+
+ def create(self, snapshot: ProjectCapexSnapshot) -> ProjectCapexSnapshot:
+ self.session.add(snapshot)
+ self.session.flush()
+ return snapshot
+
+ def list_for_project(
+ self,
+ project_id: int,
+ *,
+ limit: int | None = None,
+ ) -> Sequence[ProjectCapexSnapshot]:
+ stmt = (
+ select(ProjectCapexSnapshot)
+ .where(ProjectCapexSnapshot.project_id == project_id)
+ .order_by(ProjectCapexSnapshot.calculated_at.desc())
+ )
+ if limit is not None:
+ stmt = stmt.limit(limit)
+ return self.session.execute(stmt).scalars().all()
+
+ def latest_for_project(
+ self,
+ project_id: int,
+ ) -> ProjectCapexSnapshot | None:
+ stmt = (
+ select(ProjectCapexSnapshot)
+ .where(ProjectCapexSnapshot.project_id == project_id)
+ .order_by(ProjectCapexSnapshot.calculated_at.desc())
+ .limit(1)
+ )
+ return self.session.execute(stmt).scalar_one_or_none()
+
+ def delete(self, snapshot_id: int) -> None:
+ stmt = select(ProjectCapexSnapshot).where(
+ ProjectCapexSnapshot.id == snapshot_id
+ )
+ entity = self.session.execute(stmt).scalar_one_or_none()
+ if entity is None:
+ raise EntityNotFoundError(
+ f"Project capex snapshot {snapshot_id} not found"
+ )
+ self.session.delete(entity)
+
+
+class ScenarioCapexRepository:
+ """Persistence operations for scenario-level capex snapshots."""
+
+ def __init__(self, session: Session) -> None:
+ self.session = session
+
+ def create(self, snapshot: ScenarioCapexSnapshot) -> ScenarioCapexSnapshot:
+ self.session.add(snapshot)
+ self.session.flush()
+ return snapshot
+
+ def list_for_scenario(
+ self,
+ scenario_id: int,
+ *,
+ limit: int | None = None,
+ ) -> Sequence[ScenarioCapexSnapshot]:
+ stmt = (
+ select(ScenarioCapexSnapshot)
+ .where(ScenarioCapexSnapshot.scenario_id == scenario_id)
+ .order_by(ScenarioCapexSnapshot.calculated_at.desc())
+ )
+ if limit is not None:
+ stmt = stmt.limit(limit)
+ return self.session.execute(stmt).scalars().all()
+
+ def latest_for_scenario(
+ self,
+ scenario_id: int,
+ ) -> ScenarioCapexSnapshot | None:
+ stmt = (
+ select(ScenarioCapexSnapshot)
+ .where(ScenarioCapexSnapshot.scenario_id == scenario_id)
+ .order_by(ScenarioCapexSnapshot.calculated_at.desc())
+ .limit(1)
+ )
+ return self.session.execute(stmt).scalar_one_or_none()
+
+ def delete(self, snapshot_id: int) -> None:
+ stmt = select(ScenarioCapexSnapshot).where(
+ ScenarioCapexSnapshot.id == snapshot_id
+ )
+ entity = self.session.execute(stmt).scalar_one_or_none()
+ if entity is None:
+ raise EntityNotFoundError(
+ f"Scenario capex snapshot {snapshot_id} not found"
+ )
+ self.session.delete(entity)
+
+
class FinancialInputRepository:
"""Persistence operations for FinancialInput entities."""
diff --git a/services/unit_of_work.py b/services/unit_of_work.py
index 91e3d80..821a34b 100644
--- a/services/unit_of_work.py
+++ b/services/unit_of_work.py
@@ -14,9 +14,11 @@ from services.repositories import (
PricingSettingsSeedResult,
ProjectRepository,
ProjectProfitabilityRepository,
+ ProjectCapexRepository,
RoleRepository,
ScenarioRepository,
ScenarioProfitabilityRepository,
+ ScenarioCapexRepository,
SimulationParameterRepository,
UserRepository,
ensure_admin_user as ensure_admin_user_record,
@@ -39,7 +41,9 @@ class UnitOfWork(AbstractContextManager["UnitOfWork"]):
self.financial_inputs: FinancialInputRepository | None = None
self.simulation_parameters: SimulationParameterRepository | None = None
self.project_profitability: ProjectProfitabilityRepository | None = None
+ self.project_capex: ProjectCapexRepository | None = None
self.scenario_profitability: ScenarioProfitabilityRepository | None = None
+ self.scenario_capex: ScenarioCapexRepository | None = None
self.users: UserRepository | None = None
self.roles: RoleRepository | None = None
self.pricing_settings: PricingSettingsRepository | None = None
@@ -53,9 +57,11 @@ class UnitOfWork(AbstractContextManager["UnitOfWork"]):
self.session)
self.project_profitability = ProjectProfitabilityRepository(
self.session)
+ self.project_capex = ProjectCapexRepository(self.session)
self.scenario_profitability = ScenarioProfitabilityRepository(
self.session
)
+ self.scenario_capex = ScenarioCapexRepository(self.session)
self.users = UserRepository(self.session)
self.roles = RoleRepository(self.session)
self.pricing_settings = PricingSettingsRepository(self.session)
@@ -75,7 +81,9 @@ class UnitOfWork(AbstractContextManager["UnitOfWork"]):
self.financial_inputs = None
self.simulation_parameters = None
self.project_profitability = None
+ self.project_capex = None
self.scenario_profitability = None
+ self.scenario_capex = None
self.users = None
self.roles = None
self.pricing_settings = None
diff --git a/templates/scenarios/capex.html b/templates/scenarios/capex.html
new file mode 100644
index 0000000..cb79566
--- /dev/null
+++ b/templates/scenarios/capex.html
@@ -0,0 +1,264 @@
+{% extends "base.html" %}
+{% block title %}Initial Capex Planner · CalMiner{% endblock %}
+
+{% block content %}
+
+
+ Capture upfront capital requirements and review categorized totals.Initial Capex Planner
+
Calculated totals and categorized breakdowns.
++ {{ result.totals.overall | currency_display(result.currency) }} +
++ {{ result.totals.contingency_amount | currency_display(result.currency) }} +
++ {{ result.totals.with_contingency | currency_display(result.currency) }} +
+| Category | +Amount | +Share | +
|---|---|---|
| {{ row.category }} | +{{ row.amount | currency_display(result.currency) }} | +{{ row.share | percentage_display }} | +
| Year | +Spend | +Cumulative | +
|---|---|---|
| {{ entry.year }} | +{{ entry.spend | currency_display(result.currency) }} | +{{ entry.cumulative | currency_display(result.currency) }} | +
Provide component details and calculate to see capex totals.
+ {% endif %} +Charts render after calculations complete.
+