feat: implement persistent audit logging for import/export operations with Prometheus metrics

This commit is contained in:
2025-11-10 21:37:07 +01:00
parent 51c0fcec95
commit 032e6d2681
10 changed files with 760 additions and 96 deletions

View File

@@ -1,9 +1,11 @@
from __future__ import annotations
import logging
import time
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, BinaryIO, Callable, Generic, Iterable, Mapping, TypeVar, cast
from typing import Any, BinaryIO, Callable, Generic, Iterable, Mapping, Optional, TypeVar, cast
from uuid import uuid4
from types import MappingProxyType
@@ -14,6 +16,10 @@ from pydantic import BaseModel, ValidationError
from models import Project, Scenario
from schemas.imports import ProjectImportRow, ScenarioImportRow
from services.unit_of_work import UnitOfWork
from models.import_export_log import ImportExportLog
from monitoring.metrics import observe_import
logger = logging.getLogger(__name__)
TImportRow = TypeVar("TImportRow", bound=BaseModel)
@@ -164,7 +170,34 @@ class ImportIngestionService:
stream: BinaryIO,
filename: str,
) -> ImportPreview[ProjectImportRow]:
start = time.perf_counter()
result = load_project_imports(stream, filename)
status = "success" if not result.errors else "partial"
self._record_audit_log(
action="preview",
dataset="projects",
status=status,
filename=filename,
row_count=len(result.rows),
detail=f"accepted={len(result.rows)} parser_errors={len(result.errors)}",
)
observe_import(
action="preview",
dataset="projects",
status=status,
seconds=time.perf_counter() - start,
)
logger.info(
"import.preview",
extra={
"event": "import.preview",
"dataset": "projects",
"status": status,
"filename": filename,
"row_count": len(result.rows),
"error_count": len(result.errors),
},
)
parser_errors = result.errors
preview_rows: list[ImportPreviewRow[ProjectImportRow]] = []
@@ -258,7 +291,34 @@ class ImportIngestionService:
stream: BinaryIO,
filename: str,
) -> ImportPreview[ScenarioImportRow]:
start = time.perf_counter()
result = load_scenario_imports(stream, filename)
status = "success" if not result.errors else "partial"
self._record_audit_log(
action="preview",
dataset="scenarios",
status=status,
filename=filename,
row_count=len(result.rows),
detail=f"accepted={len(result.rows)} parser_errors={len(result.errors)}",
)
observe_import(
action="preview",
dataset="scenarios",
status=status,
seconds=time.perf_counter() - start,
)
logger.info(
"import.preview",
extra={
"event": "import.preview",
"dataset": "scenarios",
"status": status,
"filename": filename,
"row_count": len(result.rows),
"error_count": len(result.errors),
},
)
parser_errors = result.errors
preview_rows: list[ImportPreviewRow[ScenarioImportRow]] = []
@@ -423,46 +483,101 @@ class ImportIngestionService:
staged_view = _build_staged_view(staged)
created = updated = 0
with self._uow_factory() as uow:
if not uow.projects:
raise RuntimeError("Project repository is unavailable")
start = time.perf_counter()
try:
with self._uow_factory() as uow:
if not uow.projects:
raise RuntimeError("Project repository is unavailable")
for row in staged.rows:
mode = row.context.get("mode")
data = row.parsed.data
for row in staged.rows:
mode = row.context.get("mode")
data = row.parsed.data
if mode == "create":
project = Project(
name=data.name,
location=data.location,
operation_type=data.operation_type,
description=data.description,
)
if data.created_at:
project.created_at = data.created_at
if data.updated_at:
project.updated_at = data.updated_at
uow.projects.create(project)
created += 1
elif mode == "update":
project_id = row.context.get("project_id")
if not project_id:
raise ValueError(
"Staged project update is missing project_id context"
if mode == "create":
project = Project(
name=data.name,
location=data.location,
operation_type=data.operation_type,
description=data.description,
)
project = uow.projects.get(project_id)
project.name = data.name
project.location = data.location
project.operation_type = data.operation_type
project.description = data.description
if data.created_at:
project.created_at = data.created_at
if data.updated_at:
project.updated_at = data.updated_at
updated += 1
else:
raise ValueError(
f"Unsupported staged project mode: {mode!r}")
if data.created_at:
project.created_at = data.created_at
if data.updated_at:
project.updated_at = data.updated_at
uow.projects.create(project)
created += 1
elif mode == "update":
project_id = row.context.get("project_id")
if not project_id:
raise ValueError(
"Staged project update is missing project_id context"
)
project = uow.projects.get(project_id)
project.name = data.name
project.location = data.location
project.operation_type = data.operation_type
project.description = data.description
if data.created_at:
project.created_at = data.created_at
if data.updated_at:
project.updated_at = data.updated_at
updated += 1
else:
raise ValueError(
f"Unsupported staged project mode: {mode!r}")
except Exception as exc:
self._record_audit_log(
action="commit",
dataset="projects",
status="failure",
filename=None,
row_count=len(staged.rows),
detail=f"error={type(exc).__name__}: {exc}",
)
observe_import(
action="commit",
dataset="projects",
status="failure",
seconds=time.perf_counter() - start,
)
logger.exception(
"import.commit.failed",
extra={
"event": "import.commit",
"dataset": "projects",
"status": "failure",
"row_count": len(staged.rows),
"token": token,
},
)
raise
else:
self._record_audit_log(
action="commit",
dataset="projects",
status="success",
filename=None,
row_count=len(staged.rows),
detail=f"created={created} updated={updated}",
)
observe_import(
action="commit",
dataset="projects",
status="success",
seconds=time.perf_counter() - start,
)
logger.info(
"import.commit",
extra={
"event": "import.commit",
"dataset": "projects",
"status": "success",
"row_count": len(staged.rows),
"created": created,
"updated": updated,
"token": token,
},
)
self._project_stage.pop(token, None)
return ImportCommitResult(
@@ -479,64 +594,119 @@ class ImportIngestionService:
staged_view = _build_staged_view(staged)
created = updated = 0
with self._uow_factory() as uow:
if not uow.scenarios or not uow.projects:
raise RuntimeError("Scenario repositories are unavailable")
start = time.perf_counter()
try:
with self._uow_factory() as uow:
if not uow.scenarios or not uow.projects:
raise RuntimeError("Scenario repositories are unavailable")
for row in staged.rows:
mode = row.context.get("mode")
data = row.parsed.data
for row in staged.rows:
mode = row.context.get("mode")
data = row.parsed.data
project_id = row.context.get("project_id")
if not project_id:
raise ValueError(
"Staged scenario row is missing project_id context"
)
project = uow.projects.get(project_id)
if mode == "create":
scenario = Scenario(
project_id=project.id,
name=data.name,
status=data.status,
start_date=data.start_date,
end_date=data.end_date,
discount_rate=data.discount_rate,
currency=data.currency,
primary_resource=data.primary_resource,
description=data.description,
)
if data.created_at:
scenario.created_at = data.created_at
if data.updated_at:
scenario.updated_at = data.updated_at
uow.scenarios.create(scenario)
created += 1
elif mode == "update":
scenario_id = row.context.get("scenario_id")
if not scenario_id:
project_id = row.context.get("project_id")
if not project_id:
raise ValueError(
"Staged scenario update is missing scenario_id context"
"Staged scenario row is missing project_id context"
)
scenario = uow.scenarios.get(scenario_id)
scenario.project_id = project.id
scenario.name = data.name
scenario.status = data.status
scenario.start_date = data.start_date
scenario.end_date = data.end_date
scenario.discount_rate = data.discount_rate
scenario.currency = data.currency
scenario.primary_resource = data.primary_resource
scenario.description = data.description
if data.created_at:
scenario.created_at = data.created_at
if data.updated_at:
scenario.updated_at = data.updated_at
updated += 1
else:
raise ValueError(
f"Unsupported staged scenario mode: {mode!r}")
project = uow.projects.get(project_id)
if mode == "create":
scenario = Scenario(
project_id=project.id,
name=data.name,
status=data.status,
start_date=data.start_date,
end_date=data.end_date,
discount_rate=data.discount_rate,
currency=data.currency,
primary_resource=data.primary_resource,
description=data.description,
)
if data.created_at:
scenario.created_at = data.created_at
if data.updated_at:
scenario.updated_at = data.updated_at
uow.scenarios.create(scenario)
created += 1
elif mode == "update":
scenario_id = row.context.get("scenario_id")
if not scenario_id:
raise ValueError(
"Staged scenario update is missing scenario_id context"
)
scenario = uow.scenarios.get(scenario_id)
scenario.project_id = project.id
scenario.name = data.name
scenario.status = data.status
scenario.start_date = data.start_date
scenario.end_date = data.end_date
scenario.discount_rate = data.discount_rate
scenario.currency = data.currency
scenario.primary_resource = data.primary_resource
scenario.description = data.description
if data.created_at:
scenario.created_at = data.created_at
if data.updated_at:
scenario.updated_at = data.updated_at
updated += 1
else:
raise ValueError(
f"Unsupported staged scenario mode: {mode!r}")
except Exception as exc:
self._record_audit_log(
action="commit",
dataset="scenarios",
status="failure",
filename=None,
row_count=len(staged.rows),
detail=f"error={type(exc).__name__}: {exc}",
)
observe_import(
action="commit",
dataset="scenarios",
status="failure",
seconds=time.perf_counter() - start,
)
logger.exception(
"import.commit.failed",
extra={
"event": "import.commit",
"dataset": "scenarios",
"status": "failure",
"row_count": len(staged.rows),
"token": token,
},
)
raise
else:
self._record_audit_log(
action="commit",
dataset="scenarios",
status="success",
filename=None,
row_count=len(staged.rows),
detail=f"created={created} updated={updated}",
)
observe_import(
action="commit",
dataset="scenarios",
status="success",
seconds=time.perf_counter() - start,
)
logger.info(
"import.commit",
extra={
"event": "import.commit",
"dataset": "scenarios",
"status": "success",
"row_count": len(staged.rows),
"created": created,
"updated": updated,
"token": token,
},
)
self._scenario_stage.pop(token, None)
return ImportCommitResult(
@@ -545,6 +715,34 @@ class ImportIngestionService:
summary=ImportCommitSummary(created=created, updated=updated),
)
def _record_audit_log(
    self,
    *,
    action: str,
    dataset: str,
    status: str,
    row_count: int,
    detail: Optional[str],
    filename: Optional[str],
) -> None:
    """Persist one ``ImportExportLog`` entry on a best-effort basis.

    A fresh unit of work is obtained from ``self._uow_factory``; the entry
    is added to its session and committed immediately. If the unit of work
    exposes no session (``uow.session is None``) nothing is written.

    Args:
        action: Operation label stored on the entry (e.g. "preview",
            "commit").
        dataset: Dataset label stored on the entry (e.g. "projects",
            "scenarios").
        status: Outcome label stored on the entry (e.g. "success",
            "partial", "failure").
        row_count: Number of rows the operation covered.
        detail: Optional free-form summary or error text.
        filename: Optional name of the uploaded file; ``None`` when the
            caller has no filename to report.

    This method never propagates an exception: any failure while writing
    the entry is reported through the module logger and then suppressed.
    """
    try:
        with self._uow_factory() as uow:
            if uow.session is None:
                # Nothing to write through; skip quietly.
                return
            entry = ImportExportLog(
                action=action,
                dataset=dataset,
                status=status,
                filename=filename,
                row_count=row_count,
                detail=detail,
            )
            uow.session.add(entry)
            uow.commit()
    except Exception:
        # Audit logging must not break core workflows, but a silently
        # dropped entry is impossible to diagnose, so leave a trace
        # (including the traceback) in the application log instead.
        # NOTE: "filename" is deliberately not passed via ``extra`` because
        # it is a reserved LogRecord attribute and would raise KeyError.
        logger.warning(
            "import.audit_log.failed",
            exc_info=True,
            extra={
                "event": "import.audit_log",
                "action": action,
                "dataset": dataset,
                "status": status,
            },
        )
def _store_project_stage(
self, rows: list[StagedRow[ProjectImportRow]]
) -> str: