diff --git a/changelog.md b/changelog.md index 1cbf0cb..f30041c 100644 --- a/changelog.md +++ b/changelog.md @@ -33,3 +33,4 @@ - Documented the project/scenario import/export field mapping and file format guidelines in `calminer-docs/requirements/FR-008.md`, and introduced `schemas/imports.py` with Pydantic models that normalise incoming CSV/Excel rows for projects and scenarios. - Added `services/importers.py` to load CSV/XLSX files into the new import schemas, pulled in `openpyxl` for Excel support, and covered the parsing behaviour with `tests/test_import_parsing.py`. - Expanded the import ingestion workflow with staging previews, transactional persistence commits, FastAPI preview/commit endpoints under `/imports`, and new API tests (`tests/test_import_ingestion.py`, `tests/test_import_api.py`) ensuring end-to-end coverage. +- Added persistent audit logging via `ImportExportLog`, structured log emission, Prometheus metrics instrumentation, `/metrics` endpoint exposure, and updated operator/deployment documentation to guide monitoring setup. diff --git a/main.py b/main.py index 6aab622..ef58b7f 100644 --- a/main.py +++ b/main.py @@ -20,6 +20,7 @@ from routes.imports import router as imports_router from routes.exports import router as exports_router from routes.projects import router as projects_router from routes.scenarios import router as scenarios_router +from monitoring import router as monitoring_router from services.bootstrap import bootstrap_admin # Initialize database schema (imports above ensure models are registered) @@ -67,5 +68,6 @@ app.include_router(imports_router) app.include_router(exports_router) app.include_router(projects_router) app.include_router(scenarios_router) +app.include_router(monitoring_router) app.mount("/static", StaticFiles(directory="static"), name="static") diff --git a/models/import_export_log.py b/models/import_export_log.py new file mode 100644 index 0000000..4a5592a --- /dev/null +++ b/models/import_export_log.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from datetime import datetime + +from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text +from sqlalchemy.sql import func + +from config.database import Base + + +class ImportExportLog(Base): + """Audit log for import and export operations.""" + + __tablename__ = "import_export_logs" + + id = Column(Integer, primary_key=True, index=True) + action = Column(String(32), nullable=False) # preview, commit, export + dataset = Column(String(32), nullable=False) # projects, scenarios, etc. 
+    status = Column(String(16), nullable=False)  # success, partial, failure
+    filename = Column(String(255), nullable=True)
+    row_count = Column(Integer, nullable=True)
+    detail = Column(Text, nullable=True)
+    user_id = Column(Integer, ForeignKey("users.id"), nullable=True)
+    created_at = Column(
+        DateTime(timezone=True), nullable=False, server_default=func.now()
+    )
+
+    def __repr__(self) -> str:  # pragma: no cover
+        return (
+            f"ImportExportLog(id={self.id}, action={self.action}, "
+            f"dataset={self.dataset}, status={self.status})"
+        )
diff --git a/monitoring/__init__.py b/monitoring/__init__.py
new file mode 100644
index 0000000..64edf71
--- /dev/null
+++ b/monitoring/__init__.py
@@ -0,0 +1,13 @@
+from __future__ import annotations
+
+from fastapi import APIRouter, Response
+from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
+
+
+router = APIRouter(prefix="/metrics", tags=["monitoring"])
+
+
+@router.get("", summary="Prometheus metrics endpoint", include_in_schema=False)
+async def metrics_endpoint() -> Response:
+    payload = generate_latest()
+    return Response(content=payload, media_type=CONTENT_TYPE_LATEST)
diff --git a/monitoring/metrics.py b/monitoring/metrics.py
new file mode 100644
index 0000000..a38787b
--- /dev/null
+++ b/monitoring/metrics.py
@@ -0,0 +1,40 @@
+from __future__ import annotations
+
+from prometheus_client import Counter, Histogram
+
+IMPORT_DURATION = Histogram(
+    "calminer_import_duration_seconds",
+    "Duration of import preview and commit operations",
+    labelnames=("dataset", "action", "status"),
+)
+
+IMPORT_TOTAL = Counter(
+    "calminer_import_total",
+    "Count of import operations",
+    labelnames=("dataset", "action", "status"),
+)
+
+EXPORT_DURATION = Histogram(
+    "calminer_export_duration_seconds",
+    "Duration of export operations",
+    labelnames=("dataset", "status", "format"),
+)
+
+EXPORT_TOTAL = Counter(
+    "calminer_export_total",
+    "Count of export operations",
+    labelnames=("dataset", "status", "format"),
+)
+
+
+def observe_import(action: str, dataset: str, status: str, seconds: float) -> None:
+    IMPORT_TOTAL.labels(dataset=dataset, action=action, status=status).inc()
+    IMPORT_DURATION.labels(dataset=dataset, action=action,
+                           status=status).observe(seconds)
+
+
+def observe_export(dataset: str, status: str, export_format: str, seconds: float) -> None:
+    EXPORT_TOTAL.labels(dataset=dataset, status=status,
+                        format=export_format).inc()
+    EXPORT_DURATION.labels(dataset=dataset, status=status,
+                           format=export_format).observe(seconds)
diff --git a/requirements.txt b/requirements.txt
index 7658aa7..fe0b091 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -12,4 +12,5 @@ passlib
 argon2-cffi
 python-jose
 python-multipart
-openpyxl
\ No newline at end of file
+openpyxl
+prometheus-client
\ No newline at end of file
diff --git a/routes/exports.py b/routes/exports.py
index e896d37..d55c8a1 100644
--- a/routes/exports.py
+++ b/routes/exports.py
@@ -1,5 +1,7 @@
 from __future__ import annotations
 
+import logging
+import time
 from datetime import datetime, timezone
 from typing import Annotated
 
@@ -20,6 +22,10 @@ from services.export_serializers import (
     stream_scenarios_to_csv,
 )
 from services.unit_of_work import UnitOfWork
+from models.import_export_log import ImportExportLog
+from monitoring.metrics import observe_export
+
+logger = logging.getLogger(__name__)
 
 router = APIRouter(prefix="/exports", tags=["exports"])
 
@@ -65,6 +71,43 @@ def _ensure_repository(repo, name: str):
     return repo
 
 
+def _record_export_audit(
+    *,
+    uow: 
UnitOfWork, + dataset: str, + status: str, + export_format: ExportFormat, + row_count: int, + filename: str | None, +) -> None: + try: + if uow.session is None: + return + log = ImportExportLog( + action="export", + dataset=dataset, + status=status, + filename=filename, + row_count=row_count, + detail=f"format={export_format.value}", + ) + uow.session.add(log) + uow.commit() + except Exception: + # best-effort auditing, do not break exports + if uow.session is not None: + uow.session.rollback() + logger.exception( + "export.audit.failed", + extra={ + "event": "export.audit", + "dataset": dataset, + "status": status, + "format": export_format.value, + }, + ) + + @router.post( "/projects", status_code=status.HTTP_200_OK, @@ -78,17 +121,89 @@ async def export_projects( ) -> Response: project_repo = _ensure_repository( getattr(uow, "projects", None), "Project") - projects = project_repo.filtered_for_export(request.filters) + try: + start = time.perf_counter() + projects = project_repo.filtered_for_export(request.filters) + except Exception as exc: + _record_export_audit( + uow=uow, + dataset="projects", + status="failure", + export_format=request.format, + row_count=0, + filename=None, + ) + logger.exception( + "export.failed", + extra={ + "event": "export", + "dataset": "projects", + "status": "failure", + "format": request.format.value, + }, + ) + raise exc filename = f"projects-{_timestamp_suffix()}" + start = time.perf_counter() if request.format == ExportFormat.CSV: stream = stream_projects_to_csv(projects) response = StreamingResponse(stream, media_type="text/csv") response.headers["Content-Disposition"] = f"attachment; filename={filename}.csv" + _record_export_audit( + uow=uow, + dataset="projects", + status="success", + export_format=request.format, + row_count=len(projects), + filename=f"{filename}.csv", + ) + logger.info( + "export", + extra={ + "event": "export", + "dataset": "projects", + "status": "success", + "format": request.format.value, + "row_count": len(projects), + "filename": f"{filename}.csv", + }, + ) + observe_export( + dataset="projects", + status="success", + export_format=request.format.value, + seconds=time.perf_counter() - start, + ) return response data = export_projects_to_excel(projects) + _record_export_audit( + uow=uow, + dataset="projects", + status="success", + export_format=request.format, + row_count=len(projects), + filename=f"{filename}.xlsx", + ) + logger.info( + "export", + extra={ + "event": "export", + "dataset": "projects", + "status": "success", + "format": request.format.value, + "row_count": len(projects), + "filename": f"{filename}.xlsx", + }, + ) + observe_export( + dataset="projects", + status="success", + export_format=request.format.value, + seconds=time.perf_counter() - start, + ) return StreamingResponse( iter([data]), media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", @@ -111,18 +226,90 @@ async def export_scenarios( ) -> Response: scenario_repo = _ensure_repository( getattr(uow, "scenarios", None), "Scenario") - scenarios = scenario_repo.filtered_for_export( - request.filters, include_project=True) + try: + start = time.perf_counter() + scenarios = scenario_repo.filtered_for_export( + request.filters, include_project=True) + except Exception as exc: + _record_export_audit( + uow=uow, + dataset="scenarios", + status="failure", + export_format=request.format, + row_count=0, + filename=None, + ) + logger.exception( + "export.failed", + extra={ + "event": "export", + "dataset": "scenarios", + "status": 
"failure", + "format": request.format.value, + }, + ) + raise exc filename = f"scenarios-{_timestamp_suffix()}" + start = time.perf_counter() if request.format == ExportFormat.CSV: stream = stream_scenarios_to_csv(scenarios) response = StreamingResponse(stream, media_type="text/csv") response.headers["Content-Disposition"] = f"attachment; filename={filename}.csv" + _record_export_audit( + uow=uow, + dataset="scenarios", + status="success", + export_format=request.format, + row_count=len(scenarios), + filename=f"{filename}.csv", + ) + logger.info( + "export", + extra={ + "event": "export", + "dataset": "scenarios", + "status": "success", + "format": request.format.value, + "row_count": len(scenarios), + "filename": f"{filename}.csv", + }, + ) + observe_export( + dataset="scenarios", + status="success", + export_format=request.format.value, + seconds=time.perf_counter() - start, + ) return response data = export_scenarios_to_excel(scenarios) + _record_export_audit( + uow=uow, + dataset="scenarios", + status="success", + export_format=request.format, + row_count=len(scenarios), + filename=f"{filename}.xlsx", + ) + logger.info( + "export", + extra={ + "event": "export", + "dataset": "scenarios", + "status": "success", + "format": request.format.value, + "row_count": len(scenarios), + "filename": f"{filename}.xlsx", + }, + ) + observe_export( + dataset="scenarios", + status="success", + export_format=request.format.value, + seconds=time.perf_counter() - start, + ) return StreamingResponse( iter([data]), media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", diff --git a/services/importers.py b/services/importers.py index 4594783..c1107b5 100644 --- a/services/importers.py +++ b/services/importers.py @@ -1,9 +1,11 @@ from __future__ import annotations +import logging +import time from dataclasses import dataclass from enum import Enum from pathlib import Path -from typing import Any, BinaryIO, Callable, Generic, Iterable, Mapping, TypeVar, cast +from typing import Any, BinaryIO, Callable, Generic, Iterable, Mapping, Optional, TypeVar, cast from uuid import uuid4 from types import MappingProxyType @@ -14,6 +16,10 @@ from pydantic import BaseModel, ValidationError from models import Project, Scenario from schemas.imports import ProjectImportRow, ScenarioImportRow from services.unit_of_work import UnitOfWork +from models.import_export_log import ImportExportLog +from monitoring.metrics import observe_import + +logger = logging.getLogger(__name__) TImportRow = TypeVar("TImportRow", bound=BaseModel) @@ -164,7 +170,34 @@ class ImportIngestionService: stream: BinaryIO, filename: str, ) -> ImportPreview[ProjectImportRow]: + start = time.perf_counter() result = load_project_imports(stream, filename) + status = "success" if not result.errors else "partial" + self._record_audit_log( + action="preview", + dataset="projects", + status=status, + filename=filename, + row_count=len(result.rows), + detail=f"accepted={len(result.rows)} parser_errors={len(result.errors)}", + ) + observe_import( + action="preview", + dataset="projects", + status=status, + seconds=time.perf_counter() - start, + ) + logger.info( + "import.preview", + extra={ + "event": "import.preview", + "dataset": "projects", + "status": status, + "filename": filename, + "row_count": len(result.rows), + "error_count": len(result.errors), + }, + ) parser_errors = result.errors preview_rows: list[ImportPreviewRow[ProjectImportRow]] = [] @@ -258,7 +291,34 @@ class ImportIngestionService: stream: BinaryIO, filename: str, ) -> 
ImportPreview[ScenarioImportRow]: + start = time.perf_counter() result = load_scenario_imports(stream, filename) + status = "success" if not result.errors else "partial" + self._record_audit_log( + action="preview", + dataset="scenarios", + status=status, + filename=filename, + row_count=len(result.rows), + detail=f"accepted={len(result.rows)} parser_errors={len(result.errors)}", + ) + observe_import( + action="preview", + dataset="scenarios", + status=status, + seconds=time.perf_counter() - start, + ) + logger.info( + "import.preview", + extra={ + "event": "import.preview", + "dataset": "scenarios", + "status": status, + "filename": filename, + "row_count": len(result.rows), + "error_count": len(result.errors), + }, + ) parser_errors = result.errors preview_rows: list[ImportPreviewRow[ScenarioImportRow]] = [] @@ -423,46 +483,101 @@ class ImportIngestionService: staged_view = _build_staged_view(staged) created = updated = 0 - with self._uow_factory() as uow: - if not uow.projects: - raise RuntimeError("Project repository is unavailable") + start = time.perf_counter() + try: + with self._uow_factory() as uow: + if not uow.projects: + raise RuntimeError("Project repository is unavailable") - for row in staged.rows: - mode = row.context.get("mode") - data = row.parsed.data + for row in staged.rows: + mode = row.context.get("mode") + data = row.parsed.data - if mode == "create": - project = Project( - name=data.name, - location=data.location, - operation_type=data.operation_type, - description=data.description, - ) - if data.created_at: - project.created_at = data.created_at - if data.updated_at: - project.updated_at = data.updated_at - uow.projects.create(project) - created += 1 - elif mode == "update": - project_id = row.context.get("project_id") - if not project_id: - raise ValueError( - "Staged project update is missing project_id context" + if mode == "create": + project = Project( + name=data.name, + location=data.location, + operation_type=data.operation_type, + description=data.description, ) - project = uow.projects.get(project_id) - project.name = data.name - project.location = data.location - project.operation_type = data.operation_type - project.description = data.description - if data.created_at: - project.created_at = data.created_at - if data.updated_at: - project.updated_at = data.updated_at - updated += 1 - else: - raise ValueError( - f"Unsupported staged project mode: {mode!r}") + if data.created_at: + project.created_at = data.created_at + if data.updated_at: + project.updated_at = data.updated_at + uow.projects.create(project) + created += 1 + elif mode == "update": + project_id = row.context.get("project_id") + if not project_id: + raise ValueError( + "Staged project update is missing project_id context" + ) + project = uow.projects.get(project_id) + project.name = data.name + project.location = data.location + project.operation_type = data.operation_type + project.description = data.description + if data.created_at: + project.created_at = data.created_at + if data.updated_at: + project.updated_at = data.updated_at + updated += 1 + else: + raise ValueError( + f"Unsupported staged project mode: {mode!r}") + except Exception as exc: + self._record_audit_log( + action="commit", + dataset="projects", + status="failure", + filename=None, + row_count=len(staged.rows), + detail=f"error={type(exc).__name__}: {exc}", + ) + observe_import( + action="commit", + dataset="projects", + status="failure", + seconds=time.perf_counter() - start, + ) + logger.exception( + 
"import.commit.failed", + extra={ + "event": "import.commit", + "dataset": "projects", + "status": "failure", + "row_count": len(staged.rows), + "token": token, + }, + ) + raise + else: + self._record_audit_log( + action="commit", + dataset="projects", + status="success", + filename=None, + row_count=len(staged.rows), + detail=f"created={created} updated={updated}", + ) + observe_import( + action="commit", + dataset="projects", + status="success", + seconds=time.perf_counter() - start, + ) + logger.info( + "import.commit", + extra={ + "event": "import.commit", + "dataset": "projects", + "status": "success", + "row_count": len(staged.rows), + "created": created, + "updated": updated, + "token": token, + }, + ) self._project_stage.pop(token, None) return ImportCommitResult( @@ -479,64 +594,119 @@ class ImportIngestionService: staged_view = _build_staged_view(staged) created = updated = 0 - with self._uow_factory() as uow: - if not uow.scenarios or not uow.projects: - raise RuntimeError("Scenario repositories are unavailable") + start = time.perf_counter() + try: + with self._uow_factory() as uow: + if not uow.scenarios or not uow.projects: + raise RuntimeError("Scenario repositories are unavailable") - for row in staged.rows: - mode = row.context.get("mode") - data = row.parsed.data + for row in staged.rows: + mode = row.context.get("mode") + data = row.parsed.data - project_id = row.context.get("project_id") - if not project_id: - raise ValueError( - "Staged scenario row is missing project_id context" - ) - - project = uow.projects.get(project_id) - - if mode == "create": - scenario = Scenario( - project_id=project.id, - name=data.name, - status=data.status, - start_date=data.start_date, - end_date=data.end_date, - discount_rate=data.discount_rate, - currency=data.currency, - primary_resource=data.primary_resource, - description=data.description, - ) - if data.created_at: - scenario.created_at = data.created_at - if data.updated_at: - scenario.updated_at = data.updated_at - uow.scenarios.create(scenario) - created += 1 - elif mode == "update": - scenario_id = row.context.get("scenario_id") - if not scenario_id: + project_id = row.context.get("project_id") + if not project_id: raise ValueError( - "Staged scenario update is missing scenario_id context" + "Staged scenario row is missing project_id context" ) - scenario = uow.scenarios.get(scenario_id) - scenario.project_id = project.id - scenario.name = data.name - scenario.status = data.status - scenario.start_date = data.start_date - scenario.end_date = data.end_date - scenario.discount_rate = data.discount_rate - scenario.currency = data.currency - scenario.primary_resource = data.primary_resource - scenario.description = data.description - if data.created_at: - scenario.created_at = data.created_at - if data.updated_at: - scenario.updated_at = data.updated_at - updated += 1 - else: - raise ValueError( - f"Unsupported staged scenario mode: {mode!r}") + + project = uow.projects.get(project_id) + + if mode == "create": + scenario = Scenario( + project_id=project.id, + name=data.name, + status=data.status, + start_date=data.start_date, + end_date=data.end_date, + discount_rate=data.discount_rate, + currency=data.currency, + primary_resource=data.primary_resource, + description=data.description, + ) + if data.created_at: + scenario.created_at = data.created_at + if data.updated_at: + scenario.updated_at = data.updated_at + uow.scenarios.create(scenario) + created += 1 + elif mode == "update": + scenario_id = row.context.get("scenario_id") + if 
not scenario_id: + raise ValueError( + "Staged scenario update is missing scenario_id context" + ) + scenario = uow.scenarios.get(scenario_id) + scenario.project_id = project.id + scenario.name = data.name + scenario.status = data.status + scenario.start_date = data.start_date + scenario.end_date = data.end_date + scenario.discount_rate = data.discount_rate + scenario.currency = data.currency + scenario.primary_resource = data.primary_resource + scenario.description = data.description + if data.created_at: + scenario.created_at = data.created_at + if data.updated_at: + scenario.updated_at = data.updated_at + updated += 1 + else: + raise ValueError( + f"Unsupported staged scenario mode: {mode!r}") + except Exception as exc: + self._record_audit_log( + action="commit", + dataset="scenarios", + status="failure", + filename=None, + row_count=len(staged.rows), + detail=f"error={type(exc).__name__}: {exc}", + ) + observe_import( + action="commit", + dataset="scenarios", + status="failure", + seconds=time.perf_counter() - start, + ) + logger.exception( + "import.commit.failed", + extra={ + "event": "import.commit", + "dataset": "scenarios", + "status": "failure", + "row_count": len(staged.rows), + "token": token, + }, + ) + raise + else: + self._record_audit_log( + action="commit", + dataset="scenarios", + status="success", + filename=None, + row_count=len(staged.rows), + detail=f"created={created} updated={updated}", + ) + observe_import( + action="commit", + dataset="scenarios", + status="success", + seconds=time.perf_counter() - start, + ) + logger.info( + "import.commit", + extra={ + "event": "import.commit", + "dataset": "scenarios", + "status": "success", + "row_count": len(staged.rows), + "created": created, + "updated": updated, + "token": token, + }, + ) self._scenario_stage.pop(token, None) return ImportCommitResult( @@ -545,6 +715,34 @@ class ImportIngestionService: summary=ImportCommitSummary(created=created, updated=updated), ) + def _record_audit_log( + self, + *, + action: str, + dataset: str, + status: str, + row_count: int, + detail: Optional[str], + filename: Optional[str], + ) -> None: + try: + with self._uow_factory() as uow: + if uow.session is None: + return + log = ImportExportLog( + action=action, + dataset=dataset, + status=status, + filename=filename, + row_count=row_count, + detail=detail, + ) + uow.session.add(log) + uow.commit() + except Exception: + # Audit logging must not break core workflows + pass + def _store_project_stage( self, rows: list[StagedRow[ProjectImportRow]] ) -> str: diff --git a/tests/test_import_export_integration.py b/tests/test_import_export_integration.py new file mode 100644 index 0000000..f74e318 --- /dev/null +++ b/tests/test_import_export_integration.py @@ -0,0 +1,124 @@ +from __future__ import annotations + +from io import BytesIO + +import pandas as pd +import pytest +from fastapi.testclient import TestClient + +from models import ( + MiningOperationType, + Project, + Scenario, + ScenarioStatus, +) +from models.import_export_log import ImportExportLog + + +@pytest.fixture() +def project_seed(unit_of_work_factory): + with unit_of_work_factory() as uow: + assert uow.projects is not None + project = Project(name="Seed Project", operation_type=MiningOperationType.OPEN_PIT) + uow.projects.create(project) + yield project + + +def test_project_import_preview_and_commit(client: TestClient, unit_of_work_factory) -> None: + csv_content = ( + "name,location,operation_type\n" + "Project Import A,Chile,open pit\n" + "Project Import 
B,Canada,underground\n" + ) + files = {"file": ("projects.csv", csv_content, "text/csv")} + + preview_response = client.post("/imports/projects/preview", files=files) + assert preview_response.status_code == 200 + preview_payload = preview_response.json() + assert preview_payload["summary"]["accepted"] == 2 + assert preview_payload["stage_token"] + + token = preview_payload["stage_token"] + + commit_response = client.post("/imports/projects/commit", json={"token": token}) + assert commit_response.status_code == 200 + commit_payload = commit_response.json() + assert commit_payload["summary"]["created"] == 2 + + with unit_of_work_factory() as uow: + assert uow.projects is not None + names = {project.name for project in uow.projects.list()} + assert {"Project Import A", "Project Import B"}.issubset(names) + # ensure audit logs recorded preview and commit events + assert uow.session is not None + logs = ( + uow.session.query(ImportExportLog) + .filter(ImportExportLog.dataset == "projects") + .order_by(ImportExportLog.created_at) + .all() + ) + actions = [log.action for log in logs] + assert "preview" in actions + assert "commit" in actions + + +def test_scenario_import_preview_and_commit(client: TestClient, unit_of_work_factory, project_seed) -> None: + csv_content = ( + "project_name,name,status\n" + "Seed Project,Scenario Import A,Draft\n" + "Seed Project,Scenario Import B,Active\n" + ) + files = {"file": ("scenarios.csv", csv_content, "text/csv")} + + preview_response = client.post("/imports/scenarios/preview", files=files) + assert preview_response.status_code == 200 + preview_payload = preview_response.json() + assert preview_payload["summary"]["accepted"] == 2 + token = preview_payload["stage_token"] + + commit_response = client.post("/imports/scenarios/commit", json={"token": token}) + assert commit_response.status_code == 200 + commit_payload = commit_response.json() + assert commit_payload["summary"]["created"] == 2 + + with unit_of_work_factory() as uow: + assert uow.projects is not None and uow.scenarios is not None + project = uow.projects.list()[0] + scenarios = uow.scenarios.list_for_project(project.id) + names = {scenario.name for scenario in scenarios} + assert {"Scenario Import A", "Scenario Import B"}.issubset(names) + assert uow.session is not None + logs = ( + uow.session.query(ImportExportLog) + .filter(ImportExportLog.dataset == "scenarios") + .order_by(ImportExportLog.created_at) + .all() + ) + actions = [log.action for log in logs] + assert "preview" in actions + assert "commit" in actions + + +def test_project_export_endpoint(client: TestClient, unit_of_work_factory) -> None: + with unit_of_work_factory() as uow: + assert uow.projects is not None + uow.projects.create(Project(name="Export Project", operation_type=MiningOperationType.OPEN_PIT)) + + response = client.post("/exports/projects", json={"format": "csv"}) + assert response.status_code == 200 + assert response.headers["Content-Type"].startswith("text/csv") + assert "attachment; filename=" in response.headers["Content-Disposition"] + body = response.content.decode("utf-8") + assert "Export Project" in body + + with unit_of_work_factory() as uow: + assert uow.session is not None + logs = ( + uow.session.query(ImportExportLog) + .filter(ImportExportLog.dataset == "projects", ImportExportLog.action == "export") + .order_by(ImportExportLog.created_at.desc()) + .first() + ) + assert logs is not None + assert logs.status == "success" + assert logs.row_count >= 1 \ No newline at end of file diff --git 
a/tests/test_import_parsing.py b/tests/test_import_parsing.py
index 6c79974..2f08be7 100644
--- a/tests/test_import_parsing.py
+++ b/tests/test_import_parsing.py
@@ -1,12 +1,14 @@
 from __future__ import annotations
 
 from io import BytesIO
+from textwrap import dedent
 
 import pandas as pd
 import pytest
 
 from services.importers import ImportResult, load_project_imports, load_scenario_imports
 from schemas.imports import ProjectImportRow, ScenarioImportRow
+from models.project import MiningOperationType
 
 
 def test_load_project_imports_from_csv() -> None:
@@ -76,3 +78,65 @@ def test_import_errors_include_row_numbers() -> None:
     assert error.row_number == 2
     assert error.field == "name"
     assert "required" in error.message
+
+
+def test_project_import_handles_missing_columns() -> None:
+    csv_content = "name\nProject Only\n"
+    stream = BytesIO(csv_content.encode("utf-8"))
+
+    result = load_project_imports(stream, "projects.csv")
+
+    assert result.rows == []
+    assert len(result.errors) == 1
+    error = result.errors[0]
+    assert error.row_number == 2
+    assert error.field == "operation_type"
+
+
+def test_project_import_rejects_invalid_operation_type() -> None:
+    csv_content = "name,operation_type\nProject X,unknown\n"
+    stream = BytesIO(csv_content.encode("utf-8"))
+
+    result = load_project_imports(stream, "projects.csv")
+
+    assert len(result.rows) == 0
+    assert len(result.errors) == 1
+    error = result.errors[0]
+    assert error.row_number == 2
+    assert error.field == "operation_type"
+
+
+def test_scenario_import_flags_invalid_dates() -> None:
+    csv_content = dedent(
+        """
+        project_name,name,status,start_date,end_date
+        Project A,Scenario Reverse,Draft,2025-12-31,2025-01-01
+        """
+    ).strip()
+    stream = BytesIO(csv_content.encode("utf-8"))
+
+    result = load_scenario_imports(stream, "scenarios.csv")
+
+    assert len(result.rows) == 0
+    assert len(result.errors) == 1
+    error = result.errors[0]
+    assert error.row_number == 2
+    assert error.field is None
+
+
+def test_scenario_import_handles_large_dataset() -> None:
+    buffer = BytesIO()
+    df = pd.DataFrame(
+        {
+            "project_name": ["Project"] * 500,
+            "name": [f"Scenario {i}" for i in range(500)],
+            "status": ["draft"] * 500,
+        }
+    )
+    df.to_csv(buffer, index=False)
+    buffer.seek(0)
+
+    result = load_scenario_imports(buffer, "bulk.csv")
+
+    assert len(result.rows) == 500
+    assert result.errors == []
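
A minimal smoke test for the new /metrics endpoint could sit alongside the API tests above. This is a sketch, assuming the same `client` fixture the other API tests use; it only checks that the CalMiner metric families declared in monitoring/metrics.py appear in the exposition payload:

from fastapi.testclient import TestClient


def test_metrics_endpoint_exposes_calminer_families(client: TestClient) -> None:
    # Drive one instrumented operation so the export metrics have samples.
    client.post("/exports/projects", json={"format": "csv"})

    response = client.get("/metrics")
    assert response.status_code == 200
    body = response.text
    # Metric families registered in monitoring/metrics.py; prometheus_client
    # emits their HELP/TYPE lines even before any labelled sample exists.
    assert "calminer_export_total" in body
    assert "calminer_export_duration_seconds" in body
    assert "calminer_import_total" in body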
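
For the operator documentation mentioned in the changelog entry, a small verification script can fetch and parse the same payload with the client library's parser. The host and port below are placeholders for wherever the app is actually deployed:

import urllib.request

from prometheus_client.parser import text_string_to_metric_families

# Placeholder URL - substitute the deployment's actual host/port.
with urllib.request.urlopen("http://localhost:8000/metrics") as response:
    payload = response.read().decode("utf-8")

# Print every CalMiner sample (counters plus histogram _bucket/_count/_sum).
for family in text_string_to_metric_families(payload):
    if family.name.startswith("calminer_"):
        for sample in family.samples:
            print(sample.name, sample.labels, sample.value)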
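
The import_export_logs table is also queryable with plain SQLAlchemy for ad-hoc auditing. The sketch below assumes a `SessionLocal` session factory in config.database (the module that already provides `Base`); adjust the import to whatever factory the project actually exposes:

from config.database import SessionLocal  # assumed factory; only Base is confirmed to live here

from models.import_export_log import ImportExportLog

with SessionLocal() as session:
    # Most recent audit entries first, matching the ordering used in the tests.
    recent = (
        session.query(ImportExportLog)
        .order_by(ImportExportLog.created_at.desc())
        .limit(20)
        .all()
    )
    for entry in recent:
        print(entry.created_at, entry.action, entry.dataset, entry.status, entry.row_count)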