"""Tests for ``ImportIngestionService`` preview, staging, and commit flows."""

from __future__ import annotations

from io import BytesIO
from typing import Callable

import pandas as pd
import pytest

from models.project import MiningOperationType, Project
from models.scenario import Scenario, ScenarioStatus
from services.importers import (
    ImportCommitResult,
    ImportIngestionService,
    ImportPreviewState,
    StagedImportView,
)
from services.repositories import ProjectRepository
from services.unit_of_work import UnitOfWork
from schemas.imports import (
    ProjectImportCommitResponse,
    ProjectImportPreviewResponse,
    ScenarioImportCommitResponse,
    ScenarioImportPreviewResponse,
)


@pytest.fixture()
def ingestion_service(
    unit_of_work_factory: Callable[[], UnitOfWork],
) -> ImportIngestionService:
    """Build an ``ImportIngestionService`` around the unit-of-work factory fixture."""
    return ImportIngestionService(unit_of_work_factory)


def test_preview_projects_flags_updates_and_duplicates(
    ingestion_service: ImportIngestionService,
    unit_of_work_factory: Callable[[], UnitOfWork],
) -> None:
    """Preview marks an existing name UPDATE, a new one NEW and a repeat SKIP."""
    with unit_of_work_factory() as uow:
        assert uow.projects is not None
        existing = Project(
            name="Project A",
            location="Chile",
            operation_type=MiningOperationType.OPEN_PIT,
        )
        uow.projects.create(existing)

    csv_content = (
        "name,location,operation_type\n"
        "Project A,Peru,open pit\n"
        "Project B,Canada,underground\n"
        "Project B,Canada,underground\n"
    )
    stream = BytesIO(csv_content.encode("utf-8"))

    preview = ingestion_service.preview_projects(stream, "projects.csv")

    states = [row.state for row in preview.rows]
    assert states == [
        ImportPreviewState.UPDATE,
        ImportPreviewState.NEW,
        ImportPreviewState.SKIP,
    ]
    assert preview.summary.total_rows == 3
    assert preview.summary.accepted == 2
    assert preview.summary.skipped == 1
    assert preview.summary.errored == 0
    assert preview.parser_errors == []
    assert preview.stage_token is not None

    # Judging by these assertions, row_number counts the CSV header as row 1,
    # so the first data row is row 2 and the duplicate is row 4.
    issue_map = {bundle.row_number: bundle for bundle in preview.row_issues}
    assert 2 in issue_map
    assert issue_map[2].state == ImportPreviewState.UPDATE
    assert {detail.message for detail in issue_map[2].issues} == {
        "Existing project will be updated."
    }
    assert 4 in issue_map
    assert issue_map[4].state == ImportPreviewState.SKIP
    assert any(
        "Duplicate project name" in detail.message
        for detail in issue_map[4].issues
    )

    # Only the accepted (UPDATE + NEW) rows end up in the stage.
    staged = ingestion_service._project_stage[preview.stage_token]  # type: ignore[attr-defined]
    assert len(staged.rows) == 2

    update_context = preview.rows[0].context
    assert update_context is not None
    assert update_context.get("project_id") is not None

    response_model = ProjectImportPreviewResponse.model_validate(preview)
    assert response_model.summary.accepted == preview.summary.accepted


def test_preview_scenarios_validates_projects_and_updates(
    ingestion_service: ImportIngestionService,
    unit_of_work_factory: Callable[[], UnitOfWork],
) -> None:
    """Scenario preview detects updates, new rows, unknown projects and repeats."""
    with unit_of_work_factory() as uow:
        assert uow.projects is not None and uow.scenarios is not None
        project = Project(
            name="Existing Project",
            location="Chile",
            operation_type=MiningOperationType.OPEN_PIT,
        )
        uow.projects.create(project)
        scenario = Scenario(
            project_id=project.id,
            name="Existing Scenario",
            status=ScenarioStatus.ACTIVE,
        )
        uow.scenarios.create(scenario)

    df = pd.DataFrame(
        [
            {
                "project_name": "Existing Project",
                "name": "Existing Scenario",
                "status": "Active",
            },
            {
                "project_name": "Existing Project",
                "name": "New Scenario",
                "status": "Draft",
            },
            {
                "project_name": "Missing Project",
                "name": "Ghost Scenario",
                "status": "Draft",
            },
            {
                "project_name": "Existing Project",
                "name": "New Scenario",
                "status": "Draft",
            },
        ]
    )
    buffer = BytesIO()
    df.to_csv(buffer, index=False)
    buffer.seek(0)

    preview = ingestion_service.preview_scenarios(buffer, "scenarios.csv")

    states = [row.state for row in preview.rows]
    assert states == [
        ImportPreviewState.UPDATE,
        ImportPreviewState.NEW,
        ImportPreviewState.ERROR,
        ImportPreviewState.SKIP,
    ]
    assert preview.summary.total_rows == 4
    assert preview.summary.accepted == 2
    assert preview.summary.skipped == 1
    assert preview.summary.errored == 1
    assert preview.stage_token is not None

    issue_map = {bundle.row_number: bundle for bundle in preview.row_issues}
    assert 2 in issue_map
    assert issue_map[2].state == ImportPreviewState.UPDATE
    # Row 4 is the "Missing Project" data row (header is row 1).
    assert 4 in issue_map
    assert issue_map[4].state == ImportPreviewState.ERROR
    assert any(
        "does not exist" in detail.message for detail in issue_map[4].issues
    )

    staged = ingestion_service._scenario_stage[preview.stage_token]  # type: ignore[attr-defined]
    assert len(staged.rows) == 2

    error_row = preview.rows[2]
    assert any("does not exist" in msg for msg in error_row.issues)

    response_model = ScenarioImportPreviewResponse.model_validate(preview)
    assert response_model.summary.errored == preview.summary.errored


def test_preview_scenarios_aggregates_parser_errors(
    ingestion_service: ImportIngestionService,
    unit_of_work_factory: Callable[[], UnitOfWork],
) -> None:
    """An unparseable status yields a parser error, no rows and no stage token."""
    with unit_of_work_factory() as uow:
        assert uow.projects is not None
        project = Project(
            name="Existing Project",
            location="Chile",
            operation_type=MiningOperationType.OPEN_PIT,
        )
        uow.projects.create(project)

    csv_content = (
        "project_name,name,status\n"
        "Existing Project,Broken Scenario,UNKNOWN_STATUS\n"
    )
    stream = BytesIO(csv_content.encode("utf-8"))

    preview = ingestion_service.preview_scenarios(stream, "invalid.csv")

    assert preview.rows == []
    assert preview.summary.total_rows == 1
    assert preview.summary.errored == 1
    assert preview.stage_token is None
    assert len(preview.parser_errors) == 1

    issue_map = {bundle.row_number: bundle for bundle in preview.row_issues}
    assert 2 in issue_map
    bundle = issue_map[2]
    assert bundle.state == ImportPreviewState.ERROR
    assert any(detail.field == "status" for detail in bundle.issues)
    assert all(detail.message for detail in bundle.issues)

    response_model = ScenarioImportPreviewResponse.model_validate(preview)
    assert response_model.summary.total_rows == 1


def test_consume_staged_projects_removes_token(
    ingestion_service: ImportIngestionService,
    unit_of_work_factory: Callable[[], UnitOfWork],
) -> None:
    """Consuming a staged project import returns it once and then forgets it."""
    # NOTE(review): this unit of work seeds nothing; it only checks the
    # projects repository is wired up before the preview runs.
    with unit_of_work_factory() as uow:
        assert uow.projects is not None

    csv_content = (
        "name,location,operation_type\n"
        "Project X,Peru,open pit\n"
    )
    stream = BytesIO(csv_content.encode("utf-8"))

    preview = ingestion_service.preview_projects(stream, "projects.csv")
    assert preview.stage_token is not None
    token = preview.stage_token

    initial_view = ingestion_service.get_staged_projects(token)
    assert isinstance(initial_view, StagedImportView)

    consumed = ingestion_service.consume_staged_projects(token)
    assert consumed == initial_view
    # After consumption the token is gone for both lookup and a second consume.
    assert ingestion_service.get_staged_projects(token) is None
    assert ingestion_service.consume_staged_projects(token) is None


def test_clear_staged_scenarios_drops_entry(
    ingestion_service: ImportIngestionService,
    unit_of_work_factory: Callable[[], UnitOfWork],
) -> None:
    """Clearing a staged scenario import reports True once, then False."""
    with unit_of_work_factory() as uow:
        assert uow.projects is not None
        project = Project(
            name="Project Y",
            location="Chile",
            operation_type=MiningOperationType.OPEN_PIT,
        )
        uow.projects.create(project)

    csv_content = (
        "project_name,name,status\n"
        "Project Y,Scenario 1,Active\n"
    )
    stream = BytesIO(csv_content.encode("utf-8"))

    preview = ingestion_service.preview_scenarios(stream, "scenarios.csv")
    assert preview.stage_token is not None
    token = preview.stage_token

    assert ingestion_service.get_staged_scenarios(token) is not None
    assert ingestion_service.clear_staged_scenarios(token) is True
    assert ingestion_service.get_staged_scenarios(token) is None
    assert ingestion_service.clear_staged_scenarios(token) is False


