# Listing metadata: 396 lines, 14 KiB, Python.
from __future__ import annotations
|
|
|
|
from io import BytesIO
|
|
from typing import Callable
|
|
|
|
import pandas as pd
|
|
import pytest
|
|
|
|
from models.project import MiningOperationType, Project
|
|
from models.scenario import Scenario, ScenarioStatus
|
|
from services.importers import (
|
|
ImportCommitResult,
|
|
ImportIngestionService,
|
|
ImportPreviewState,
|
|
StagedImportView,
|
|
)
|
|
from services.repositories import ProjectRepository
|
|
from services.unit_of_work import UnitOfWork
|
|
from schemas.imports import (
|
|
ProjectImportCommitResponse,
|
|
ProjectImportPreviewResponse,
|
|
ScenarioImportCommitResponse,
|
|
ScenarioImportPreviewResponse,
|
|
)
|
|
|
|
|
|
@pytest.fixture()
def ingestion_service(unit_of_work_factory: Callable[[], UnitOfWork]) -> ImportIngestionService:
    """Provide an ``ImportIngestionService`` built on the unit-of-work factory fixture."""
    service = ImportIngestionService(unit_of_work_factory)
    return service
|
|
|
|
|
|
def test_preview_projects_flags_updates_and_duplicates(
    ingestion_service: ImportIngestionService,
    unit_of_work_factory: Callable[[], UnitOfWork],
) -> None:
    """Preview a project CSV that holds an existing name and a repeated name.

    One project ("Project A") is stored up front. The CSV then carries a row
    for "Project A" plus two identical rows for "Project B". The test expects
    the three rows to be reported as UPDATE, NEW and SKIP respectively, with
    matching summary counts, per-row issue bundles, two staged rows, and a
    preview object that validates against ``ProjectImportPreviewResponse``.
    """
    # Seed the project that the first CSV row is expected to match.
    with unit_of_work_factory() as uow:
        assert uow.projects is not None
        existing = Project(
            name="Project A",
            location="Chile",
            operation_type=MiningOperationType.OPEN_PIT,
        )
        uow.projects.create(existing)

    csv_content = (
        "name,location,operation_type\n"
        "Project A,Peru,open pit\n"
        "Project B,Canada,underground\n"
        "Project B,Canada,underground\n"
    )
    stream = BytesIO(csv_content.encode("utf-8"))

    preview = ingestion_service.preview_projects(stream, "projects.csv")

    # Row states follow the CSV order.
    states = [row.state for row in preview.rows]
    assert states == [
        ImportPreviewState.UPDATE,
        ImportPreviewState.NEW,
        ImportPreviewState.SKIP,
    ]

    # Summary counters: the update and the new row are accepted, the repeat is skipped.
    assert preview.summary.total_rows == 3
    assert preview.summary.accepted == 2
    assert preview.summary.skipped == 1
    assert preview.summary.errored == 0
    assert preview.parser_errors == []
    assert preview.stage_token is not None

    # Issue bundles are keyed by row number; the first data row is looked up
    # as 2 and the third as 4.
    issue_map = {bundle.row_number: bundle for bundle in preview.row_issues}
    assert 2 in issue_map and issue_map[2].state == ImportPreviewState.UPDATE
    assert {
        detail.message for detail in issue_map[2].issues
    } == {"Existing project will be updated."}
    assert 4 in issue_map and issue_map[4].state == ImportPreviewState.SKIP
    assert any(
        "Duplicate project name" in detail.message
        for detail in issue_map[4].issues
    )

    # The ignore comment must share a line with the private-attribute access;
    # on a line of its own it does not suppress the checker error.
    stage = ingestion_service._project_stage  # type: ignore[attr-defined]
    staged = stage[preview.stage_token]
    assert len(staged.rows) == 2

    # The UPDATE row's context must carry a non-None "project_id".
    update_context = preview.rows[0].context
    assert update_context is not None and update_context.get(
        "project_id") is not None

    response_model = ProjectImportPreviewResponse.model_validate(preview)
    assert response_model.summary.accepted == preview.summary.accepted
