diff --git a/main.py b/main.py
index 6d817c3..077c26b 100644
--- a/main.py
+++ b/main.py
@@ -10,6 +10,7 @@ from middleware.metrics import MetricsMiddleware
from middleware.validation import validate_json
from routes.auth import router as auth_router
from routes.dashboard import router as dashboard_router
+from routes.calculations import router as calculations_router
from routes.imports import router as imports_router
from routes.exports import router as exports_router
from routes.projects import router as projects_router
@@ -40,7 +41,16 @@ async def health() -> dict[str, str]:
return {"status": "ok"}
+@app.get("/favicon.ico", include_in_schema=False)
+async def favicon() -> Response:
+ static_directory = "static"
+ img_directory = f"{static_directory}/img"
+ favicon_img = "logo_32x32.png"
+ return StaticFiles(directory=img_directory).lookup_path(favicon_img)[0]
+
+
@app.on_event("startup")
+# TODO: use lifespan events for startup/shutdown tasks
async def ensure_admin_bootstrap() -> None:
settings = get_settings()
admin_settings = settings.admin_bootstrap_settings()
@@ -93,6 +103,7 @@ async def ensure_admin_bootstrap() -> None:
app.include_router(dashboard_router)
+app.include_router(calculations_router)
app.include_router(auth_router)
app.include_router(imports_router)
app.include_router(exports_router)
diff --git a/models/__init__.py b/models/__init__.py
index ed5eb19..df529c1 100644
--- a/models/__init__.py
+++ b/models/__init__.py
@@ -27,16 +27,19 @@ from .project import Project
from .scenario import Scenario
from .simulation_parameter import SimulationParameter
from .user import Role, User, UserRole, password_context
+from .profitability_snapshot import ProjectProfitability, ScenarioProfitability
__all__ = [
"FinancialCategory",
"FinancialInput",
"MiningOperationType",
"Project",
+ "ProjectProfitability",
"PricingSettings",
"PricingMetalSettings",
"PricingImpuritySettings",
"Scenario",
+ "ScenarioProfitability",
"ScenarioStatus",
"DistributionType",
"SimulationParameter",
diff --git a/models/profitability_snapshot.py b/models/profitability_snapshot.py
new file mode 100644
index 0000000..a9572e7
--- /dev/null
+++ b/models/profitability_snapshot.py
@@ -0,0 +1,133 @@
+from __future__ import annotations
+
+from datetime import datetime
+from typing import TYPE_CHECKING
+
+from sqlalchemy import JSON, DateTime, ForeignKey, Integer, Numeric, String
+from sqlalchemy.orm import Mapped, mapped_column, relationship
+from sqlalchemy.sql import func
+
+from config.database import Base
+
+if TYPE_CHECKING: # pragma: no cover
+ from .project import Project
+ from .scenario import Scenario
+ from .user import User
+
+
+class ProjectProfitability(Base):
+ """Snapshot of aggregated profitability metrics at the project level."""
+
+ __tablename__ = "project_profitability_snapshots"
+
+ id: Mapped[int] = mapped_column(Integer, primary_key=True)
+ project_id: Mapped[int] = mapped_column(
+ ForeignKey("projects.id", ondelete="CASCADE"), nullable=False, index=True
+ )
+ created_by_id: Mapped[int | None] = mapped_column(
+ ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True
+ )
+ calculation_source: Mapped[str | None] = mapped_column(
+ String(64), nullable=True)
+ calculated_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=func.now()
+ )
+ currency_code: Mapped[str | None] = mapped_column(String(3), nullable=True)
+ npv: Mapped[float | None] = mapped_column(Numeric(18, 2), nullable=True)
+ irr_pct: Mapped[float | None] = mapped_column(
+ Numeric(12, 6), nullable=True)
+ payback_period_years: Mapped[float | None] = mapped_column(
+ Numeric(12, 4), nullable=True
+ )
+ margin_pct: Mapped[float | None] = mapped_column(
+ Numeric(12, 6), nullable=True)
+ revenue_total: Mapped[float | None] = mapped_column(
+ Numeric(18, 2), nullable=True)
+ processing_opex_total: Mapped[float | None] = mapped_column(
+ Numeric(18, 2), nullable=True
+ )
+ sustaining_capex_total: Mapped[float | None] = mapped_column(
+ Numeric(18, 2), nullable=True
+ )
+ initial_capex: Mapped[float | None] = mapped_column(
+ Numeric(18, 2), nullable=True)
+ net_cash_flow_total: Mapped[float | None] = mapped_column(
+ Numeric(18, 2), nullable=True
+ )
+ payload: Mapped[dict | None] = mapped_column(JSON, nullable=True)
+ created_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=func.now()
+ )
+ updated_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now()
+ )
+
+ project: Mapped[Project] = relationship(
+ "Project", back_populates="profitability_snapshots")
+ created_by: Mapped[User | None] = relationship("User")
+
+ def __repr__(self) -> str: # pragma: no cover
+ return (
+ "ProjectProfitability(id={id!r}, project_id={project_id!r}, npv={npv!r})".format(
+ id=self.id, project_id=self.project_id, npv=self.npv
+ )
+ )
+
+
+class ScenarioProfitability(Base):
+ """Snapshot of profitability metrics for an individual scenario."""
+
+ __tablename__ = "scenario_profitability_snapshots"
+
+ id: Mapped[int] = mapped_column(Integer, primary_key=True)
+ scenario_id: Mapped[int] = mapped_column(
+ ForeignKey("scenarios.id", ondelete="CASCADE"), nullable=False, index=True
+ )
+ created_by_id: Mapped[int | None] = mapped_column(
+ ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True
+ )
+ calculation_source: Mapped[str | None] = mapped_column(
+ String(64), nullable=True)
+ calculated_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=func.now()
+ )
+ currency_code: Mapped[str | None] = mapped_column(String(3), nullable=True)
+ npv: Mapped[float | None] = mapped_column(Numeric(18, 2), nullable=True)
+ irr_pct: Mapped[float | None] = mapped_column(
+ Numeric(12, 6), nullable=True)
+ payback_period_years: Mapped[float | None] = mapped_column(
+ Numeric(12, 4), nullable=True
+ )
+ margin_pct: Mapped[float | None] = mapped_column(
+ Numeric(12, 6), nullable=True)
+ revenue_total: Mapped[float | None] = mapped_column(
+ Numeric(18, 2), nullable=True)
+ processing_opex_total: Mapped[float | None] = mapped_column(
+ Numeric(18, 2), nullable=True
+ )
+ sustaining_capex_total: Mapped[float | None] = mapped_column(
+ Numeric(18, 2), nullable=True
+ )
+ initial_capex: Mapped[float | None] = mapped_column(
+ Numeric(18, 2), nullable=True)
+ net_cash_flow_total: Mapped[float | None] = mapped_column(
+ Numeric(18, 2), nullable=True
+ )
+ payload: Mapped[dict | None] = mapped_column(JSON, nullable=True)
+ created_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=func.now()
+ )
+ updated_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now()
+ )
+
+ scenario: Mapped[Scenario] = relationship(
+ "Scenario", back_populates="profitability_snapshots")
+ created_by: Mapped[User | None] = relationship("User")
+
+ def __repr__(self) -> str: # pragma: no cover
+ return (
+ "ScenarioProfitability(id={id!r}, scenario_id={scenario_id!r}, npv={npv!r})".format(
+ id=self.id, scenario_id=self.scenario_id, npv=self.npv
+ )
+ )
diff --git a/models/project.py b/models/project.py
index b5814c0..2b5c3f7 100644
--- a/models/project.py
+++ b/models/project.py
@@ -4,6 +4,7 @@ from datetime import datetime
from typing import TYPE_CHECKING, List
from .enums import MiningOperationType, sql_enum
+from .profitability_snapshot import ProjectProfitability
from sqlalchemy import DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
@@ -51,6 +52,21 @@ class Project(Base):
"PricingSettings",
back_populates="projects",
)
+ profitability_snapshots: Mapped[List["ProjectProfitability"]] = relationship(
+ "ProjectProfitability",
+ back_populates="project",
+ cascade="all, delete-orphan",
+ order_by=lambda: ProjectProfitability.calculated_at.desc(),
+ passive_deletes=True,
+ )
+
+ @property
+ def latest_profitability(self) -> "ProjectProfitability | None":
+ """Return the most recent profitability snapshot, if any."""
+
+ if not self.profitability_snapshots:
+ return None
+ return self.profitability_snapshots[0]
def __repr__(self) -> str: # pragma: no cover - helpful for debugging
return f"Project(id={self.id!r}, name={self.name!r})"
diff --git a/models/scenario.py b/models/scenario.py
index 643dd4f..900dc3e 100644
--- a/models/scenario.py
+++ b/models/scenario.py
@@ -19,6 +19,7 @@ from sqlalchemy.sql import func
from config.database import Base
from services.currency import normalise_currency
from .enums import ResourceType, ScenarioStatus, sql_enum
+from .profitability_snapshot import ScenarioProfitability
if TYPE_CHECKING: # pragma: no cover
from .financial_input import FinancialInput
@@ -75,6 +76,13 @@ class Scenario(Base):
cascade="all, delete-orphan",
passive_deletes=True,
)
+ profitability_snapshots: Mapped[List["ScenarioProfitability"]] = relationship(
+ "ScenarioProfitability",
+ back_populates="scenario",
+ cascade="all, delete-orphan",
+ order_by=lambda: ScenarioProfitability.calculated_at.desc(),
+ passive_deletes=True,
+ )
@validates("currency")
def _normalise_currency(self, key: str, value: str | None) -> str | None:
@@ -83,3 +91,11 @@ class Scenario(Base):
def __repr__(self) -> str: # pragma: no cover
return f"Scenario(id={self.id!r}, name={self.name!r}, project_id={self.project_id!r})"
+
+ @property
+ def latest_profitability(self) -> "ScenarioProfitability | None":
+ """Return the most recent profitability snapshot for this scenario."""
+
+ if not self.profitability_snapshots:
+ return None
+ return self.profitability_snapshots[0]
diff --git a/routes/calculations.py b/routes/calculations.py
new file mode 100644
index 0000000..5a09339
--- /dev/null
+++ b/routes/calculations.py
@@ -0,0 +1,571 @@
+"""Routes handling financial calculation workflows."""
+
+from __future__ import annotations
+
+from decimal import Decimal
+from typing import Any
+
+from fastapi import APIRouter, Depends, Query, Request, status
+from fastapi.responses import HTMLResponse, JSONResponse, Response
+from fastapi.templating import Jinja2Templates
+from pydantic import ValidationError
+from starlette.datastructures import FormData
+
+from dependencies import get_pricing_metadata, get_unit_of_work, require_authenticated_user
+from models import (
+ Project,
+ ProjectProfitability,
+ Scenario,
+ ScenarioProfitability,
+ User,
+)
+from schemas.calculations import (
+ ProfitabilityCalculationRequest,
+ ProfitabilityCalculationResult,
+)
+from services.calculations import calculate_profitability
+from services.exceptions import EntityNotFoundError, ProfitabilityValidationError
+from services.pricing import PricingMetadata
+from services.unit_of_work import UnitOfWork
+
+router = APIRouter(prefix="/calculations", tags=["Calculations"])
+templates = Jinja2Templates(directory="templates")
+
+_SUPPORTED_METALS: tuple[dict[str, str], ...] = (
+ {"value": "copper", "label": "Copper"},
+ {"value": "gold", "label": "Gold"},
+ {"value": "lithium", "label": "Lithium"},
+)
+_SUPPORTED_METAL_VALUES = {entry["value"] for entry in _SUPPORTED_METALS}
+_DEFAULT_EVALUATION_PERIODS = 10
+
+
+def _combine_impurity_metadata(metadata: PricingMetadata) -> list[dict[str, object]]:
+ """Build impurity rows combining thresholds and penalties."""
+
+ thresholds = getattr(metadata, "impurity_thresholds", {}) or {}
+ penalties = getattr(metadata, "impurity_penalty_per_ppm", {}) or {}
+ impurity_codes = sorted({*thresholds.keys(), *penalties.keys()})
+
+ combined: list[dict[str, object]] = []
+ for code in impurity_codes:
+ combined.append(
+ {
+ "name": code,
+ "threshold": float(thresholds.get(code, 0.0)),
+ "penalty": float(penalties.get(code, 0.0)),
+ "value": None,
+ }
+ )
+ return combined
+
+
+def _value_or_blank(value: Any) -> Any:
+ if value is None:
+ return ""
+ if isinstance(value, Decimal):
+ return float(value)
+ return value
+
+
+def _normalise_impurity_entries(entries: Any) -> list[dict[str, Any]]:
+ if not entries:
+ return []
+
+ normalised: list[dict[str, Any]] = []
+ for entry in entries:
+ if isinstance(entry, dict):
+ getter = entry.get # type: ignore[assignment]
+ else:
+ def getter(key, default=None, _entry=entry): return getattr(
+ _entry, key, default)
+
+ normalised.append(
+ {
+ "name": getter("name", "") or "",
+ "value": _value_or_blank(getter("value")),
+ "threshold": _value_or_blank(getter("threshold")),
+ "penalty": _value_or_blank(getter("penalty")),
+ }
+ )
+ return normalised
+
+
+def _build_default_form_data(
+ *,
+ metadata: PricingMetadata,
+ project: Project | None,
+ scenario: Scenario | None,
+) -> dict[str, Any]:
+ payable_default = (
+ float(metadata.default_payable_pct)
+ if getattr(metadata, "default_payable_pct", None) is not None
+ else 100.0
+ )
+ moisture_threshold_default = (
+ float(metadata.moisture_threshold_pct)
+ if getattr(metadata, "moisture_threshold_pct", None) is not None
+ else 0.0
+ )
+ moisture_penalty_default = (
+ float(metadata.moisture_penalty_per_pct)
+ if getattr(metadata, "moisture_penalty_per_pct", None) is not None
+ else 0.0
+ )
+
+ base_metal_entry = next(iter(_SUPPORTED_METALS), None)
+ metal = base_metal_entry["value"] if base_metal_entry else ""
+ scenario_resource = getattr(scenario, "primary_resource", None)
+ if scenario_resource is not None:
+ candidate = getattr(scenario_resource, "value", str(scenario_resource))
+ if candidate in _SUPPORTED_METAL_VALUES:
+ metal = candidate
+
+ currency = ""
+ scenario_currency = getattr(scenario, "currency", None)
+ metadata_currency = getattr(metadata, "default_currency", None)
+ if scenario_currency:
+ currency = str(scenario_currency).upper()
+ elif metadata_currency:
+ currency = str(metadata_currency).upper()
+
+ discount_rate = ""
+ scenario_discount = getattr(scenario, "discount_rate", None)
+ if scenario_discount is not None:
+ discount_rate = float(scenario_discount) # type: ignore[arg-type]
+
+ return {
+ "metal": metal,
+ "ore_tonnage": "",
+ "head_grade_pct": "",
+ "recovery_pct": "",
+ "payable_pct": payable_default,
+ "reference_price": "",
+ "treatment_charge": "",
+ "smelting_charge": "",
+ "processing_opex": "",
+ "moisture_pct": "",
+ "moisture_threshold_pct": moisture_threshold_default,
+ "moisture_penalty_per_pct": moisture_penalty_default,
+ "premiums": "",
+ "fx_rate": 1.0,
+ "currency_code": currency,
+ "impurities": None,
+ "initial_capex": "",
+ "sustaining_capex": "",
+ "discount_rate": discount_rate,
+ "periods": _DEFAULT_EVALUATION_PERIODS,
+ }
+
+
+def _prepare_form_data_for_display(
+ *,
+ defaults: dict[str, Any],
+ overrides: dict[str, Any] | None = None,
+ allow_empty_override: bool = False,
+) -> dict[str, Any]:
+ data = dict(defaults)
+
+ if overrides:
+ for key, value in overrides.items():
+ if key == "csrf_token":
+ continue
+ if key == "impurities":
+ data["impurities"] = _normalise_impurity_entries(value)
+ continue
+ if value is None and not allow_empty_override:
+ continue
+ data[key] = _value_or_blank(value)
+
+ # Normalise defaults and ensure strings for None.
+ for key, value in list(data.items()):
+ if key == "impurities":
+ if value is None:
+ data[key] = None
+ else:
+ data[key] = _normalise_impurity_entries(value)
+ continue
+ data[key] = _value_or_blank(value)
+
+ return data
+
+
+def _prepare_default_context(
+ request: Request,
+ *,
+ project: Project | None = None,
+ scenario: Scenario | None = None,
+ metadata: PricingMetadata,
+ form_data: dict[str, Any] | None = None,
+ allow_empty_override: bool = False,
+ result: ProfitabilityCalculationResult | None = None,
+) -> dict[str, object]:
+ """Assemble template context shared across calculation endpoints."""
+
+ defaults = _build_default_form_data(
+ metadata=metadata,
+ project=project,
+ scenario=scenario,
+ )
+ data = _prepare_form_data_for_display(
+ defaults=defaults,
+ overrides=form_data,
+ allow_empty_override=allow_empty_override,
+ )
+
+ return {
+ "request": request,
+ "project": project,
+ "scenario": scenario,
+ "metadata": metadata,
+ "metadata_impurities": _combine_impurity_metadata(metadata),
+ "supported_metals": _SUPPORTED_METALS,
+ "data": data,
+ "result": result,
+ "errors": [],
+ "notices": [],
+ "cancel_url": request.headers.get("Referer"),
+ "form_action": request.url.path,
+ "csrf_token": None,
+ "default_periods": _DEFAULT_EVALUATION_PERIODS,
+ }
+
+
+def _load_project_and_scenario(
+ *,
+ uow: UnitOfWork,
+ project_id: int | None,
+ scenario_id: int | None,
+) -> tuple[Project | None, Scenario | None]:
+ project: Project | None = None
+ scenario: Scenario | None = None
+
+ if project_id is not None and uow.projects is not None:
+ try:
+ project = uow.projects.get(project_id, with_children=False)
+ except EntityNotFoundError:
+ project = None
+
+ if scenario_id is not None and uow.scenarios is not None:
+ try:
+ scenario = uow.scenarios.get(scenario_id, with_children=False)
+ except EntityNotFoundError:
+ scenario = None
+ if scenario is not None and project is None:
+ project = scenario.project
+
+ return project, scenario
+
+
+def _is_json_request(request: Request) -> bool:
+ content_type = request.headers.get("content-type", "").lower()
+ accept = request.headers.get("accept", "").lower()
+ return "application/json" in content_type or "application/json" in accept
+
+
+def _normalise_form_value(value: Any) -> Any:
+ if isinstance(value, str):
+ stripped = value.strip()
+ return stripped if stripped != "" else None
+ return value
+
+
+def _form_to_payload(form: FormData) -> dict[str, Any]:
+ data: dict[str, Any] = {}
+ impurities: dict[int, dict[str, Any]] = {}
+
+ for key, value in form.multi_items():
+ normalised_value = _normalise_form_value(value)
+ if key.startswith("impurities[") and "]" in key:
+ try:
+ index_part = key.split("[", 1)[1]
+ index_str, remainder = index_part.split("]", 1)
+ field = remainder.strip("[]")
+ if not field:
+ continue
+ index = int(index_str)
+ except (ValueError, IndexError):
+ continue
+ entry = impurities.setdefault(index, {})
+ entry[field] = normalised_value
+ continue
+
+ if key == "csrf_token":
+ continue
+ data[key] = normalised_value
+
+ if impurities:
+ ordered = []
+ for _, entry in sorted(impurities.items()):
+ if not entry.get("name"):
+ continue
+ ordered.append(entry)
+ if ordered:
+ data["impurities"] = ordered
+
+ return data
+
+
+async def _extract_payload(request: Request) -> dict[str, Any]:
+ if request.headers.get("content-type", "").lower().startswith("application/json"):
+ return await request.json()
+ form = await request.form()
+ return _form_to_payload(form)
+
+
+def _list_from_context(context: dict[str, Any], key: str) -> list:
+ value = context.get(key)
+ if isinstance(value, list):
+ return value
+ new_list: list = []
+ context[key] = new_list
+ return new_list
+
+
+def _should_persist_snapshot(
+ *,
+ project: Project | None,
+ scenario: Scenario | None,
+ payload: ProfitabilityCalculationRequest,
+) -> bool:
+ """Determine whether to persist the profitability result.
+
+ Current strategy persists automatically when a scenario or project context
+ is provided. This can be refined later to honour explicit user choices.
+ """
+
+ return bool(scenario or project)
+
+
+def _persist_profitability_snapshots(
+ *,
+ uow: UnitOfWork,
+ project: Project | None,
+ scenario: Scenario | None,
+ user: User | None,
+ request_model: ProfitabilityCalculationRequest,
+ result: ProfitabilityCalculationResult,
+) -> None:
+ if not _should_persist_snapshot(project=project, scenario=scenario, payload=request_model):
+ return
+
+ created_by_id = getattr(user, "id", None)
+
+ revenue_total = float(result.pricing.net_revenue)
+ processing_total = float(result.costs.processing_opex_total)
+ sustaining_total = float(result.costs.sustaining_capex_total)
+ initial_capex = float(result.costs.initial_capex)
+ net_cash_flow_total = revenue_total - (
+ processing_total + sustaining_total + initial_capex
+ )
+
+ npv_value = (
+ float(result.metrics.npv)
+ if result.metrics.npv is not None
+ else None
+ )
+ irr_value = (
+ float(result.metrics.irr)
+ if result.metrics.irr is not None
+ else None
+ )
+ payback_value = (
+ float(result.metrics.payback_period)
+ if result.metrics.payback_period is not None
+ else None
+ )
+ margin_value = (
+ float(result.metrics.margin)
+ if result.metrics.margin is not None
+ else None
+ )
+
+ payload = {
+ "request": request_model.model_dump(mode="json"),
+ "result": result.model_dump(),
+ }
+
+ if scenario and uow.scenario_profitability:
+ scenario_snapshot = ScenarioProfitability(
+ scenario_id=scenario.id,
+ created_by_id=created_by_id,
+ calculation_source="calculations.profitability",
+ currency_code=result.currency,
+ npv=npv_value,
+ irr_pct=irr_value,
+ payback_period_years=payback_value,
+ margin_pct=margin_value,
+ revenue_total=revenue_total,
+ processing_opex_total=processing_total,
+ sustaining_capex_total=sustaining_total,
+ initial_capex=initial_capex,
+ net_cash_flow_total=net_cash_flow_total,
+ payload=payload,
+ )
+ uow.scenario_profitability.create(scenario_snapshot)
+
+ if project and uow.project_profitability:
+ project_snapshot = ProjectProfitability(
+ project_id=project.id,
+ created_by_id=created_by_id,
+ calculation_source="calculations.profitability",
+ currency_code=result.currency,
+ npv=npv_value,
+ irr_pct=irr_value,
+ payback_period_years=payback_value,
+ margin_pct=margin_value,
+ revenue_total=revenue_total,
+ processing_opex_total=processing_total,
+ sustaining_capex_total=sustaining_total,
+ initial_capex=initial_capex,
+ net_cash_flow_total=net_cash_flow_total,
+ payload=payload,
+ )
+ uow.project_profitability.create(project_snapshot)
+
+
+@router.get(
+ "/profitability",
+ response_class=HTMLResponse,
+ name="calculations.profitability_form",
+)
+def profitability_form(
+ request: Request,
+ _: User = Depends(require_authenticated_user),
+ metadata: PricingMetadata = Depends(get_pricing_metadata),
+ uow: UnitOfWork = Depends(get_unit_of_work),
+ project_id: int | None = Query(
+ None, description="Optional project identifier"),
+ scenario_id: int | None = Query(
+ None, description="Optional scenario identifier"),
+) -> HTMLResponse:
+ """Render the profitability calculation form with default metadata."""
+
+ project, scenario = _load_project_and_scenario(
+ uow=uow, project_id=project_id, scenario_id=scenario_id
+ )
+ context = _prepare_default_context(
+ request,
+ project=project,
+ scenario=scenario,
+ metadata=metadata,
+ )
+
+ return templates.TemplateResponse("scenarios/profitability.html", context)
+
+
+@router.post(
+ "/profitability",
+ name="calculations.profitability_submit",
+)
+async def profitability_submit(
+ request: Request,
+ current_user: User = Depends(require_authenticated_user),
+ metadata: PricingMetadata = Depends(get_pricing_metadata),
+ uow: UnitOfWork = Depends(get_unit_of_work),
+ project_id: int | None = Query(
+ None, description="Optional project identifier"),
+ scenario_id: int | None = Query(
+ None, description="Optional scenario identifier"),
+) -> Response:
+ """Handle profitability calculations and return HTML or JSON."""
+
+ wants_json = _is_json_request(request)
+ payload_data = await _extract_payload(request)
+
+ try:
+ request_model = ProfitabilityCalculationRequest.model_validate(
+ payload_data)
+ result = calculate_profitability(request_model, metadata=metadata)
+ except ValidationError as exc:
+ if wants_json:
+ return JSONResponse(
+ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+ content={"errors": exc.errors()},
+ )
+
+ project, scenario = _load_project_and_scenario(
+ uow=uow, project_id=project_id, scenario_id=scenario_id
+ )
+ context = _prepare_default_context(
+ request,
+ project=project,
+ scenario=scenario,
+ metadata=metadata,
+ form_data=payload_data,
+ allow_empty_override=True,
+ )
+ errors = _list_from_context(context, "errors")
+ errors.extend(
+ [f"{err['loc']} - {err['msg']}" for err in exc.errors()]
+ )
+ return templates.TemplateResponse(
+ "scenarios/profitability.html",
+ context,
+ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+ )
+ except ProfitabilityValidationError as exc:
+ if wants_json:
+ return JSONResponse(
+ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+ content={
+ "errors": exc.field_errors or [],
+ "message": exc.message,
+ },
+ )
+
+ project, scenario = _load_project_and_scenario(
+ uow=uow, project_id=project_id, scenario_id=scenario_id
+ )
+ context = _prepare_default_context(
+ request,
+ project=project,
+ scenario=scenario,
+ metadata=metadata,
+ form_data=payload_data,
+ allow_empty_override=True,
+ )
+ messages = list(exc.field_errors or []) or [exc.message]
+ errors = _list_from_context(context, "errors")
+ errors.extend(messages)
+ return templates.TemplateResponse(
+ "scenarios/profitability.html",
+ context,
+ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+ )
+
+ project, scenario = _load_project_and_scenario(
+ uow=uow, project_id=project_id, scenario_id=scenario_id
+ )
+
+ _persist_profitability_snapshots(
+ uow=uow,
+ project=project,
+ scenario=scenario,
+ user=current_user,
+ request_model=request_model,
+ result=result,
+ )
+
+ if wants_json:
+ return JSONResponse(
+ status_code=status.HTTP_200_OK,
+ content=result.model_dump(),
+ )
+
+ context = _prepare_default_context(
+ request,
+ project=project,
+ scenario=scenario,
+ metadata=metadata,
+ form_data=request_model.model_dump(mode="json"),
+ result=result,
+ )
+ notices = _list_from_context(context, "notices")
+ notices.append("Profitability calculation completed successfully.")
+
+ return templates.TemplateResponse(
+ "scenarios/profitability.html",
+ context,
+ status_code=status.HTTP_200_OK,
+ )
diff --git a/schemas/calculations.py b/schemas/calculations.py
new file mode 100644
index 0000000..01744f2
--- /dev/null
+++ b/schemas/calculations.py
@@ -0,0 +1,108 @@
+"""Pydantic schemas for calculation workflows."""
+
+from __future__ import annotations
+
+from typing import List, Optional
+
+from pydantic import BaseModel, Field, PositiveFloat, ValidationError, field_validator
+
+from services.pricing import PricingResult
+
+
+class ImpurityInput(BaseModel):
+ """Impurity configuration row supplied by the client."""
+
+ name: str = Field(..., min_length=1)
+ value: float | None = Field(None, ge=0)
+ threshold: float | None = Field(None, ge=0)
+ penalty: float | None = Field(None)
+
+ @field_validator("name")
+ @classmethod
+ def _normalise_name(cls, value: str) -> str:
+ return value.strip()
+
+
+class ProfitabilityCalculationRequest(BaseModel):
+ """Request payload for profitability calculations."""
+
+ metal: str = Field(..., min_length=1)
+ ore_tonnage: PositiveFloat
+ head_grade_pct: float = Field(..., gt=0, le=100)
+ recovery_pct: float = Field(..., gt=0, le=100)
+ payable_pct: float | None = Field(None, gt=0, le=100)
+ reference_price: PositiveFloat
+ treatment_charge: float = Field(0, ge=0)
+ smelting_charge: float = Field(0, ge=0)
+ moisture_pct: float = Field(0, ge=0, le=100)
+ moisture_threshold_pct: float | None = Field(None, ge=0, le=100)
+ moisture_penalty_per_pct: float | None = None
+ premiums: float = Field(0)
+ fx_rate: PositiveFloat = Field(1)
+ currency_code: str | None = Field(None, min_length=3, max_length=3)
+ processing_opex: float = Field(0, ge=0)
+ sustaining_capex: float = Field(0, ge=0)
+ initial_capex: float = Field(0, ge=0)
+ discount_rate: float | None = Field(None, ge=0, le=100)
+ periods: int = Field(10, ge=1, le=120)
+ impurities: List[ImpurityInput] = Field(default_factory=list)
+
+ @field_validator("currency_code")
+ @classmethod
+ def _uppercase_currency(cls, value: str | None) -> str | None:
+ if value is None:
+ return None
+ return value.strip().upper()
+
+ @field_validator("metal")
+ @classmethod
+ def _normalise_metal(cls, value: str) -> str:
+ return value.strip().lower()
+
+
+class ProfitabilityCosts(BaseModel):
+ """Aggregated cost components for profitability output."""
+
+ processing_opex_total: float
+ sustaining_capex_total: float
+ initial_capex: float
+
+
+class ProfitabilityMetrics(BaseModel):
+ """Financial KPIs yielded by the profitability calculation."""
+
+ npv: float | None
+ irr: float | None
+ payback_period: float | None
+ margin: float | None
+
+
+class CashFlowEntry(BaseModel):
+ """Normalized cash flow row for reporting and charting."""
+
+ period: int
+ revenue: float
+ processing_opex: float
+ sustaining_capex: float
+ net: float
+
+
+class ProfitabilityCalculationResult(BaseModel):
+ """Response body summarizing profitability calculation outputs."""
+
+ pricing: PricingResult
+ costs: ProfitabilityCosts
+ metrics: ProfitabilityMetrics
+ cash_flows: list[CashFlowEntry]
+ currency: str | None
+
+
+__all__ = [
+ "ImpurityInput",
+ "ProfitabilityCalculationRequest",
+ "ProfitabilityCosts",
+ "ProfitabilityMetrics",
+ "CashFlowEntry",
+ "ProfitabilityCalculationResult",
+ "ValidationError",
+]
diff --git a/scripts/_route_verification.py b/scripts/_route_verification.py
new file mode 100644
index 0000000..2b97182
--- /dev/null
+++ b/scripts/_route_verification.py
@@ -0,0 +1,112 @@
+"""Utility script to verify key authenticated routes respond without errors."""
+from __future__ import annotations
+
+import json
+import os
+import sys
+import urllib.parse
+from http.client import HTTPConnection
+from http.cookies import SimpleCookie
+from typing import Dict, List, Tuple
+
+HOST = "127.0.0.1"
+PORT = 8000
+
+cookies: Dict[str, str] = {}
+
+
+def _update_cookies(headers: List[Tuple[str, str]]) -> None:
+ for name, value in headers:
+ if name.lower() != "set-cookie":
+ continue
+ cookie = SimpleCookie()
+ cookie.load(value)
+ for key, morsel in cookie.items():
+ cookies[key] = morsel.value
+
+
+def _cookie_header() -> str | None:
+ if not cookies:
+ return None
+ return "; ".join(f"{key}={value}" for key, value in cookies.items())
+
+
+def request(method: str, path: str, *, body: bytes | None = None, headers: Dict[str, str] | None = None) -> Tuple[int, Dict[str, str], bytes]:
+ conn = HTTPConnection(HOST, PORT, timeout=10)
+ prepared_headers = {"User-Agent": "route-checker"}
+ if headers:
+ prepared_headers.update(headers)
+ cookie_header = _cookie_header()
+ if cookie_header:
+ prepared_headers["Cookie"] = cookie_header
+
+ conn.request(method, path, body=body, headers=prepared_headers)
+ resp = conn.getresponse()
+ payload = resp.read()
+ status = resp.status
+ reason = resp.reason
+ response_headers = {name: value for name, value in resp.getheaders()}
+ _update_cookies(list(resp.getheaders()))
+ conn.close()
+ print(f"{method} {path} -> {status} {reason}")
+ return status, response_headers, payload
+
+
+def main() -> int:
+ status, _, _ = request("GET", "/login")
+ if status != 200:
+ print("Unexpected status for GET /login", file=sys.stderr)
+ return 1
+
+ admin_username = os.getenv("CALMINER_SEED_ADMIN_USERNAME", "admin")
+ admin_password = os.getenv("CALMINER_SEED_ADMIN_PASSWORD", "M11ffpgm.")
+ login_payload = urllib.parse.urlencode(
+ {"username": admin_username, "password": admin_password}
+ ).encode()
+ status, headers, _ = request(
+ "POST",
+ "/login",
+ body=login_payload,
+ headers={"Content-Type": "application/x-www-form-urlencoded"},
+ )
+ if status not in {200, 303}:
+ print("Login failed", file=sys.stderr)
+ return 1
+
+ location = headers.get("Location", "/")
+ redirect_path = urllib.parse.urlsplit(location).path or "/"
+ request("GET", redirect_path)
+
+ request("GET", "/")
+ request("GET", "/projects/ui")
+
+ status, headers, body = request(
+ "GET",
+ "/projects",
+ headers={"Accept": "application/json"},
+ )
+ projects: List[dict] = []
+ if headers.get("Content-Type", "").startswith("application/json"):
+ projects = json.loads(body.decode())
+
+ if projects:
+ project_id = projects[0]["id"]
+ request("GET", f"/projects/{project_id}/view")
+ status, headers, body = request(
+ "GET",
+ f"/projects/{project_id}/scenarios",
+ headers={"Accept": "application/json"},
+ )
+ scenarios: List[dict] = []
+ if headers.get("Content-Type", "").startswith("application/json"):
+ scenarios = json.loads(body.decode())
+ if scenarios:
+ scenario_id = scenarios[0]["id"]
+ request("GET", f"/scenarios/{scenario_id}/view")
+
+ print("Cookies:", cookies)
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/scripts/apply_users_sequence_fix.py b/scripts/apply_users_sequence_fix.py
new file mode 100644
index 0000000..4322d91
--- /dev/null
+++ b/scripts/apply_users_sequence_fix.py
@@ -0,0 +1,15 @@
+from sqlalchemy import create_engine, text
+from config.database import DATABASE_URL
+
+engine = create_engine(DATABASE_URL, future=True)
+sqls = [
+ "CREATE SEQUENCE IF NOT EXISTS users_id_seq;",
+ "ALTER TABLE users ALTER COLUMN id SET DEFAULT nextval('users_id_seq');",
+ "SELECT setval('users_id_seq', COALESCE((SELECT MAX(id) FROM users), 1));",
+ "ALTER SEQUENCE users_id_seq OWNED BY users.id;",
+]
+with engine.begin() as conn:
+ for s in sqls:
+ print('EXECUTING:', s)
+ conn.execute(text(s))
+print('SEQUENCE fix applied')
diff --git a/services/__init__.py b/services/__init__.py
index c6e3b26..c452a0a 100644
--- a/services/__init__.py
+++ b/services/__init__.py
@@ -1,10 +1,12 @@
"""Service layer utilities."""
from .pricing import calculate_pricing, PricingInput, PricingMetadata, PricingResult
+from .calculations import calculate_profitability
__all__ = [
"calculate_pricing",
"PricingInput",
"PricingMetadata",
"PricingResult",
+ "calculate_profitability",
]
diff --git a/services/calculations.py b/services/calculations.py
new file mode 100644
index 0000000..410ef5c
--- /dev/null
+++ b/services/calculations.py
@@ -0,0 +1,205 @@
+"""Service functions for financial calculations."""
+
+from __future__ import annotations
+
+from services.currency import CurrencyValidationError, normalise_currency
+from services.exceptions import ProfitabilityValidationError
+from services.financial import (
+ CashFlow,
+ ConvergenceError,
+ PaybackNotReachedError,
+ internal_rate_of_return,
+ net_present_value,
+ payback_period,
+)
+from services.pricing import PricingInput, PricingMetadata, PricingResult, calculate_pricing
+from schemas.calculations import (
+ CashFlowEntry,
+ ProfitabilityCalculationRequest,
+ ProfitabilityCalculationResult,
+ ProfitabilityCosts,
+ ProfitabilityMetrics,
+)
+
+
+def _build_pricing_input(
+ request: ProfitabilityCalculationRequest,
+) -> PricingInput:
+ """Construct a pricing input instance including impurity overrides."""
+
+ impurity_values: dict[str, float] = {}
+ impurity_thresholds: dict[str, float] = {}
+ impurity_penalties: dict[str, float] = {}
+
+ for impurity in request.impurities:
+ code = impurity.name.strip()
+ if not code:
+ continue
+ code = code.upper()
+ if impurity.value is not None:
+ impurity_values[code] = float(impurity.value)
+ if impurity.threshold is not None:
+ impurity_thresholds[code] = float(impurity.threshold)
+ if impurity.penalty is not None:
+ impurity_penalties[code] = float(impurity.penalty)
+
+ pricing_input = PricingInput(
+ metal=request.metal,
+ ore_tonnage=request.ore_tonnage,
+ head_grade_pct=request.head_grade_pct,
+ recovery_pct=request.recovery_pct,
+ payable_pct=request.payable_pct,
+ reference_price=request.reference_price,
+ treatment_charge=request.treatment_charge,
+ smelting_charge=request.smelting_charge,
+ moisture_pct=request.moisture_pct,
+ moisture_threshold_pct=request.moisture_threshold_pct,
+ moisture_penalty_per_pct=request.moisture_penalty_per_pct,
+ impurity_ppm=impurity_values,
+ impurity_thresholds=impurity_thresholds,
+ impurity_penalty_per_ppm=impurity_penalties,
+ premiums=request.premiums,
+ fx_rate=request.fx_rate,
+ currency_code=request.currency_code,
+ )
+
+ return pricing_input
+
+
+def _generate_cash_flows(
+ *,
+ periods: int,
+ net_per_period: float,
+ initial_capex: float,
+) -> tuple[list[CashFlow], list[CashFlowEntry]]:
+ """Create cash flow structures for financial metric calculations."""
+
+ cash_flow_models: list[CashFlow] = [
+ CashFlow(amount=-initial_capex, period_index=0)
+ ]
+ cash_flow_entries: list[CashFlowEntry] = [
+ CashFlowEntry(
+ period=0,
+ revenue=0.0,
+ processing_opex=0.0,
+ sustaining_capex=0.0,
+ net=-initial_capex,
+ )
+ ]
+
+ for period in range(1, periods + 1):
+ cash_flow_models.append(
+ CashFlow(amount=net_per_period, period_index=period))
+ cash_flow_entries.append(
+ CashFlowEntry(
+ period=period,
+ revenue=0.0,
+ processing_opex=0.0,
+ sustaining_capex=0.0,
+ net=net_per_period,
+ )
+ )
+
+ return cash_flow_models, cash_flow_entries
+
+
+def calculate_profitability(
+ request: ProfitabilityCalculationRequest,
+ *,
+ metadata: PricingMetadata,
+) -> ProfitabilityCalculationResult:
+ """Calculate profitability metrics using pricing inputs and cost data."""
+
+ if request.periods <= 0:
+ raise ProfitabilityValidationError(
+ "Evaluation periods must be at least 1.", ["periods"]
+ )
+
+ pricing_input = _build_pricing_input(request)
+ try:
+ pricing_result: PricingResult = calculate_pricing(
+ pricing_input, metadata=metadata
+ )
+ except CurrencyValidationError as exc:
+ raise ProfitabilityValidationError(
+ str(exc), ["currency_code"]) from exc
+
+ periods = request.periods
+ revenue_total = float(pricing_result.net_revenue)
+ revenue_per_period = revenue_total / periods
+
+ processing_total = float(request.processing_opex) * periods
+ sustaining_total = float(request.sustaining_capex) * periods
+ initial_capex = float(request.initial_capex)
+
+ net_per_period = (
+ revenue_per_period
+ - float(request.processing_opex)
+ - float(request.sustaining_capex)
+ )
+
+ cash_flow_models, cash_flow_entries = _generate_cash_flows(
+ periods=periods,
+ net_per_period=net_per_period,
+ initial_capex=initial_capex,
+ )
+
+ # Update per-period entries to include explicit costs for presentation
+ for entry in cash_flow_entries[1:]:
+ entry.revenue = revenue_per_period
+ entry.processing_opex = float(request.processing_opex)
+ entry.sustaining_capex = float(request.sustaining_capex)
+ entry.net = net_per_period
+
+ discount_rate = (request.discount_rate or 0.0) / 100.0
+
+ npv_value = net_present_value(discount_rate, cash_flow_models)
+
+ try:
+ irr_value = internal_rate_of_return(cash_flow_models) * 100.0
+ except (ValueError, ZeroDivisionError, ConvergenceError):
+ irr_value = None
+
+ try:
+ payback_value = payback_period(cash_flow_models)
+ except (ValueError, PaybackNotReachedError):
+ payback_value = None
+
+ total_costs = processing_total + sustaining_total + initial_capex
+ total_net = revenue_total - total_costs
+
+ if revenue_total == 0:
+ margin_value = None
+ else:
+ margin_value = (total_net / revenue_total) * 100.0
+
+ currency = request.currency_code or pricing_result.currency
+ try:
+ currency = normalise_currency(currency)
+ except CurrencyValidationError as exc:
+ raise ProfitabilityValidationError(
+ str(exc), ["currency_code"]) from exc
+
+ costs = ProfitabilityCosts(
+ processing_opex_total=processing_total,
+ sustaining_capex_total=sustaining_total,
+ initial_capex=initial_capex,
+ )
+
+ metrics = ProfitabilityMetrics(
+ npv=npv_value,
+ irr=irr_value,
+ payback_period=payback_value,
+ margin=margin_value,
+ )
+
+ return ProfitabilityCalculationResult(
+ pricing=pricing_result,
+ costs=costs,
+ metrics=metrics,
+ cash_flows=cash_flow_entries,
+ currency=currency,
+ )
+
+
+__all__ = ["calculate_profitability"]
diff --git a/services/exceptions.py b/services/exceptions.py
index 786fd43..f7d592a 100644
--- a/services/exceptions.py
+++ b/services/exceptions.py
@@ -26,3 +26,14 @@ class ScenarioValidationError(Exception):
def __str__(self) -> str: # pragma: no cover - mirrors message for logging
return self.message
+
+
+@dataclass(eq=False)
+class ProfitabilityValidationError(Exception):
+ """Raised when profitability calculation inputs fail domain validation."""
+
+ message: str
+ field_errors: Sequence[str] | None = None
+
+ def __str__(self) -> str: # pragma: no cover - mirrors message for logging
+ return self.message
diff --git a/services/repositories.py b/services/repositories.py
index ef64db7..a53e31a 100644
--- a/services/repositories.py
+++ b/services/repositories.py
@@ -15,8 +15,10 @@ from models import (
PricingImpuritySettings,
PricingMetalSettings,
PricingSettings,
+ ProjectProfitability,
Role,
Scenario,
+ ScenarioProfitability,
ScenarioStatus,
SimulationParameter,
User,
@@ -367,6 +369,106 @@ class ScenarioRepository:
self.session.delete(scenario)
+class ProjectProfitabilityRepository:
+ """Persistence operations for project-level profitability snapshots."""
+
+ def __init__(self, session: Session) -> None:
+ self.session = session
+
+ def create(self, snapshot: ProjectProfitability) -> ProjectProfitability:
+ self.session.add(snapshot)
+ self.session.flush()
+ return snapshot
+
+ def list_for_project(
+ self,
+ project_id: int,
+ *,
+ limit: int | None = None,
+ ) -> Sequence[ProjectProfitability]:
+ stmt = (
+ select(ProjectProfitability)
+ .where(ProjectProfitability.project_id == project_id)
+ .order_by(ProjectProfitability.calculated_at.desc())
+ )
+ if limit is not None:
+ stmt = stmt.limit(limit)
+ return self.session.execute(stmt).scalars().all()
+
+ def latest_for_project(
+ self,
+ project_id: int,
+ ) -> ProjectProfitability | None:
+ stmt = (
+ select(ProjectProfitability)
+ .where(ProjectProfitability.project_id == project_id)
+ .order_by(ProjectProfitability.calculated_at.desc())
+ .limit(1)
+ )
+ return self.session.execute(stmt).scalar_one_or_none()
+
+ def delete(self, snapshot_id: int) -> None:
+ stmt = select(ProjectProfitability).where(
+ ProjectProfitability.id == snapshot_id
+ )
+ entity = self.session.execute(stmt).scalar_one_or_none()
+ if entity is None:
+ raise EntityNotFoundError(
+ f"Project profitability snapshot {snapshot_id} not found"
+ )
+ self.session.delete(entity)
+
+
+class ScenarioProfitabilityRepository:
+ """Persistence operations for scenario-level profitability snapshots."""
+
+ def __init__(self, session: Session) -> None:
+ self.session = session
+
+ def create(self, snapshot: ScenarioProfitability) -> ScenarioProfitability:
+ self.session.add(snapshot)
+ self.session.flush()
+ return snapshot
+
+ def list_for_scenario(
+ self,
+ scenario_id: int,
+ *,
+ limit: int | None = None,
+ ) -> Sequence[ScenarioProfitability]:
+ stmt = (
+ select(ScenarioProfitability)
+ .where(ScenarioProfitability.scenario_id == scenario_id)
+ .order_by(ScenarioProfitability.calculated_at.desc())
+ )
+ if limit is not None:
+ stmt = stmt.limit(limit)
+ return self.session.execute(stmt).scalars().all()
+
+ def latest_for_scenario(
+ self,
+ scenario_id: int,
+ ) -> ScenarioProfitability | None:
+ stmt = (
+ select(ScenarioProfitability)
+ .where(ScenarioProfitability.scenario_id == scenario_id)
+ .order_by(ScenarioProfitability.calculated_at.desc())
+ .limit(1)
+ )
+ return self.session.execute(stmt).scalar_one_or_none()
+
+ def delete(self, snapshot_id: int) -> None:
+ stmt = select(ScenarioProfitability).where(
+ ScenarioProfitability.id == snapshot_id
+ )
+ entity = self.session.execute(stmt).scalar_one_or_none()
+ if entity is None:
+ raise EntityNotFoundError(
+ f"Scenario profitability snapshot {snapshot_id} not found"
+ )
+ self.session.delete(entity)
+
+
class FinancialInputRepository:
"""Persistence operations for FinancialInput entities."""
diff --git a/services/unit_of_work.py b/services/unit_of_work.py
index d8579c7..91e3d80 100644
--- a/services/unit_of_work.py
+++ b/services/unit_of_work.py
@@ -13,8 +13,10 @@ from services.repositories import (
PricingSettingsRepository,
PricingSettingsSeedResult,
ProjectRepository,
+ ProjectProfitabilityRepository,
RoleRepository,
ScenarioRepository,
+ ScenarioProfitabilityRepository,
SimulationParameterRepository,
UserRepository,
ensure_admin_user as ensure_admin_user_record,
@@ -36,6 +38,8 @@ class UnitOfWork(AbstractContextManager["UnitOfWork"]):
self.scenarios: ScenarioRepository | None = None
self.financial_inputs: FinancialInputRepository | None = None
self.simulation_parameters: SimulationParameterRepository | None = None
+ self.project_profitability: ProjectProfitabilityRepository | None = None
+ self.scenario_profitability: ScenarioProfitabilityRepository | None = None
self.users: UserRepository | None = None
self.roles: RoleRepository | None = None
self.pricing_settings: PricingSettingsRepository | None = None
@@ -47,6 +51,11 @@ class UnitOfWork(AbstractContextManager["UnitOfWork"]):
self.financial_inputs = FinancialInputRepository(self.session)
self.simulation_parameters = SimulationParameterRepository(
self.session)
+ self.project_profitability = ProjectProfitabilityRepository(
+ self.session)
+ self.scenario_profitability = ScenarioProfitabilityRepository(
+ self.session
+ )
self.users = UserRepository(self.session)
self.roles = RoleRepository(self.session)
self.pricing_settings = PricingSettingsRepository(self.session)
@@ -65,6 +74,8 @@ class UnitOfWork(AbstractContextManager["UnitOfWork"]):
self.scenarios = None
self.financial_inputs = None
self.simulation_parameters = None
+ self.project_profitability = None
+ self.scenario_profitability = None
self.users = None
self.roles = None
self.pricing_settings = None
diff --git a/templates/scenarios/profitability.html b/templates/scenarios/profitability.html
new file mode 100644
index 0000000..c730496
--- /dev/null
+++ b/templates/scenarios/profitability.html
@@ -0,0 +1,338 @@
+{% extends "base.html" %}
+{% block title %}Profitability Calculator · CalMiner{% endblock %}
+
+{% block content %}
+
+
+
+
+ {% if errors %}
+
+
Submission errors
+
+ {% for message in errors %}
+ - {{ message }}
+ {% endfor %}
+
+
+ {% endif %}
+
+ {% if notices %}
+
+
+ {% for message in notices %}
+ - {{ message }}
+ {% endfor %}
+
+
+ {% endif %}
+
+
+
+
+
+
+
+
+
+
+ {% if result %}
+
+
+ Revenue Summary
+
+ -
+ Payable Metal
+ {{ result.pricing.payable_metal_tonnes | default('—') }}
+
+ -
+ Gross Revenue
+ {{ result.pricing.gross_revenue | currency_display(result.pricing.currency) }}
+
+ -
+ Net Revenue
+ {{ result.pricing.net_revenue | currency_display(result.pricing.currency) }}
+
+
+
+
+
+ Cost Breakdown
+
+ -
+ Processing Opex
+ {{ result.costs.processing_opex_total | currency_display(result.currency) }}
+
+ -
+ Sustaining Capex
+ {{ result.costs.sustaining_capex_total | currency_display(result.currency) }}
+
+ -
+ Initial Capex
+ {{ result.costs.initial_capex | currency_display(result.currency) }}
+
+
+
+
+
+ Key Metrics
+
+ -
+ NPV
+ {{ result.metrics.npv | currency_display(result.currency) }}
+
+ -
+ IRR
+ {{ result.metrics.irr | percentage_display }}
+
+ -
+ Payback
+ {{ result.metrics.payback_period | period_display }}
+
+ -
+ Margin
+ {{ result.metrics.margin | percentage_display }}
+
+
+
+
+
+ {% if result.cash_flows %}
+
+
+
+ | Period |
+ Revenue |
+ Processing Opex |
+ Sustaining Capex |
+ Net Cash Flow |
+
+
+
+ {% for entry in result.cash_flows %}
+
+ | {{ entry.period }} |
+ {{ entry.revenue | currency_display(result.currency) }} |
+ {{ entry.processing_opex | currency_display(result.currency) }} |
+ {{ entry.sustaining_capex | currency_display(result.currency) }} |
+ {{ entry.net | currency_display(result.currency) }} |
+
+ {% endfor %}
+
+
+ {% endif %}
+ {% else %}
+ Run a calculation to see profitability metrics.
+ {% endif %}
+
+
+
+{% endblock %}