- Introduced OpexValidationError for handling validation errors in processing opex calculations.
- Implemented ProjectProcessingOpexRepository and ScenarioProcessingOpexRepository for managing project and scenario-level processing opex snapshots.
- Enhanced UnitOfWork to include repositories for processing opex.
- Updated sidebar navigation and scenario detail templates to include links to the new Processing Opex Planner.
- Created a new template for the Processing Opex Planner with form handling for input components and parameters.
- Developed integration tests for processing opex calculations, covering HTML and JSON flows, including validation for currency mismatches and unsupported frequencies.
- Added unit tests for the calculation logic, ensuring correct handling of various scenarios and edge cases.
110 lines
4.9 KiB
Python
110 lines
4.9 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
from typing import TYPE_CHECKING
|
|
|
|
from sqlalchemy import JSON, Boolean, DateTime, ForeignKey, Integer, Numeric, String
|
|
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
|
from sqlalchemy.sql import func
|
|
|
|
from config.database import Base
|
|
|
|
if TYPE_CHECKING: # pragma: no cover
|
|
from .project import Project
|
|
from .scenario import Scenario
|
|
from .user import User
|
|
|
|
|
|
class ProjectProcessingOpexSnapshot(Base):
    """Persisted snapshot of recurring processing opex metrics for a project.

    Rows are stored in ``project_processing_opex_snapshots``. Each row
    references its owning project (foreign key declared with
    ``ON DELETE CASCADE``) and, optionally, the user who created it
    (foreign key declared with ``ON DELETE SET NULL``).
    """

    __tablename__ = "project_processing_opex_snapshots"

    # --- Identity and ownership -------------------------------------------
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    project_id: Mapped[int] = mapped_column(
        ForeignKey("projects.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    # Nullable so the snapshot can outlive the user row it points at.
    created_by_id: Mapped[int | None] = mapped_column(
        ForeignKey("users.id", ondelete="SET NULL"),
        nullable=True,
        index=True,
    )

    # --- Calculation provenance -------------------------------------------
    calculation_source: Mapped[str | None] = mapped_column(
        String(64),
        nullable=True,
    )
    # Filled in by the database server when no value is supplied.
    calculated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )

    # --- Headline results ---------------------------------------------------
    # At most three characters (presumably an ISO 4217 code -- confirm).
    currency_code: Mapped[str | None] = mapped_column(String(3), nullable=True)
    # NOTE(review): SQLAlchemy's Numeric yields decimal.Decimal by default
    # (asdecimal=True), so the ``float`` annotations on the Numeric columns
    # below may not match the runtime values -- confirm before relying on them.
    overall_annual: Mapped[float | None] = mapped_column(
        Numeric(18, 2),
        nullable=True,
    )
    escalated_total: Mapped[float | None] = mapped_column(
        Numeric(18, 2),
        nullable=True,
    )
    annual_average: Mapped[float | None] = mapped_column(
        Numeric(18, 2),
        nullable=True,
    )

    # --- Calculation parameters ---------------------------------------------
    evaluation_horizon_years: Mapped[int | None] = mapped_column(
        Integer,
        nullable=True,
    )
    escalation_pct: Mapped[float | None] = mapped_column(
        Numeric(12, 6),
        nullable=True,
    )
    # The default is applied by SQLAlchemy at insert time (Python-side
    # ``default``); no server-side default is declared for this column.
    apply_escalation: Mapped[bool] = mapped_column(
        Boolean,
        nullable=False,
        default=True,
    )
    # Presumably the number of opex components that fed the calculation --
    # verify against the code that writes snapshots.
    component_count: Mapped[int | None] = mapped_column(Integer, nullable=True)
    # Optional JSON document stored alongside the summary columns.
    payload: Mapped[dict | None] = mapped_column(JSON, nullable=True)

    # --- Row bookkeeping ----------------------------------------------------
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )
    # Refreshed with ``now()`` whenever SQLAlchemy issues an UPDATE for the row.
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
        onupdate=func.now(),
    )

    # --- Relationships ------------------------------------------------------
    # Paired with ``Project.processing_opex_snapshots`` via back_populates.
    project: Mapped[Project] = relationship(
        "Project",
        back_populates="processing_opex_snapshots",
    )
    # One-directional link; no reverse attribute is declared here.
    created_by: Mapped[User | None] = relationship("User")

    def __repr__(self) -> str:  # pragma: no cover
        """Return a short debug representation with the key identifiers."""
        return (
            f"ProjectProcessingOpexSnapshot(id={self.id!r}, "
            f"project_id={self.project_id!r}, "
            f"overall_annual={self.overall_annual!r})"
        )