|
|
|
|
|
|
def test_preview_scenarios_validates_projects_and_updates(
    ingestion_service: ImportIngestionService,
    unit_of_work_factory: Callable[[], UnitOfWork],
) -> None:
    """Preview a scenario CSV mixing update, new, unknown-project and repeated rows.

    A project with one scenario is stored up front. The CSV, built with pandas,
    has four rows: the existing scenario, a new scenario, a scenario for a
    project name that was never stored, and a repeat of the new scenario. The
    test expects the states UPDATE, NEW, ERROR and SKIP in that order, matching
    summary counts, two staged rows, and a preview object that validates
    against ``ScenarioImportPreviewResponse``.
    """
    # Seed one project with one scenario so the first CSV row matches it.
    with unit_of_work_factory() as uow:
        assert uow.projects is not None and uow.scenarios is not None
        project = Project(
            name="Existing Project",
            location="Chile",
            operation_type=MiningOperationType.OPEN_PIT,
        )
        uow.projects.create(project)
        scenario = Scenario(
            project_id=project.id,
            name="Existing Scenario",
            status=ScenarioStatus.ACTIVE,
        )
        uow.scenarios.create(scenario)

    df = pd.DataFrame(
        [
            {
                "project_name": "Existing Project",
                "name": "Existing Scenario",
                "status": "Active",
            },
            {
                "project_name": "Existing Project",
                "name": "New Scenario",
                "status": "Draft",
            },
            {
                "project_name": "Missing Project",
                "name": "Ghost Scenario",
                "status": "Draft",
            },
            {
                "project_name": "Existing Project",
                "name": "New Scenario",
                "status": "Draft",
            },
        ]
    )
    # Serialise the frame to CSV in memory and rewind so the service reads
    # from the start of the buffer.
    buffer = BytesIO()
    df.to_csv(buffer, index=False)
    buffer.seek(0)

    preview = ingestion_service.preview_scenarios(buffer, "scenarios.csv")

    # Row states follow the CSV order.
    states = [row.state for row in preview.rows]
    assert states == [
        ImportPreviewState.UPDATE,
        ImportPreviewState.NEW,
        ImportPreviewState.ERROR,
        ImportPreviewState.SKIP,
    ]
    assert preview.summary.total_rows == 4
    assert preview.summary.accepted == 2
    assert preview.summary.skipped == 1
    assert preview.summary.errored == 1
    assert preview.stage_token is not None

    # Issue bundles are keyed by row number; the first data row is looked up
    # as 2 and the third (unknown project) as 4.
    issue_map = {bundle.row_number: bundle for bundle in preview.row_issues}
    assert 2 in issue_map and issue_map[2].state == ImportPreviewState.UPDATE
    assert 4 in issue_map and issue_map[4].state == ImportPreviewState.ERROR
    assert any(
        "does not exist" in detail.message
        for detail in issue_map[4].issues
    )

    # The ignore comment must share a line with the private-attribute access;
    # on a line of its own it does not suppress the checker error.
    stage = ingestion_service._scenario_stage  # type: ignore[attr-defined]
    staged = stage[preview.stage_token]
    assert len(staged.rows) == 2

    # The same "does not exist" message must also appear on the preview row itself.
    error_row = preview.rows[2]
    assert any("does not exist" in msg for msg in error_row.issues)

    response_model = ScenarioImportPreviewResponse.model_validate(preview)
    assert response_model.summary.errored == preview.summary.errored
|
|
|
|
|
|
def test_preview_scenarios_aggregates_parser_errors(
    ingestion_service: ImportIngestionService,
    unit_of_work_factory: Callable[[], UnitOfWork],
) -> None:
    """Preview a scenario CSV whose only row has an unrecognised status value.

    Expects no preview rows, no stage token, exactly one parser error, and an
    ERROR issue bundle for row 2 that names the ``status`` field.
    """
    with unit_of_work_factory() as uow:
        assert uow.projects is not None
        uow.projects.create(
            Project(
                name="Existing Project",
                location="Chile",
                operation_type=MiningOperationType.OPEN_PIT,
            )
        )

    raw_csv = (
        "project_name,name,status\n"
        "Existing Project,Broken Scenario,UNKNOWN_STATUS\n"
    )
    payload = BytesIO(raw_csv.encode("utf-8"))

    preview = ingestion_service.preview_scenarios(payload, "invalid.csv")

    # Nothing is accepted, so nothing is staged.
    assert preview.rows == []
    assert preview.summary.total_rows == 1
    assert preview.summary.errored == 1
    assert preview.stage_token is None
    assert len(preview.parser_errors) == 1

    bundles_by_row = {item.row_number: item for item in preview.row_issues}
    assert 2 in bundles_by_row
    row_bundle = bundles_by_row[2]
    assert row_bundle.state == ImportPreviewState.ERROR
    assert any(issue.field == "status" for issue in row_bundle.issues)
    # Every reported issue must carry a non-empty message.
    assert all(issue.message for issue in row_bundle.issues)

    validated = ScenarioImportPreviewResponse.model_validate(preview)
    assert validated.summary.total_rows == 1
