feat: Add profitability calculation schemas and service functions

- Introduced Pydantic schemas for profitability calculations in `schemas/calculations.py`.
- Implemented service functions for profitability calculations in `services/calculations.py`.
- Added new exception class `ProfitabilityValidationError` for handling validation errors.
- Created repositories for managing project and scenario profitability snapshots.
- Developed a utility script for verifying authenticated routes.
- Added a new HTML template for the profitability calculator interface.
- Implemented a script to fix user ID sequence in the database.
This commit is contained in:
2025-11-12 22:22:29 +01:00
parent 6d496a599e
commit b1a6df9f90
15 changed files with 1654 additions and 0 deletions

View File

@@ -15,8 +15,10 @@ from models import (
PricingImpuritySettings,
PricingMetalSettings,
PricingSettings,
ProjectProfitability,
Role,
Scenario,
ScenarioProfitability,
ScenarioStatus,
SimulationParameter,
User,
@@ -367,6 +369,106 @@ class ScenarioRepository:
self.session.delete(scenario)
class ProjectProfitabilityRepository:
"""Persistence operations for project-level profitability snapshots."""
def __init__(self, session: Session) -> None:
self.session = session
def create(self, snapshot: ProjectProfitability) -> ProjectProfitability:
self.session.add(snapshot)
self.session.flush()
return snapshot
def list_for_project(
self,
project_id: int,
*,
limit: int | None = None,
) -> Sequence[ProjectProfitability]:
stmt = (
select(ProjectProfitability)
.where(ProjectProfitability.project_id == project_id)
.order_by(ProjectProfitability.calculated_at.desc())
)
if limit is not None:
stmt = stmt.limit(limit)
return self.session.execute(stmt).scalars().all()
def latest_for_project(
self,
project_id: int,
) -> ProjectProfitability | None:
stmt = (
select(ProjectProfitability)
.where(ProjectProfitability.project_id == project_id)
.order_by(ProjectProfitability.calculated_at.desc())
.limit(1)
)
return self.session.execute(stmt).scalar_one_or_none()
def delete(self, snapshot_id: int) -> None:
stmt = select(ProjectProfitability).where(
ProjectProfitability.id == snapshot_id
)
entity = self.session.execute(stmt).scalar_one_or_none()
if entity is None:
raise EntityNotFoundError(
f"Project profitability snapshot {snapshot_id} not found"
)
self.session.delete(entity)
class ScenarioProfitabilityRepository:
"""Persistence operations for scenario-level profitability snapshots."""
def __init__(self, session: Session) -> None:
self.session = session
def create(self, snapshot: ScenarioProfitability) -> ScenarioProfitability:
self.session.add(snapshot)
self.session.flush()
return snapshot
def list_for_scenario(
self,
scenario_id: int,
*,
limit: int | None = None,
) -> Sequence[ScenarioProfitability]:
stmt = (
select(ScenarioProfitability)
.where(ScenarioProfitability.scenario_id == scenario_id)
.order_by(ScenarioProfitability.calculated_at.desc())
)
if limit is not None:
stmt = stmt.limit(limit)
return self.session.execute(stmt).scalars().all()
def latest_for_scenario(
self,
scenario_id: int,
) -> ScenarioProfitability | None:
stmt = (
select(ScenarioProfitability)
.where(ScenarioProfitability.scenario_id == scenario_id)
.order_by(ScenarioProfitability.calculated_at.desc())
.limit(1)
)
return self.session.execute(stmt).scalar_one_or_none()
def delete(self, snapshot_id: int) -> None:
stmt = select(ScenarioProfitability).where(
ScenarioProfitability.id == snapshot_id
)
entity = self.session.execute(stmt).scalar_one_or_none()
if entity is None:
raise EntityNotFoundError(
f"Scenario profitability snapshot {snapshot_id} not found"
)
self.session.delete(entity)
class FinancialInputRepository:
"""Persistence operations for FinancialInput entities."""