Files
calminer/models/processing_opex_snapshot.py
zwitschi 1feae7ff85 feat: Add Processing Opex functionality
- Introduced OpexValidationError for handling validation errors in processing opex calculations.
- Implemented ProjectProcessingOpexRepository and ScenarioProcessingOpexRepository for managing project and scenario-level processing opex snapshots.
- Enhanced UnitOfWork to include repositories for processing opex.
- Updated sidebar navigation and scenario detail templates to include links to the new Processing Opex Planner.
- Created a new template for the Processing Opex Planner with form handling for input components and parameters.
- Developed integration tests for processing opex calculations, covering HTML and JSON flows, including validation for currency mismatches and unsupported frequencies.
- Added unit tests for the calculation logic, ensuring correct handling of various scenarios and edge cases.
2025-11-13 09:26:57 +01:00

110 lines
4.9 KiB
Python

from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from sqlalchemy import JSON, Boolean, DateTime, ForeignKey, Integer, Numeric, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from config.database import Base
if TYPE_CHECKING: # pragma: no cover
from .project import Project
from .scenario import Scenario
from .user import User
class ProjectProcessingOpexSnapshot(Base):
"""Snapshot of recurring processing opex metrics at the project level."""
__tablename__ = "project_processing_opex_snapshots"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
project_id: Mapped[int] = mapped_column(
ForeignKey("projects.id", ondelete="CASCADE"), nullable=False, index=True
)
created_by_id: Mapped[int | None] = mapped_column(
ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True
)
calculation_source: Mapped[str | None] = mapped_column(String(64), nullable=True)
calculated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
currency_code: Mapped[str | None] = mapped_column(String(3), nullable=True)
overall_annual: Mapped[float | None] = mapped_column(Numeric(18, 2), nullable=True)
escalated_total: Mapped[float | None] = mapped_column(Numeric(18, 2), nullable=True)
annual_average: Mapped[float | None] = mapped_column(Numeric(18, 2), nullable=True)
evaluation_horizon_years: Mapped[int | None] = mapped_column(Integer, nullable=True)
escalation_pct: Mapped[float | None] = mapped_column(Numeric(12, 6), nullable=True)
apply_escalation: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
component_count: Mapped[int | None] = mapped_column(Integer, nullable=True)
payload: Mapped[dict | None] = mapped_column(JSON, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now()
)
project: Mapped[Project] = relationship(
"Project", back_populates="processing_opex_snapshots"
)
created_by: Mapped[User | None] = relationship("User")
def __repr__(self) -> str: # pragma: no cover
return (
"ProjectProcessingOpexSnapshot(id={id!r}, project_id={project_id!r}, overall_annual={overall_annual!r})".format(
id=self.id,
project_id=self.project_id,
overall_annual=self.overall_annual,
)
)
class ScenarioProcessingOpexSnapshot(Base):
"""Snapshot of processing opex metrics for an individual scenario."""
__tablename__ = "scenario_processing_opex_snapshots"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
scenario_id: Mapped[int] = mapped_column(
ForeignKey("scenarios.id", ondelete="CASCADE"), nullable=False, index=True
)
created_by_id: Mapped[int | None] = mapped_column(
ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True
)
calculation_source: Mapped[str | None] = mapped_column(String(64), nullable=True)
calculated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
currency_code: Mapped[str | None] = mapped_column(String(3), nullable=True)
overall_annual: Mapped[float | None] = mapped_column(Numeric(18, 2), nullable=True)
escalated_total: Mapped[float | None] = mapped_column(Numeric(18, 2), nullable=True)
annual_average: Mapped[float | None] = mapped_column(Numeric(18, 2), nullable=True)
evaluation_horizon_years: Mapped[int | None] = mapped_column(Integer, nullable=True)
escalation_pct: Mapped[float | None] = mapped_column(Numeric(12, 6), nullable=True)
apply_escalation: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
component_count: Mapped[int | None] = mapped_column(Integer, nullable=True)
payload: Mapped[dict | None] = mapped_column(JSON, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now()
)
scenario: Mapped[Scenario] = relationship(
"Scenario", back_populates="processing_opex_snapshots"
)
created_by: Mapped[User | None] = relationship("User")
def __repr__(self) -> str: # pragma: no cover
return (
"ScenarioProcessingOpexSnapshot(id={id!r}, scenario_id={scenario_id!r}, overall_annual={overall_annual!r})".format(
id=self.id,
scenario_id=self.scenario_id,
overall_annual=self.overall_annual,
)
)