|
|
|
|
|
|
def test_consume_staged_projects_removes_token(
    ingestion_service: ImportIngestionService,
    unit_of_work_factory: Callable[[], UnitOfWork],
) -> None:
    """Consuming a staged project import returns the view once, then ``None``."""
    with unit_of_work_factory() as uow:
        assert uow.projects is not None

    raw_csv = (
        "name,location,operation_type\n"
        "Project X,Peru,open pit\n"
    )
    payload = BytesIO(raw_csv.encode("utf-8"))

    preview = ingestion_service.preview_projects(payload, "projects.csv")
    token = preview.stage_token
    assert token is not None

    view_before = ingestion_service.get_staged_projects(token)
    assert isinstance(view_before, StagedImportView)

    # The first consume hands back the same view that a plain lookup returned.
    view_consumed = ingestion_service.consume_staged_projects(token)
    assert view_consumed == view_before

    # After that, neither a lookup nor a second consume finds the token.
    assert ingestion_service.get_staged_projects(token) is None
    assert ingestion_service.consume_staged_projects(token) is None
|
|
|
|
|
|
def test_clear_staged_scenarios_drops_entry(
    ingestion_service: ImportIngestionService,
    unit_of_work_factory: Callable[[], UnitOfWork],
) -> None:
    """Clearing a staged scenario import returns ``True`` once, then ``False``."""
    with unit_of_work_factory() as uow:
        assert uow.projects is not None
        uow.projects.create(
            Project(
                name="Project Y",
                location="Chile",
                operation_type=MiningOperationType.OPEN_PIT,
            )
        )

    raw_csv = (
        "project_name,name,status\n"
        "Project Y,Scenario 1,Active\n"
    )
    payload = BytesIO(raw_csv.encode("utf-8"))

    preview = ingestion_service.preview_scenarios(payload, "scenarios.csv")
    token = preview.stage_token
    assert token is not None

    # The entry is staged, the first clear drops it, and a second clear finds nothing.
    assert ingestion_service.get_staged_scenarios(token) is not None
    assert ingestion_service.clear_staged_scenarios(token) is True
    assert ingestion_service.get_staged_scenarios(token) is None
    assert ingestion_service.clear_staged_scenarios(token) is False
|
|
|
|
|
|
def test_commit_project_import_applies_create_and_update(
    ingestion_service: ImportIngestionService,
    unit_of_work_factory: Callable[[], UnitOfWork],
) -> None:
    """Committing a staged project import updates one project and creates another.

    "Project A" is stored up front and appears in the CSV with a new location
    and operation type; "Project B" appears only in the CSV. After the commit
    the staged entry must be gone and both projects must be stored with the
    CSV values.
    """
    with unit_of_work_factory() as uow:
        assert uow.projects is not None
        uow.projects.create(
            Project(
                name="Project A",
                location="Chile",
                operation_type=MiningOperationType.OPEN_PIT,
            )
        )

    raw_csv = (
        "name,location,operation_type\n"
        "Project A,Peru,underground\n"
        "Project B,Canada,open pit\n"
    )
    payload = BytesIO(raw_csv.encode("utf-8"))
    preview = ingestion_service.preview_projects(payload, "projects.csv")
    assert preview.stage_token is not None

    result = ingestion_service.commit_project_import(preview.stage_token)
    assert isinstance(result, ImportCommitResult)
    assert result.summary.created == 1
    assert result.summary.updated == 1
    # A successful commit removes the staged entry.
    assert ingestion_service.get_staged_projects(preview.stage_token) is None

    validated = ProjectImportCommitResponse.model_validate(result)
    assert validated.summary.updated == 1

    # Re-read the stored projects through a fresh unit of work.
    with unit_of_work_factory() as uow:
        assert uow.projects is not None
        stored = uow.projects.list()
        stored_names = sorted(entry.name for entry in stored)
        assert stored_names == ["Project A", "Project B"]

        project_a = next(entry for entry in stored if entry.name == "Project A")
        assert project_a.location == "Peru"
        assert project_a.operation_type == MiningOperationType.UNDERGROUND

        project_b = next(entry for entry in stored if entry.name == "Project B")
        assert project_b.location == "Canada"
