feat: add tests for authorization guards and role-based access control

This commit is contained in:
2025-11-09 23:27:10 +01:00
parent 0f79864188
commit 118657491c
3 changed files with 477 additions and 0 deletions

View File

@@ -25,3 +25,5 @@
## 2025-11-10
- Extended authorization helper layer with project/scenario ownership lookups, integrated them into FastAPI dependencies, refreshed pytest fixtures to keep the suite authenticated, and documented the new patterns across RBAC plan and security guides.
- Added dedicated pytest coverage for guard dependencies, exercising success plus failure paths (missing session, inactive user, missing roles, project/scenario access errors) via `tests/test_dependencies_guards.py`.
- Added integration tests in `tests/test_authorization_integration.py` verifying anonymous 401 responses, role-based 403s, and authorized project manager flows across API and UI endpoints.

View File

@@ -0,0 +1,208 @@
from __future__ import annotations
from contextlib import contextmanager
from uuid import uuid4
import pytest
from fastapi import Request, status
from dependencies import get_auth_session
from models import MiningOperationType, Project, User
from services.session import AuthSession, SessionTokens
@pytest.fixture()
def auth_session_context(client):
"""Allow tests to swap the current auth session for the test app."""
original_override = client.app.dependency_overrides[get_auth_session]
@contextmanager
def _use(user: User | None):
def _override(request: Request) -> AuthSession:
if user is None:
session = AuthSession.anonymous()
else:
session = AuthSession(tokens=SessionTokens(
access_token="token", refresh_token="refresh"))
session.user = user
request.state.auth_session = session
return session
client.app.dependency_overrides[get_auth_session] = _override
try:
yield
finally:
client.app.dependency_overrides[get_auth_session] = original_override
return _use
def _unique(name: str) -> str:
return f"{name}-{uuid4().hex}"
def _create_user(uow, *, roles: tuple[str, ...] = (), is_superuser: bool = False) -> User:
assert uow.users and uow.roles
uow.ensure_default_roles()
user = User(
email=f"{_unique('user')}@example.com",
username=_unique('user'),
password_hash=User.hash_password("password"),
is_active=True,
is_superuser=is_superuser,
)
uow.users.create(user)
uow.flush()
for role_name in roles:
role = uow.roles.get_by_name(role_name)
if role is None: # pragma: no cover - defensive guard
raise AssertionError(f"Role {role_name} not found")
uow.users.assign_role(user_id=user.id, role_id=role.id)
return uow.users.get(user.id, with_roles=True)
def _create_project(uow) -> Project:
assert uow.projects
project = Project(
name=_unique('project'),
location="Integration Site",
operation_type=MiningOperationType.OPEN_PIT,
)
uow.projects.create(project)
uow.flush()
return project
class TestAuthenticationRequirements:
def test_api_projects_list_requires_login(self, client, auth_session_context):
with auth_session_context(None):
response = client.get("/projects")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
assert response.json()["detail"] == "Authentication required."
def test_ui_project_list_requires_login(self, client, auth_session_context):
with auth_session_context(None):
response = client.get("/projects/ui")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
class TestRoleRestrictions:
def test_api_projects_create_forbidden_for_viewer(
self,
client,
auth_session_context,
unit_of_work_factory,
) -> None:
with unit_of_work_factory() as uow:
viewer = _create_user(uow, roles=("viewer",))
payload = {
"name": _unique("project"),
"location": "Restricted",
"operation_type": MiningOperationType.OPEN_PIT.value,
"description": "Test restriction",
}
with auth_session_context(viewer):
response = client.post("/projects", json=payload)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()[
"detail"] == "Insufficient permissions for this action."
def test_api_projects_create_allows_project_manager(
self,
client,
auth_session_context,
unit_of_work_factory,
) -> None:
with unit_of_work_factory() as uow:
manager = _create_user(uow, roles=("project_manager",))
payload = {
"name": _unique("managed-project"),
"location": "Permitted",
"operation_type": MiningOperationType.OPEN_PIT.value,
"description": "Authorized creation",
}
with auth_session_context(manager):
response = client.post("/projects", json=payload)
assert response.status_code == status.HTTP_201_CREATED
assert response.json()["name"] == payload["name"]
def test_api_projects_update_forbidden_for_viewer(
self,
client,
auth_session_context,
unit_of_work_factory,
) -> None:
with unit_of_work_factory() as uow:
project = _create_project(uow)
viewer = _create_user(uow, roles=("viewer",))
with auth_session_context(viewer):
response = client.put(
f"/projects/{project.id}",
json={"description": "Updated"},
)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()[
"detail"] == "Insufficient role permissions for this action."
def test_api_projects_update_allows_manager(
self,
client,
auth_session_context,
unit_of_work_factory,
) -> None:
with unit_of_work_factory() as uow:
project = _create_project(uow)
manager = _create_user(uow, roles=("project_manager",))
updated_description = "Manager updated description"
with auth_session_context(manager):
response = client.put(
f"/projects/{project.id}",
json={"description": updated_description},
)
assert response.status_code == status.HTTP_200_OK
assert response.json()["description"] == updated_description
def test_ui_project_edit_forbidden_for_viewer(
self,
client,
auth_session_context,
unit_of_work_factory,
) -> None:
with unit_of_work_factory() as uow:
project = _create_project(uow)
viewer = _create_user(uow, roles=("viewer",))
with auth_session_context(viewer):
response = client.get(f"/projects/{project.id}/edit")
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.json()[
"detail"] == "Insufficient role permissions for this action."
def test_ui_project_edit_accessible_to_manager(
self,
client,
auth_session_context,
unit_of_work_factory,
) -> None:
with unit_of_work_factory() as uow:
project = _create_project(uow)
manager = _create_user(uow, roles=("project_manager",))
with auth_session_context(manager):
response = client.get(f"/projects/{project.id}/edit")
assert response.status_code == status.HTTP_200_OK
assert response.template.name == "projects/form.html"

