feat: Implement initial capex calculation feature

- Added CapexComponentInput, CapexParameters, CapexCalculationRequest, CapexCalculationResult, and related schemas for capex calculations.
- Introduced calculate_initial_capex function to aggregate capex components and compute totals and timelines.
- Created ProjectCapexRepository and ScenarioCapexRepository for managing capex snapshots in the database.
- Developed capex.html template for capturing and displaying initial capex data.
- Registered common Jinja2 filters for formatting currency and percentages.
- Implemented unit and integration tests for capex calculation functionality.
- Updated unit of work to include new repositories for capex management.
This commit is contained in:
2025-11-12 23:51:52 +01:00
parent 6c1570a254
commit d9fd82b2e3
16 changed files with 1566 additions and 93 deletions

View File

@@ -15,9 +15,11 @@ from models import (
PricingImpuritySettings,
PricingMetalSettings,
PricingSettings,
ProjectCapexSnapshot,
ProjectProfitability,
Role,
Scenario,
ScenarioCapexSnapshot,
ScenarioProfitability,
ScenarioStatus,
SimulationParameter,
@@ -469,6 +471,106 @@ class ScenarioProfitabilityRepository:
self.session.delete(entity)
class ProjectCapexRepository:
"""Persistence operations for project-level capex snapshots."""
def __init__(self, session: Session) -> None:
self.session = session
def create(self, snapshot: ProjectCapexSnapshot) -> ProjectCapexSnapshot:
self.session.add(snapshot)
self.session.flush()
return snapshot
def list_for_project(
self,
project_id: int,
*,
limit: int | None = None,
) -> Sequence[ProjectCapexSnapshot]:
stmt = (
select(ProjectCapexSnapshot)
.where(ProjectCapexSnapshot.project_id == project_id)
.order_by(ProjectCapexSnapshot.calculated_at.desc())
)
if limit is not None:
stmt = stmt.limit(limit)
return self.session.execute(stmt).scalars().all()
def latest_for_project(
self,
project_id: int,
) -> ProjectCapexSnapshot | None:
stmt = (
select(ProjectCapexSnapshot)
.where(ProjectCapexSnapshot.project_id == project_id)
.order_by(ProjectCapexSnapshot.calculated_at.desc())
.limit(1)
)
return self.session.execute(stmt).scalar_one_or_none()
def delete(self, snapshot_id: int) -> None:
stmt = select(ProjectCapexSnapshot).where(
ProjectCapexSnapshot.id == snapshot_id
)
entity = self.session.execute(stmt).scalar_one_or_none()
if entity is None:
raise EntityNotFoundError(
f"Project capex snapshot {snapshot_id} not found"
)
self.session.delete(entity)
class ScenarioCapexRepository:
"""Persistence operations for scenario-level capex snapshots."""
def __init__(self, session: Session) -> None:
self.session = session
def create(self, snapshot: ScenarioCapexSnapshot) -> ScenarioCapexSnapshot:
self.session.add(snapshot)
self.session.flush()
return snapshot
def list_for_scenario(
self,
scenario_id: int,
*,
limit: int | None = None,
) -> Sequence[ScenarioCapexSnapshot]:
stmt = (
select(ScenarioCapexSnapshot)
.where(ScenarioCapexSnapshot.scenario_id == scenario_id)
.order_by(ScenarioCapexSnapshot.calculated_at.desc())
)
if limit is not None:
stmt = stmt.limit(limit)
return self.session.execute(stmt).scalars().all()
def latest_for_scenario(
self,
scenario_id: int,
) -> ScenarioCapexSnapshot | None:
stmt = (
select(ScenarioCapexSnapshot)
.where(ScenarioCapexSnapshot.scenario_id == scenario_id)
.order_by(ScenarioCapexSnapshot.calculated_at.desc())
.limit(1)
)
return self.session.execute(stmt).scalar_one_or_none()
def delete(self, snapshot_id: int) -> None:
stmt = select(ScenarioCapexSnapshot).where(
ScenarioCapexSnapshot.id == snapshot_id
)
entity = self.session.execute(stmt).scalar_one_or_none()
if entity is None:
raise EntityNotFoundError(
f"Scenario capex snapshot {snapshot_id} not found"
)
self.session.delete(entity)
class FinancialInputRepository:
"""Persistence operations for FinancialInput entities."""