feat: enhance dashboard with new metrics, project and scenario utilities, and comprehensive tests

This commit is contained in:
2025-11-09 19:02:36 +01:00
parent 053da332ac
commit 7f5ed6a42d
6 changed files with 302 additions and 64 deletions

View File

@@ -10,3 +10,5 @@
- Connected project and scenario routers to new Jinja2 list/detail/edit views with HTML forms and redirects.
- Implemented FR-009 client-side enhancements with responsive navigation toggle, mobile-first scenario tables, and shared asset loading across templates.
- Added scenario comparison validator, FastAPI comparison endpoint, and comprehensive unit tests to enforce FR-009 validation rules through API errors.
- Delivered a new dashboard experience with `templates/dashboard.html`, dedicated styling, and a FastAPI route supplying real project/scenario metrics via repository helpers.
- Extended repositories with count/recency utilities and added pytest coverage, including a dashboard rendering smoke test validating empty-state messaging.

View File

@@ -1,83 +1,102 @@
from __future__ import annotations
from datetime import datetime, timedelta
from types import SimpleNamespace
from datetime import datetime
from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from dependencies import get_unit_of_work
from models import MiningOperationType
from models import ScenarioStatus
from services.unit_of_work import UnitOfWork
router = APIRouter(tags=["Dashboard"])
templates = Jinja2Templates(directory="templates")
def _load_metrics(_: UnitOfWork) -> dict[str, object]:
today = datetime.utcnow()
def _format_timestamp(moment: datetime | None) -> str | None:
if moment is None:
return None
return moment.strftime("%Y-%m-%d")
def _format_timestamp_with_time(moment: datetime | None) -> str | None:
if moment is None:
return None
return moment.strftime("%Y-%m-%d %H:%M")
def _load_metrics(uow: UnitOfWork) -> dict[str, object]:
total_projects = uow.projects.count()
active_scenarios = uow.scenarios.count_by_status(ScenarioStatus.ACTIVE)
pending_simulations = uow.scenarios.count_by_status(ScenarioStatus.DRAFT)
last_import_at = uow.financial_inputs.latest_created_at()
return {
"total_projects": 12,
"active_scenarios": 7,
"pending_simulations": 3,
"last_import": today.strftime("%Y-%m-%d"),
"total_projects": total_projects,
"active_scenarios": active_scenarios,
"pending_simulations": pending_simulations,
"last_import": _format_timestamp(last_import_at),
}
def _load_recent_projects(_: UnitOfWork) -> list[SimpleNamespace]:
now = datetime.utcnow()
return [
SimpleNamespace(
id=1,
name="Copper Ridge Expansion",
operation_type=MiningOperationType.OPEN_PIT,
updated_at=now - timedelta(days=2),
),
SimpleNamespace(
id=2,
name="Lithium Basin North",
operation_type=MiningOperationType.UNDERGROUND,
updated_at=now - timedelta(days=5),
),
SimpleNamespace(
id=3,
name="Nickel Underground Phase II",
operation_type=MiningOperationType.IN_SITU_LEACH,
updated_at=now - timedelta(days=9),
),
]
def _load_recent_projects(uow: UnitOfWork) -> list:
return list(uow.projects.recent(limit=5))
def _load_simulation_updates(_: UnitOfWork) -> list[SimpleNamespace]:
now = datetime.utcnow()
return [
SimpleNamespace(
title="Monte Carlo Batch #21 completed",
description="1,000 runs processed for Lithium Basin North.",
timestamp=now - timedelta(hours=4),
),
SimpleNamespace(
title="Scenario validation queued",
description="Copper Ridge Expansion pending validation on new cost inputs.",
timestamp=now - timedelta(days=1, hours=3),
),
]
def _load_simulation_updates(uow: UnitOfWork) -> list[dict[str, object]]:
updates: list[dict[str, object]] = []
scenarios = uow.scenarios.recent(limit=5, with_project=True)
for scenario in scenarios:
project_name = scenario.project.name if scenario.project else f"Project #{scenario.project_id}"
timestamp_label = _format_timestamp_with_time(scenario.updated_at)
updates.append(
{
"title": f"{scenario.name} · {scenario.status.value.title()}",
"description": f"Latest update recorded for {project_name}.",
"timestamp": scenario.updated_at,
"timestamp_label": timestamp_label,
}
)
return updates
def _load_scenario_alerts(_: UnitOfWork) -> list[SimpleNamespace]:
return [
SimpleNamespace(
title="Variance exceeds threshold",
message="Nickel Underground Phase II deviates 18% from baseline forecast.",
link="/projects/3/view",
def _load_scenario_alerts(
request: Request, uow: UnitOfWork
) -> list[dict[str, object]]:
alerts: list[dict[str, object]] = []
drafts = uow.scenarios.list_by_status(
ScenarioStatus.DRAFT, limit=3, with_project=True
)
for scenario in drafts:
project_name = scenario.project.name if scenario.project else f"Project #{scenario.project_id}"
alerts.append(
{
"title": f"Draft scenario: {scenario.name}",
"message": f"{project_name} has a scenario awaiting validation.",
"link": request.url_for(
"projects.view_project", project_id=scenario.project_id
),
SimpleNamespace(
title="Simulation backlog",
message="Lithium Basin North has 2 pending simulation batches.",
link="/projects/2/view",
}
)
if not alerts:
archived = uow.scenarios.list_by_status(
ScenarioStatus.ARCHIVED, limit=3, with_project=True
)
for scenario in archived:
project_name = scenario.project.name if scenario.project else f"Project #{scenario.project_id}"
alerts.append(
{
"title": f"Archived scenario: {scenario.name}",
"message": f"Review archived scenario insights for {project_name}.",
"link": request.url_for(
"scenarios.view_scenario", scenario_id=scenario.id
),
]
}
)
return alerts
@router.get("/", response_class=HTMLResponse, include_in_schema=False, name="dashboard.home")
@@ -90,6 +109,6 @@ def dashboard_home(
"metrics": _load_metrics(uow),
"recent_projects": _load_recent_projects(uow),
"simulation_updates": _load_simulation_updates(uow),
"scenario_alerts": _load_scenario_alerts(uow),
"scenario_alerts": _load_scenario_alerts(request, uow),
}
return templates.TemplateResponse("dashboard.html", context)

View File

@@ -1,13 +1,14 @@
from __future__ import annotations
from collections.abc import Iterable
from datetime import datetime
from typing import Sequence
from sqlalchemy import select
from sqlalchemy import select, func
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, joinedload, selectinload
from models import FinancialInput, Project, Scenario, SimulationParameter
from models import FinancialInput, Project, Scenario, ScenarioStatus, SimulationParameter
from services.exceptions import EntityConflictError, EntityNotFoundError
@@ -23,6 +24,18 @@ class ProjectRepository:
stmt = stmt.options(selectinload(Project.scenarios))
return self.session.execute(stmt).scalars().all()
def count(self) -> int:
stmt = select(func.count(Project.id))
return self.session.execute(stmt).scalar_one()
def recent(self, limit: int = 5) -> Sequence[Project]:
stmt = (
select(Project)
.order_by(Project.updated_at.desc())
.limit(limit)
)
return self.session.execute(stmt).scalars().all()
def get(self, project_id: int, *, with_children: bool = False) -> Project:
stmt = select(Project).where(Project.id == project_id)
if with_children:
@@ -60,6 +73,39 @@ class ScenarioRepository:
)
return self.session.execute(stmt).scalars().all()
def count(self) -> int:
stmt = select(func.count(Scenario.id))
return self.session.execute(stmt).scalar_one()
def count_by_status(self, status: ScenarioStatus) -> int:
stmt = select(func.count(Scenario.id)).where(Scenario.status == status)
return self.session.execute(stmt).scalar_one()
def recent(self, limit: int = 5, *, with_project: bool = False) -> Sequence[Scenario]:
stmt = select(Scenario).order_by(
Scenario.updated_at.desc()).limit(limit)
if with_project:
stmt = stmt.options(joinedload(Scenario.project))
return self.session.execute(stmt).scalars().all()
def list_by_status(
self,
status: ScenarioStatus,
*,
limit: int | None = None,
with_project: bool = False,
) -> Sequence[Scenario]:
stmt = (
select(Scenario)
.where(Scenario.status == status)
.order_by(Scenario.updated_at.desc())
)
if with_project:
stmt = stmt.options(joinedload(Scenario.project))
if limit is not None:
stmt = stmt.limit(limit)
return self.session.execute(stmt).scalars().all()
def get(self, scenario_id: int, *, with_children: bool = False) -> Scenario:
stmt = select(Scenario).where(Scenario.id == scenario_id)
if with_children:
@@ -119,6 +165,14 @@ class FinancialInputRepository:
raise EntityNotFoundError(f"Financial input {input_id} not found")
self.session.delete(entity)
def latest_created_at(self) -> datetime | None:
stmt = (
select(FinancialInput.created_at)
.order_by(FinancialInput.created_at.desc())
.limit(1)
)
return self.session.execute(stmt).scalar_one_or_none()
class SimulationParameterRepository:
"""Persistence operations for SimulationParameter entities."""

View File

@@ -63,7 +63,7 @@
<a class="table-link" href="{{ url_for('projects.view_project', project_id=project.id) }}">{{ project.name }}</a>
</td>
<td>{{ project.operation_type.value.replace('_', ' ') | title }}</td>
<td>{{ project.updated_at.strftime('%Y-%m-%d') }}</td>
<td>{{ project.updated_at.strftime('%Y-%m-%d') if project.updated_at else '—' }}</td>
</tr>
{% endfor %}
</tbody>
@@ -81,7 +81,7 @@
<ul class="timeline">
{% for update in simulation_updates %}
<li>
<span class="timeline-label">{{ update.timestamp.strftime('%Y-%m-%d %H:%M') }}</span>
<span class="timeline-label">{{ update.timestamp_label or '—' }}</span>
<div>
<strong>{{ update.title }}</strong>
<p>{{ update.description }}</p>
@@ -106,7 +106,9 @@
<li>
<strong>{{ alert.title }}</strong>
<p>{{ alert.message }}</p>
{% if alert.link %}
<a class="btn btn-link" href="{{ alert.link }}">Review</a>
{% endif %}
</li>
{% endfor %}
</ul>

View File

@@ -0,0 +1,73 @@
from __future__ import annotations
from collections.abc import Iterator
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from config.database import Base
from dependencies import get_unit_of_work
from routes.dashboard import router as dashboard_router
from routes.projects import router as projects_router
from routes.scenarios import router as scenarios_router
from services.unit_of_work import UnitOfWork
@pytest.fixture()
def engine() -> Iterator[Engine]:
engine = create_engine(
"sqlite+pysqlite:///:memory:",
future=True,
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(bind=engine)
try:
yield engine
finally:
Base.metadata.drop_all(bind=engine)
engine.dispose()
@pytest.fixture()
def session_factory(engine: Engine) -> Iterator[sessionmaker]:
testing_session = sessionmaker(
bind=engine, expire_on_commit=False, future=True)
yield testing_session
@pytest.fixture()
def client(session_factory: sessionmaker) -> Iterator[TestClient]:
app = FastAPI()
app.include_router(dashboard_router)
app.include_router(projects_router)
app.include_router(scenarios_router)
def _override_uow() -> Iterator[UnitOfWork]:
with UnitOfWork(session_factory=session_factory) as uow:
yield uow
app.dependency_overrides[get_unit_of_work] = _override_uow
test_client = TestClient(app)
try:
yield test_client
finally:
test_client.close()
class TestDashboardRoute:
def test_renders_empty_state(self, client: TestClient) -> None:
response = client.get("/")
assert response.status_code == 200
html = response.text
assert "No recent projects" in html
assert "No simulation runs yet" in html
assert "All scenarios look good" in html
assert "" in html # Last data import placeholder

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from collections.abc import Iterator
from datetime import datetime, timezone
import pytest
from sqlalchemy import create_engine
@@ -154,3 +155,90 @@ def test_unit_of_work_commit_and_rollback(engine) -> None:
with TestingSession() as session:
projects = ProjectRepository(session).list()
assert len(projects) == 1
def test_project_repository_count_and_recent(session: Session) -> None:
repo = ProjectRepository(session)
project_alpha = Project(name="Alpha", operation_type=MiningOperationType.OPEN_PIT)
project_bravo = Project(name="Bravo", operation_type=MiningOperationType.UNDERGROUND)
repo.create(project_alpha)
repo.create(project_bravo)
project_alpha.updated_at = datetime(2025, 1, 1, tzinfo=timezone.utc)
project_bravo.updated_at = datetime(2025, 1, 2, tzinfo=timezone.utc)
session.flush()
assert repo.count() == 2
recent = repo.recent(limit=1)
assert len(recent) == 1
assert recent[0].name == "Bravo"
def test_scenario_repository_counts_and_filters(session: Session) -> None:
project = Project(name="Project", operation_type=MiningOperationType.PLACER)
session.add(project)
session.flush()
repo = ScenarioRepository(session)
draft = Scenario(name="Draft", project_id=project.id,
status=ScenarioStatus.DRAFT)
active = Scenario(name="Active", project_id=project.id,
status=ScenarioStatus.ACTIVE)
repo.create(draft)
repo.create(active)
draft.updated_at = datetime(2025, 1, 1, tzinfo=timezone.utc)
active.updated_at = datetime(2025, 1, 3, tzinfo=timezone.utc)
session.flush()
assert repo.count() == 2
assert repo.count_by_status(ScenarioStatus.ACTIVE) == 1
recent = repo.recent(limit=1, with_project=True)
assert len(recent) == 1
assert recent[0].name == "Active"
assert recent[0].project.name == "Project"
drafts = repo.list_by_status(ScenarioStatus.DRAFT, limit=2, with_project=True)
assert len(drafts) == 1
assert drafts[0].name == "Draft"
assert drafts[0].project_id == project.id
def test_financial_input_repository_latest_created_at(session: Session) -> None:
project = Project(name="Project FI", operation_type=MiningOperationType.OTHER)
scenario = Scenario(name="Scenario FI", project=project)
session.add(project)
session.flush()
repo = FinancialInputRepository(session)
old_timestamp = datetime(2024, 12, 31, 15, 0)
new_timestamp = datetime(2025, 1, 2, 8, 30)
repo.bulk_upsert(
[
FinancialInput(
scenario_id=scenario.id,
name="Legacy Entry",
category=FinancialCategory.OPERATING_EXPENDITURE,
amount=1000,
currency="usd",
created_at=old_timestamp,
updated_at=old_timestamp,
),
FinancialInput(
scenario_id=scenario.id,
name="New Entry",
category=FinancialCategory.OPERATING_EXPENDITURE,
amount=2000,
currency="usd",
created_at=new_timestamp,
updated_at=new_timestamp,
),
]
)
latest = repo.latest_created_at()
assert latest == new_timestamp