View File

@@ -0,0 +1,267 @@
from __future__ import annotations
from uuid import uuid4
import pytest
from fastapi import HTTPException, status
from dependencies import (
require_any_role,
require_authenticated_user,
require_current_user,
require_project_resource,
require_project_scenario_resource,
require_roles,
require_scenario_resource,
)
from models import Project, Scenario, User
from services.session import AuthSession, SessionTokens
@pytest.fixture()
def uow(unit_of_work_factory):
with unit_of_work_factory() as uow:
assert uow.users and uow.roles and uow.projects and uow.scenarios
uow.ensure_default_roles()
yield uow
def _unique(prefix: str) -> str:
return f"{prefix}-{uuid4().hex}"
def _create_user(
uow,
*,
roles: tuple[str, ...] = (),
is_active: bool = True,
is_superuser: bool = False,
) -> User:
user = User(
email=f"{_unique('user')}@example.com",
username=_unique('user'),
password_hash=User.hash_password("password"),
is_active=is_active,
is_superuser=is_superuser,
)
assert uow.users and uow.roles
uow.users.create(user)
uow.flush()
for role_name in roles:
role = uow.roles.get_by_name(role_name)
if role is None: # pragma: no cover - defensive for missing roles
raise AssertionError(f"Role {role_name} expected in test database")
uow.users.assign_role(user_id=user.id, role_id=role.id)
return uow.users.get(user.id, with_roles=True)
def _create_project(uow) -> Project:
assert uow.projects
project = Project(name=_unique('project'), location="Test Site")
uow.projects.create(project)
uow.flush()
return project
def _create_scenario(uow, project: Project) -> Scenario:
assert uow.scenarios
scenario = Scenario(project_id=project.id, name=_unique('scenario'))
uow.scenarios.create(scenario)
uow.flush()
return scenario
def test_require_current_user_returns_authenticated_user():
user = User(
email="user@example.com",
username="user",
password_hash=User.hash_password("password"),
is_active=True,
)
session = AuthSession(tokens=SessionTokens(
access_token="token", refresh_token=None))
session.user = user
result = require_current_user(session=session)
assert result is user
def test_require_current_user_raises_when_session_missing():
anonymous_session = AuthSession.anonymous()
with pytest.raises(HTTPException) as exc:
require_current_user(session=anonymous_session)
assert exc.value.status_code == status.HTTP_401_UNAUTHORIZED
assert exc.value.detail == "Authentication required."
def test_require_authenticated_user_blocks_inactive_users():
user = User(
email="user@example.com",
username="user",
password_hash=User.hash_password("password"),
is_active=False,
)
with pytest.raises(HTTPException) as exc:
require_authenticated_user(user=user)
assert exc.value.status_code == status.HTTP_403_FORBIDDEN
assert exc.value.detail == "User account is disabled."
def test_require_roles_accepts_user_with_role(uow):
user = _create_user(uow, roles=("viewer",))
dependency = require_roles("viewer")
result = dependency(user=user)
assert result is user
def test_require_roles_allows_superuser_without_matching_role(uow):
user = _create_user(uow, roles=(), is_superuser=True)
dependency = require_roles("project_manager")
result = dependency(user=user)
assert result is user
def test_require_roles_rejects_user_missing_required_role(uow):
user = _create_user(uow, roles=("viewer",))
dependency = require_any_role("project_manager", "admin")
with pytest.raises(HTTPException) as exc:
dependency(user=user)
assert exc.value.status_code == status.HTTP_403_FORBIDDEN
assert exc.value.detail == "Insufficient permissions for this action."
def test_require_roles_raises_value_error_when_no_roles_provided():
with pytest.raises(ValueError) as exc:
require_roles()
assert str(exc.value) == "require_roles requires at least one role name"
def test_require_project_resource_returns_project_for_authorized_user(uow):
user = _create_user(uow, roles=("viewer",))
project = _create_project(uow)
dependency = require_project_resource()
result = dependency(project.id, user=user, uow=uow)
assert result.id == project.id
def test_require_project_resource_enforces_manage_requirement(uow):
user = _create_user(uow, roles=("viewer",))
project = _create_project(uow)
dependency = require_project_resource(require_manage=True)
with pytest.raises(HTTPException) as exc:
dependency(project.id, user=user, uow=uow)
assert exc.value.status_code == status.HTTP_403_FORBIDDEN
assert exc.value.detail == "Insufficient role permissions for this action."
def test_require_project_resource_raises_not_found_for_missing_project(uow):
user = _create_user(uow, roles=("viewer",))
dependency = require_project_resource()
with pytest.raises(HTTPException) as exc:
dependency(9999, user=user, uow=uow)
assert exc.value.status_code == status.HTTP_404_NOT_FOUND
assert exc.value.detail == "Project 9999 not found"
def test_require_project_resource_rejects_inactive_user(uow):
user = _create_user(uow, roles=("viewer",), is_active=False)
project = _create_project(uow)
dependency = require_project_resource()
with pytest.raises(HTTPException) as exc:
dependency(project.id, user=user, uow=uow)
assert exc.value.status_code == status.HTTP_403_FORBIDDEN
assert exc.value.detail == "User account is disabled."
def test_require_scenario_resource_returns_scenario_for_authorized_user(uow):
user = _create_user(uow, roles=("viewer",))
project = _create_project(uow)
scenario = _create_scenario(uow, project)
dependency = require_scenario_resource()
result = dependency(scenario.id, user=user, uow=uow)
assert result.id == scenario.id
def test_require_scenario_resource_requires_manage_role(uow):
user = _create_user(uow, roles=("viewer",))
project = _create_project(uow)
scenario = _create_scenario(uow, project)
dependency = require_scenario_resource(require_manage=True)
with pytest.raises(HTTPException) as exc:
dependency(scenario.id, user=user, uow=uow)
assert exc.value.status_code == status.HTTP_403_FORBIDDEN
assert exc.value.detail == "Insufficient role permissions for this action."
def test_require_scenario_resource_raises_not_found(uow):
user = _create_user(uow, roles=("viewer",))
dependency = require_scenario_resource()
with pytest.raises(HTTPException) as exc:
dependency(12345, user=user, uow=uow)
assert exc.value.status_code == status.HTTP_404_NOT_FOUND
assert exc.value.detail == "Scenario 12345 not found"
def test_require_project_scenario_resource_returns_scenario_when_linked(uow):
user = _create_user(uow, roles=("viewer",))
project = _create_project(uow)
scenario = _create_scenario(uow, project)
dependency = require_project_scenario_resource()
result = dependency(project.id, scenario.id, user=user, uow=uow)
assert result.id == scenario.id
def test_require_project_scenario_resource_raises_when_scenario_not_in_project(uow):
user = _create_user(uow, roles=("viewer",))
project = _create_project(uow)
other_project = _create_project(uow)
scenario = _create_scenario(uow, other_project)
dependency = require_project_scenario_resource()
with pytest.raises(HTTPException) as exc:
dependency(project.id, scenario.id, user=user, uow=uow)
assert exc.value.status_code == status.HTTP_404_NOT_FOUND
assert "does not belong" in exc.value.detail
def test_require_project_scenario_resource_requires_manage_role(uow):
user = _create_user(uow, roles=("viewer",))
project = _create_project(uow)
scenario = _create_scenario(uow, project)
dependency = require_project_scenario_resource(require_manage=True)
with pytest.raises(HTTPException) as exc:
dependency(project.id, scenario.id, user=user, uow=uow)
assert exc.value.status_code == status.HTTP_403_FORBIDDEN
assert exc.value.detail == "Insufficient role permissions for this action."