diff --git a/changelog.md b/changelog.md index ed4c7fc..bdb8c93 100644 --- a/changelog.md +++ b/changelog.md @@ -25,3 +25,5 @@ ## 2025-11-10 - Extended authorization helper layer with project/scenario ownership lookups, integrated them into FastAPI dependencies, refreshed pytest fixtures to keep the suite authenticated, and documented the new patterns across RBAC plan and security guides. +- Added dedicated pytest coverage for guard dependencies, exercising success plus failure paths (missing session, inactive user, missing roles, project/scenario access errors) via `tests/test_dependencies_guards.py`. +- Added integration tests in `tests/test_authorization_integration.py` verifying anonymous 401 responses, role-based 403s, and authorized project manager flows across API and UI endpoints. diff --git a/tests/test_authorization_integration.py b/tests/test_authorization_integration.py new file mode 100644 index 0000000..825b414 --- /dev/null +++ b/tests/test_authorization_integration.py @@ -0,0 +1,208 @@ +from __future__ import annotations + +from contextlib import contextmanager +from uuid import uuid4 + +import pytest +from fastapi import Request, status + +from dependencies import get_auth_session +from models import MiningOperationType, Project, User +from services.session import AuthSession, SessionTokens + + +@pytest.fixture() +def auth_session_context(client): + """Allow tests to swap the current auth session for the test app.""" + + original_override = client.app.dependency_overrides[get_auth_session] + + @contextmanager + def _use(user: User | None): + def _override(request: Request) -> AuthSession: + if user is None: + session = AuthSession.anonymous() + else: + session = AuthSession(tokens=SessionTokens( + access_token="token", refresh_token="refresh")) + session.user = user + request.state.auth_session = session + return session + + client.app.dependency_overrides[get_auth_session] = _override + try: + yield + finally: + client.app.dependency_overrides[get_auth_session] = original_override + + return _use + + +def _unique(name: str) -> str: + return f"{name}-{uuid4().hex}" + + +def _create_user(uow, *, roles: tuple[str, ...] = (), is_superuser: bool = False) -> User: + assert uow.users and uow.roles + uow.ensure_default_roles() + user = User( + email=f"{_unique('user')}@example.com", + username=_unique('user'), + password_hash=User.hash_password("password"), + is_active=True, + is_superuser=is_superuser, + ) + uow.users.create(user) + uow.flush() + + for role_name in roles: + role = uow.roles.get_by_name(role_name) + if role is None: # pragma: no cover - defensive guard + raise AssertionError(f"Role {role_name} not found") + uow.users.assign_role(user_id=user.id, role_id=role.id) + return uow.users.get(user.id, with_roles=True) + + +def _create_project(uow) -> Project: + assert uow.projects + project = Project( + name=_unique('project'), + location="Integration Site", + operation_type=MiningOperationType.OPEN_PIT, + ) + uow.projects.create(project) + uow.flush() + return project + + +class TestAuthenticationRequirements: + def test_api_projects_list_requires_login(self, client, auth_session_context): + with auth_session_context(None): + response = client.get("/projects") + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + assert response.json()["detail"] == "Authentication required." + + def test_ui_project_list_requires_login(self, client, auth_session_context): + with auth_session_context(None): + response = client.get("/projects/ui") + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + +class TestRoleRestrictions: + def test_api_projects_create_forbidden_for_viewer( + self, + client, + auth_session_context, + unit_of_work_factory, + ) -> None: + with unit_of_work_factory() as uow: + viewer = _create_user(uow, roles=("viewer",)) + + payload = { + "name": _unique("project"), + "location": "Restricted", + "operation_type": MiningOperationType.OPEN_PIT.value, + "description": "Test restriction", + } + with auth_session_context(viewer): + response = client.post("/projects", json=payload) + + assert response.status_code == status.HTTP_403_FORBIDDEN + assert response.json()[ + "detail"] == "Insufficient permissions for this action." + + def test_api_projects_create_allows_project_manager( + self, + client, + auth_session_context, + unit_of_work_factory, + ) -> None: + with unit_of_work_factory() as uow: + manager = _create_user(uow, roles=("project_manager",)) + + payload = { + "name": _unique("managed-project"), + "location": "Permitted", + "operation_type": MiningOperationType.OPEN_PIT.value, + "description": "Authorized creation", + } + with auth_session_context(manager): + response = client.post("/projects", json=payload) + + assert response.status_code == status.HTTP_201_CREATED + assert response.json()["name"] == payload["name"] + + def test_api_projects_update_forbidden_for_viewer( + self, + client, + auth_session_context, + unit_of_work_factory, + ) -> None: + with unit_of_work_factory() as uow: + project = _create_project(uow) + viewer = _create_user(uow, roles=("viewer",)) + + with auth_session_context(viewer): + response = client.put( + f"/projects/{project.id}", + json={"description": "Updated"}, + ) + + assert response.status_code == status.HTTP_403_FORBIDDEN + assert response.json()[ + "detail"] == "Insufficient role permissions for this action." + + def test_api_projects_update_allows_manager( + self, + client, + auth_session_context, + unit_of_work_factory, + ) -> None: + with unit_of_work_factory() as uow: + project = _create_project(uow) + manager = _create_user(uow, roles=("project_manager",)) + + updated_description = "Manager updated description" + with auth_session_context(manager): + response = client.put( + f"/projects/{project.id}", + json={"description": updated_description}, + ) + + assert response.status_code == status.HTTP_200_OK + assert response.json()["description"] == updated_description + + def test_ui_project_edit_forbidden_for_viewer( + self, + client, + auth_session_context, + unit_of_work_factory, + ) -> None: + with unit_of_work_factory() as uow: + project = _create_project(uow) + viewer = _create_user(uow, roles=("viewer",)) + + with auth_session_context(viewer): + response = client.get(f"/projects/{project.id}/edit") + + assert response.status_code == status.HTTP_403_FORBIDDEN + assert response.json()[ + "detail"] == "Insufficient role permissions for this action." + + def test_ui_project_edit_accessible_to_manager( + self, + client, + auth_session_context, + unit_of_work_factory, + ) -> None: + with unit_of_work_factory() as uow: + project = _create_project(uow) + manager = _create_user(uow, roles=("project_manager",)) + + with auth_session_context(manager): + response = client.get(f"/projects/{project.id}/edit") + + assert response.status_code == status.HTTP_200_OK + assert response.template.name == "projects/form.html" diff --git a/tests/test_dependencies_guards.py b/tests/test_dependencies_guards.py new file mode 100644 index 0000000..40439f8 --- /dev/null +++ b/tests/test_dependencies_guards.py @@ -0,0 +1,267 @@ +from __future__ import annotations + +from uuid import uuid4 + +import pytest +from fastapi import HTTPException, status + +from dependencies import ( + require_any_role, + require_authenticated_user, + require_current_user, + require_project_resource, + require_project_scenario_resource, + require_roles, + require_scenario_resource, +) +from models import Project, Scenario, User +from services.session import AuthSession, SessionTokens + + +@pytest.fixture() +def uow(unit_of_work_factory): + with unit_of_work_factory() as uow: + assert uow.users and uow.roles and uow.projects and uow.scenarios + uow.ensure_default_roles() + yield uow + + +def _unique(prefix: str) -> str: + return f"{prefix}-{uuid4().hex}" + + +def _create_user( + uow, + *, + roles: tuple[str, ...] = (), + is_active: bool = True, + is_superuser: bool = False, +) -> User: + user = User( + email=f"{_unique('user')}@example.com", + username=_unique('user'), + password_hash=User.hash_password("password"), + is_active=is_active, + is_superuser=is_superuser, + ) + assert uow.users and uow.roles + uow.users.create(user) + uow.flush() + + for role_name in roles: + role = uow.roles.get_by_name(role_name) + if role is None: # pragma: no cover - defensive for missing roles + raise AssertionError(f"Role {role_name} expected in test database") + uow.users.assign_role(user_id=user.id, role_id=role.id) + return uow.users.get(user.id, with_roles=True) + + +def _create_project(uow) -> Project: + assert uow.projects + project = Project(name=_unique('project'), location="Test Site") + uow.projects.create(project) + uow.flush() + return project + + +def _create_scenario(uow, project: Project) -> Scenario: + assert uow.scenarios + scenario = Scenario(project_id=project.id, name=_unique('scenario')) + uow.scenarios.create(scenario) + uow.flush() + return scenario + + +def test_require_current_user_returns_authenticated_user(): + user = User( + email="user@example.com", + username="user", + password_hash=User.hash_password("password"), + is_active=True, + ) + session = AuthSession(tokens=SessionTokens( + access_token="token", refresh_token=None)) + session.user = user + + result = require_current_user(session=session) + + assert result is user + + +def test_require_current_user_raises_when_session_missing(): + anonymous_session = AuthSession.anonymous() + + with pytest.raises(HTTPException) as exc: + require_current_user(session=anonymous_session) + + assert exc.value.status_code == status.HTTP_401_UNAUTHORIZED + assert exc.value.detail == "Authentication required." + + +def test_require_authenticated_user_blocks_inactive_users(): + user = User( + email="user@example.com", + username="user", + password_hash=User.hash_password("password"), + is_active=False, + ) + + with pytest.raises(HTTPException) as exc: + require_authenticated_user(user=user) + + assert exc.value.status_code == status.HTTP_403_FORBIDDEN + assert exc.value.detail == "User account is disabled." + + +def test_require_roles_accepts_user_with_role(uow): + user = _create_user(uow, roles=("viewer",)) + + dependency = require_roles("viewer") + result = dependency(user=user) + + assert result is user + + +def test_require_roles_allows_superuser_without_matching_role(uow): + user = _create_user(uow, roles=(), is_superuser=True) + + dependency = require_roles("project_manager") + result = dependency(user=user) + + assert result is user + + +def test_require_roles_rejects_user_missing_required_role(uow): + user = _create_user(uow, roles=("viewer",)) + + dependency = require_any_role("project_manager", "admin") + with pytest.raises(HTTPException) as exc: + dependency(user=user) + + assert exc.value.status_code == status.HTTP_403_FORBIDDEN + assert exc.value.detail == "Insufficient permissions for this action." + + +def test_require_roles_raises_value_error_when_no_roles_provided(): + with pytest.raises(ValueError) as exc: + require_roles() + + assert str(exc.value) == "require_roles requires at least one role name" + + +def test_require_project_resource_returns_project_for_authorized_user(uow): + user = _create_user(uow, roles=("viewer",)) + project = _create_project(uow) + + dependency = require_project_resource() + result = dependency(project.id, user=user, uow=uow) + + assert result.id == project.id + + +def test_require_project_resource_enforces_manage_requirement(uow): + user = _create_user(uow, roles=("viewer",)) + project = _create_project(uow) + + dependency = require_project_resource(require_manage=True) + with pytest.raises(HTTPException) as exc: + dependency(project.id, user=user, uow=uow) + + assert exc.value.status_code == status.HTTP_403_FORBIDDEN + assert exc.value.detail == "Insufficient role permissions for this action." + + +def test_require_project_resource_raises_not_found_for_missing_project(uow): + user = _create_user(uow, roles=("viewer",)) + + dependency = require_project_resource() + with pytest.raises(HTTPException) as exc: + dependency(9999, user=user, uow=uow) + + assert exc.value.status_code == status.HTTP_404_NOT_FOUND + assert exc.value.detail == "Project 9999 not found" + + +def test_require_project_resource_rejects_inactive_user(uow): + user = _create_user(uow, roles=("viewer",), is_active=False) + project = _create_project(uow) + + dependency = require_project_resource() + with pytest.raises(HTTPException) as exc: + dependency(project.id, user=user, uow=uow) + + assert exc.value.status_code == status.HTTP_403_FORBIDDEN + assert exc.value.detail == "User account is disabled." + + +def test_require_scenario_resource_returns_scenario_for_authorized_user(uow): + user = _create_user(uow, roles=("viewer",)) + project = _create_project(uow) + scenario = _create_scenario(uow, project) + + dependency = require_scenario_resource() + result = dependency(scenario.id, user=user, uow=uow) + + assert result.id == scenario.id + + +def test_require_scenario_resource_requires_manage_role(uow): + user = _create_user(uow, roles=("viewer",)) + project = _create_project(uow) + scenario = _create_scenario(uow, project) + + dependency = require_scenario_resource(require_manage=True) + with pytest.raises(HTTPException) as exc: + dependency(scenario.id, user=user, uow=uow) + + assert exc.value.status_code == status.HTTP_403_FORBIDDEN + assert exc.value.detail == "Insufficient role permissions for this action." + + +def test_require_scenario_resource_raises_not_found(uow): + user = _create_user(uow, roles=("viewer",)) + + dependency = require_scenario_resource() + with pytest.raises(HTTPException) as exc: + dependency(12345, user=user, uow=uow) + + assert exc.value.status_code == status.HTTP_404_NOT_FOUND + assert exc.value.detail == "Scenario 12345 not found" + + +def test_require_project_scenario_resource_returns_scenario_when_linked(uow): + user = _create_user(uow, roles=("viewer",)) + project = _create_project(uow) + scenario = _create_scenario(uow, project) + + dependency = require_project_scenario_resource() + result = dependency(project.id, scenario.id, user=user, uow=uow) + + assert result.id == scenario.id + + +def test_require_project_scenario_resource_raises_when_scenario_not_in_project(uow): + user = _create_user(uow, roles=("viewer",)) + project = _create_project(uow) + other_project = _create_project(uow) + scenario = _create_scenario(uow, other_project) + + dependency = require_project_scenario_resource() + with pytest.raises(HTTPException) as exc: + dependency(project.id, scenario.id, user=user, uow=uow) + + assert exc.value.status_code == status.HTTP_404_NOT_FOUND + assert "does not belong" in exc.value.detail + + +def test_require_project_scenario_resource_requires_manage_role(uow): + user = _create_user(uow, roles=("viewer",)) + project = _create_project(uow) + scenario = _create_scenario(uow, project) + + dependency = require_project_scenario_resource(require_manage=True) + with pytest.raises(HTTPException) as exc: + dependency(project.id, scenario.id, user=user, uow=uow) + + assert exc.value.status_code == status.HTTP_403_FORBIDDEN + assert exc.value.detail == "Insufficient role permissions for this action."