def test_commit_project_import_applies_create_and_update(
    ingestion_service: ImportIngestionService,
    unit_of_work_factory: Callable[[], UnitOfWork],
) -> None:
    """Committing a staged project import creates new rows and updates existing ones."""
    with unit_of_work_factory() as uow:
        assert uow.projects is not None
        existing = Project(
            name="Project A",
            location="Chile",
            operation_type=MiningOperationType.OPEN_PIT,
        )
        uow.projects.create(existing)

    csv_content = (
        "name,location,operation_type\n"
        "Project A,Peru,underground\n"
        "Project B,Canada,open pit\n"
    )
    stream = BytesIO(csv_content.encode("utf-8"))

    preview = ingestion_service.preview_projects(stream, "projects.csv")
    assert preview.stage_token is not None

    result = ingestion_service.commit_project_import(preview.stage_token)
    assert isinstance(result, ImportCommitResult)
    assert result.summary.created == 1
    assert result.summary.updated == 1
    # A successful commit consumes the staged token.
    assert ingestion_service.get_staged_projects(preview.stage_token) is None

    commit_response = ProjectImportCommitResponse.model_validate(result)
    assert commit_response.summary.updated == 1

    with unit_of_work_factory() as uow:
        assert uow.projects is not None
        projects = uow.projects.list()
        names = sorted(item.name for item in projects)
        assert names == ["Project A", "Project B"]

        updated_project = next(p for p in projects if p.name == "Project A")
        assert updated_project.location == "Peru"
        assert updated_project.operation_type == MiningOperationType.UNDERGROUND

        new_project = next(p for p in projects if p.name == "Project B")
        assert new_project.location == "Canada"


def test_commit_project_import_with_invalid_token_raises(
    ingestion_service: ImportIngestionService,
) -> None:
    """Committing an unknown project stage token raises ``ValueError``."""
    with pytest.raises(ValueError):
        ingestion_service.commit_project_import("missing-token")


def test_commit_scenario_import_applies_create_and_update(
    ingestion_service: ImportIngestionService,
    unit_of_work_factory: Callable[[], UnitOfWork],
) -> None:
    """Committing a staged scenario import creates new rows and updates existing ones."""
    with unit_of_work_factory() as uow:
        assert uow.projects is not None and uow.scenarios is not None
        project = Project(
            name="Project X",
            location="Chile",
            operation_type=MiningOperationType.OPEN_PIT,
        )
        uow.projects.create(project)
        scenario = Scenario(
            project_id=project.id,
            name="Existing Scenario",
            status=ScenarioStatus.ACTIVE,
        )
        uow.scenarios.create(scenario)

    csv_content = (
        "project_name,name,status\n"
        "Project X,Existing Scenario,Archived\n"
        "Project X,New Scenario,Draft\n"
    )
    stream = BytesIO(csv_content.encode("utf-8"))

    preview = ingestion_service.preview_scenarios(stream, "scenarios.csv")
    assert preview.stage_token is not None

    result = ingestion_service.commit_scenario_import(preview.stage_token)
    assert result.summary.created == 1
    assert result.summary.updated == 1
    assert ingestion_service.get_staged_scenarios(preview.stage_token) is None

    commit_response = ScenarioImportCommitResponse.model_validate(result)
    assert commit_response.summary.created == 1

    with unit_of_work_factory() as uow:
        assert uow.projects is not None and uow.scenarios is not None
        # Exactly one project was seeded above, so index 0 is "Project X".
        scenarios = uow.scenarios.list_for_project(uow.projects.list()[0].id)
        names = sorted(item.name for item in scenarios)
        assert names == ["Existing Scenario", "New Scenario"]

        updated_scenario = next(
            item for item in scenarios if item.name == "Existing Scenario"
        )
        assert updated_scenario.status == ScenarioStatus.ARCHIVED

        new_scenario = next(
            item for item in scenarios if item.name == "New Scenario"
        )
        assert new_scenario.status == ScenarioStatus.DRAFT


def test_commit_scenario_import_with_invalid_token_raises(
    ingestion_service: ImportIngestionService,
) -> None:
    """Committing an unknown scenario stage token raises ``ValueError``."""
    with pytest.raises(ValueError):
        ingestion_service.commit_scenario_import("missing-token")


def test_commit_project_import_rolls_back_on_failure(
    ingestion_service: ImportIngestionService,
    unit_of_work_factory: Callable[[], UnitOfWork],
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A failing repository write leaves no rows behind and keeps the token staged."""
    # NOTE(review): this unit of work seeds nothing; it only checks the
    # projects repository is wired up before the preview runs.
    with unit_of_work_factory() as uow:
        assert uow.projects is not None

    csv_content = (
        "name,location,operation_type\n"
        "Project Fail,Peru,open pit\n"
    )
    stream = BytesIO(csv_content.encode("utf-8"))

    preview = ingestion_service.preview_projects(stream, "projects.csv")
    assert preview.stage_token is not None
    token = preview.stage_token

    def _boom(self: ProjectRepository, project: Project) -> Project:
        raise RuntimeError("boom")

    monkeypatch.setattr(ProjectRepository, "create", _boom)

    # match= ensures the failure is the injected one, not an unrelated error.
    with pytest.raises(RuntimeError, match="boom"):
        ingestion_service.commit_project_import(token)

    # Token should still be present for retry.
    assert ingestion_service.get_staged_projects(token) is not None

    with unit_of_work_factory() as uow:
        assert uow.projects is not None
        assert uow.projects.count() == 0