|
|
|
|
|
|
class ScenarioProcessingOpexSnapshot(Base):
    """Persisted snapshot of processing opex metrics for a single scenario.

    Rows are stored in ``scenario_processing_opex_snapshots``. Each row
    references its owning scenario (foreign key declared with
    ``ON DELETE CASCADE``) and, optionally, the user who created it
    (foreign key declared with ``ON DELETE SET NULL``).
    """

    __tablename__ = "scenario_processing_opex_snapshots"

    # --- Identity and ownership -------------------------------------------
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    scenario_id: Mapped[int] = mapped_column(
        ForeignKey("scenarios.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    # Nullable so the snapshot can outlive the user row it points at.
    created_by_id: Mapped[int | None] = mapped_column(
        ForeignKey("users.id", ondelete="SET NULL"),
        nullable=True,
        index=True,
    )

    # --- Calculation provenance -------------------------------------------
    calculation_source: Mapped[str | None] = mapped_column(
        String(64),
        nullable=True,
    )
    # Filled in by the database server when no value is supplied.
    calculated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )

    # --- Headline results ---------------------------------------------------
    # At most three characters (presumably an ISO 4217 code -- confirm).
    currency_code: Mapped[str | None] = mapped_column(String(3), nullable=True)
    # NOTE(review): SQLAlchemy's Numeric yields decimal.Decimal by default
    # (asdecimal=True), so the ``float`` annotations on the Numeric columns
    # below may not match the runtime values -- confirm before relying on them.
    overall_annual: Mapped[float | None] = mapped_column(
        Numeric(18, 2),
        nullable=True,
    )
    escalated_total: Mapped[float | None] = mapped_column(
        Numeric(18, 2),
        nullable=True,
    )
    annual_average: Mapped[float | None] = mapped_column(
        Numeric(18, 2),
        nullable=True,
    )

    # --- Calculation parameters ---------------------------------------------
    evaluation_horizon_years: Mapped[int | None] = mapped_column(
        Integer,
        nullable=True,
    )
    escalation_pct: Mapped[float | None] = mapped_column(
        Numeric(12, 6),
        nullable=True,
    )
    # The default is applied by SQLAlchemy at insert time (Python-side
    # ``default``); no server-side default is declared for this column.
    apply_escalation: Mapped[bool] = mapped_column(
        Boolean,
        nullable=False,
        default=True,
    )
    # Presumably the number of opex components that fed the calculation --
    # verify against the code that writes snapshots.
    component_count: Mapped[int | None] = mapped_column(Integer, nullable=True)
    # Optional JSON document stored alongside the summary columns.
    payload: Mapped[dict | None] = mapped_column(JSON, nullable=True)

    # --- Row bookkeeping ----------------------------------------------------
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )
    # Refreshed with ``now()`` whenever SQLAlchemy issues an UPDATE for the row.
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
        onupdate=func.now(),
    )

    # --- Relationships ------------------------------------------------------
    # Paired with ``Scenario.processing_opex_snapshots`` via back_populates.
    scenario: Mapped[Scenario] = relationship(
        "Scenario",
        back_populates="processing_opex_snapshots",
    )
    # One-directional link; no reverse attribute is declared here.
    created_by: Mapped[User | None] = relationship("User")

    def __repr__(self) -> str:  # pragma: no cover
        """Return a short debug representation with the key identifiers."""
        return (
            f"ScenarioProcessingOpexSnapshot(id={self.id!r}, "
            f"scenario_id={self.scenario_id!r}, "
            f"overall_annual={self.overall_annual!r})"
        )
|