|
|
|
|
|
|
def test_commit_project_import_with_invalid_token_raises(
    ingestion_service: ImportIngestionService,
) -> None:
    """Committing a project import with a token that was never staged raises ``ValueError``."""
    unknown_token = "missing-token"
    with pytest.raises(ValueError):
        ingestion_service.commit_project_import(unknown_token)
|
|
|
|
|
|
def test_commit_scenario_import_applies_create_and_update(
    ingestion_service: ImportIngestionService,
    unit_of_work_factory: Callable[[], UnitOfWork],
) -> None:
    """Committing a staged scenario import updates one scenario and creates another.

    "Project X" and its "Existing Scenario" (ACTIVE) are stored up front. The
    CSV sets that scenario to Archived and adds "New Scenario" as Draft. After
    the commit the staged entry must be gone and both scenarios must be stored
    with the CSV statuses.
    """
    with unit_of_work_factory() as uow:
        assert uow.projects is not None
        assert uow.scenarios is not None
        parent = Project(
            name="Project X",
            location="Chile",
            operation_type=MiningOperationType.OPEN_PIT,
        )
        uow.projects.create(parent)
        uow.scenarios.create(
            Scenario(
                project_id=parent.id,
                name="Existing Scenario",
                status=ScenarioStatus.ACTIVE,
            )
        )

    raw_csv = (
        "project_name,name,status\n"
        "Project X,Existing Scenario,Archived\n"
        "Project X,New Scenario,Draft\n"
    )
    payload = BytesIO(raw_csv.encode("utf-8"))
    preview = ingestion_service.preview_scenarios(payload, "scenarios.csv")
    assert preview.stage_token is not None

    result = ingestion_service.commit_scenario_import(preview.stage_token)
    assert result.summary.created == 1
    assert result.summary.updated == 1
    # A successful commit removes the staged entry.
    assert ingestion_service.get_staged_scenarios(preview.stage_token) is None

    validated = ScenarioImportCommitResponse.model_validate(result)
    assert validated.summary.created == 1

    # Re-read the scenarios of the first listed project through a fresh unit of work.
    with unit_of_work_factory() as uow:
        assert uow.projects is not None
        assert uow.scenarios is not None
        first_project_id = uow.projects.list()[0].id
        stored = uow.scenarios.list_for_project(first_project_id)
        stored_names = sorted(entry.name for entry in stored)
        assert stored_names == ["Existing Scenario", "New Scenario"]

        archived = next(
            entry for entry in stored if entry.name == "Existing Scenario"
        )
        assert archived.status == ScenarioStatus.ARCHIVED

        drafted = next(
            entry for entry in stored if entry.name == "New Scenario"
        )
        assert drafted.status == ScenarioStatus.DRAFT
|
|
|
|
|
|
def test_commit_scenario_import_with_invalid_token_raises(
    ingestion_service: ImportIngestionService,
) -> None:
    """Committing a scenario import with a token that was never staged raises ``ValueError``."""
    unknown_token = "missing-token"
    with pytest.raises(ValueError):
        ingestion_service.commit_scenario_import(unknown_token)
|
|
|
|
|
|
def test_commit_project_import_rolls_back_on_failure(
    ingestion_service: ImportIngestionService,
    unit_of_work_factory: Callable[[], UnitOfWork],
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A failing commit keeps the staged entry and stores no project.

    ``ProjectRepository.create`` is patched to raise ``RuntimeError``. The
    commit must propagate that error, the stage token must remain retrievable,
    and the project count must still be zero afterwards.
    """
    with unit_of_work_factory() as uow:
        assert uow.projects is not None

    raw_csv = (
        "name,location,operation_type\n"
        "Project Fail,Peru,open pit\n"
    )
    payload = BytesIO(raw_csv.encode("utf-8"))
    preview = ingestion_service.preview_projects(payload, "projects.csv")
    token = preview.stage_token
    assert token is not None

    def _always_fail(self: ProjectRepository, project: Project) -> Project:
        # Stand-in for ``create`` that fails unconditionally.
        raise RuntimeError("boom")

    monkeypatch.setattr(ProjectRepository, "create", _always_fail)

    with pytest.raises(RuntimeError):
        ingestion_service.commit_project_import(token)

    # The staged entry must survive the failure so the commit can be retried.
    assert ingestion_service.get_staged_projects(token) is not None

    # Nothing may have been persisted.
    with unit_of_work_factory() as uow:
        assert uow.projects is not None
        assert uow.projects.count() == 0
|