Files
calminer/tests/test_authorization_integration.py

209 lines
6.7 KiB
Python

from __future__ import annotations
from contextlib import contextmanager
from uuid import uuid4
import pytest
from fastapi import Request, status
from dependencies import get_auth_session
from models import MiningOperationType, Project, User
from services.session import AuthSession, SessionTokens
@pytest.fixture()
def auth_session_context(client):
    """Allow tests to swap the current auth session for the test app.

    Returns a context-manager factory. ``with auth_session_context(user):``
    installs a ``get_auth_session`` dependency override on ``client.app`` for
    the duration of the block:

    * ``user is None`` -> the override yields ``AuthSession.anonymous()``.
    * otherwise -> the override yields an ``AuthSession`` carrying placeholder
      tokens with ``session.user`` set to ``user``.

    On exit, the override that was registered when the fixture was set up is
    put back. If no override was registered at that point, the temporary one
    is removed instead of raising ``KeyError``.
    """
    # Sentinel distinguishes "no override registered" from any real callable.
    missing = object()
    original_override = client.app.dependency_overrides.get(
        get_auth_session, missing)

    @contextmanager
    def _use(user: User | None):
        def _override(request: Request) -> AuthSession:
            if user is None:
                session = AuthSession.anonymous()
            else:
                session = AuthSession(tokens=SessionTokens(
                    access_token="token", refresh_token="refresh"))
                session.user = user
            # The session is also stored on request.state; presumably other
            # request-scoped code reads it from there - confirm against the app.
            request.state.auth_session = session
            return session

        client.app.dependency_overrides[get_auth_session] = _override
        try:
            yield
        finally:
            if original_override is missing:
                # Nothing to restore: drop the temporary override entirely.
                client.app.dependency_overrides.pop(get_auth_session, None)
            else:
                client.app.dependency_overrides[get_auth_session] = original_override

    return _use
def _unique(name: str) -> str:
return f"{name}-{uuid4().hex}"
def _create_user(uow, *, roles: tuple[str, ...] = (), is_superuser: bool = False) -> User:
    """Create an active user through ``uow`` and attach the named roles.

    Args:
        uow: Unit of work exposing ``users`` and ``roles`` repositories as
            well as ``ensure_default_roles()`` and ``flush()``.
        roles: Names of the roles to assign to the new user.
        is_superuser: Value stored in the user's ``is_superuser`` field.

    Returns:
        The user as re-read through ``uow.users.get(..., with_roles=True)``.

    Raises:
        AssertionError: If a repository is missing or a role name is unknown.
    """
    assert uow.users and uow.roles
    uow.ensure_default_roles()

    # Email and username each get their own unique token.
    email_local_part = _unique('user')
    login_name = _unique('user')
    hashed_secret = User.hash_password("password")

    account = User(
        email=f"{email_local_part}@example.com",
        username=login_name,
        password_hash=hashed_secret,
        is_active=True,
        is_superuser=is_superuser,
    )
    uow.users.create(account)
    # Flush before role assignment, which passes account.id.
    uow.flush()

    for role_name in roles:
        matched_role = uow.roles.get_by_name(role_name)
        if matched_role is None:  # pragma: no cover - defensive guard
            raise AssertionError(f"Role {role_name} not found")
        uow.users.assign_role(user_id=account.id, role_id=matched_role.id)

    return uow.users.get(account.id, with_roles=True)
def _create_project(uow) -> Project:
    """Create and flush an open-pit project with a unique name via ``uow``.

    Returns the ``Project`` instance that was handed to
    ``uow.projects.create``.
    """
    assert uow.projects

    attributes = {
        "name": _unique('project'),
        "location": "Integration Site",
        "operation_type": MiningOperationType.OPEN_PIT,
    }
    record = Project(**attributes)

    uow.projects.create(record)
    uow.flush()
    return record
class TestAuthenticationRequirements:
    """Anonymous sessions must be rejected by the project endpoints."""

    def test_api_projects_list_requires_login(self, client, auth_session_context):
        """GET /projects without a user yields 401 and the auth-required detail."""
        with auth_session_context(None):
            reply = client.get("/projects")

        body = reply.json()
        assert reply.status_code == status.HTTP_401_UNAUTHORIZED
        assert body["detail"] == "Authentication required."

    def test_ui_project_list_requires_login(self, client, auth_session_context):
        """GET /projects/ui without a user yields 401."""
        with auth_session_context(None):
            reply = client.get("/projects/ui")

        assert reply.status_code == status.HTTP_401_UNAUTHORIZED
class TestRoleRestrictions:
    """Role-based access checks for project create, update and edit routes.

    Each test seeds its data through ``unit_of_work_factory`` and then issues
    the request while ``auth_session_context`` impersonates the seeded user.
    """

    # NOTE(review): the create test expects "Insufficient permissions ..."
    # while the update/edit tests expect "Insufficient role permissions ...".
    # Both strings are kept as written; confirm they match the route guards.

    def test_api_projects_create_forbidden_for_viewer(
        self, client, auth_session_context, unit_of_work_factory
    ) -> None:
        """A viewer posting to /projects receives 403."""
        with unit_of_work_factory() as uow:
            viewer_user = _create_user(uow, roles=("viewer",))

        request_body = {
            "name": _unique("project"),
            "location": "Restricted",
            "operation_type": MiningOperationType.OPEN_PIT.value,
            "description": "Test restriction",
        }

        with auth_session_context(viewer_user):
            reply = client.post("/projects", json=request_body)

        assert reply.status_code == status.HTTP_403_FORBIDDEN
        detail = reply.json()["detail"]
        assert detail == "Insufficient permissions for this action."

    def test_api_projects_create_allows_project_manager(
        self, client, auth_session_context, unit_of_work_factory
    ) -> None:
        """A project manager posting to /projects receives 201 and the name back."""
        with unit_of_work_factory() as uow:
            manager_user = _create_user(uow, roles=("project_manager",))

        request_body = {
            "name": _unique("managed-project"),
            "location": "Permitted",
            "operation_type": MiningOperationType.OPEN_PIT.value,
            "description": "Authorized creation",
        }

        with auth_session_context(manager_user):
            reply = client.post("/projects", json=request_body)

        assert reply.status_code == status.HTTP_201_CREATED
        created = reply.json()
        assert created["name"] == request_body["name"]

    def test_api_projects_update_forbidden_for_viewer(
        self, client, auth_session_context, unit_of_work_factory
    ) -> None:
        """A viewer issuing PUT /projects/{id} receives 403."""
        with unit_of_work_factory() as uow:
            seeded_project = _create_project(uow)
            viewer_user = _create_user(uow, roles=("viewer",))

        changes = {"description": "Updated"}
        with auth_session_context(viewer_user):
            reply = client.put(f"/projects/{seeded_project.id}", json=changes)

        assert reply.status_code == status.HTTP_403_FORBIDDEN
        detail = reply.json()["detail"]
        assert detail == "Insufficient role permissions for this action."

    def test_api_projects_update_allows_manager(
        self, client, auth_session_context, unit_of_work_factory
    ) -> None:
        """A project manager issuing PUT /projects/{id} receives 200 and the new text."""
        with unit_of_work_factory() as uow:
            seeded_project = _create_project(uow)
            manager_user = _create_user(uow, roles=("project_manager",))

        updated_description = "Manager updated description"
        changes = {"description": updated_description}
        with auth_session_context(manager_user):
            reply = client.put(f"/projects/{seeded_project.id}", json=changes)

        assert reply.status_code == status.HTTP_200_OK
        updated = reply.json()
        assert updated["description"] == updated_description

    def test_ui_project_edit_forbidden_for_viewer(
        self, client, auth_session_context, unit_of_work_factory
    ) -> None:
        """A viewer requesting the project edit page receives 403."""
        with unit_of_work_factory() as uow:
            seeded_project = _create_project(uow)
            viewer_user = _create_user(uow, roles=("viewer",))

        with auth_session_context(viewer_user):
            reply = client.get(f"/projects/{seeded_project.id}/edit")

        assert reply.status_code == status.HTTP_403_FORBIDDEN
        detail = reply.json()["detail"]
        assert detail == "Insufficient role permissions for this action."

    def test_ui_project_edit_accessible_to_manager(
        self, client, auth_session_context, unit_of_work_factory
    ) -> None:
        """A project manager requesting the edit page gets 200 and the form template."""
        with unit_of_work_factory() as uow:
            seeded_project = _create_project(uow)
            manager_user = _create_user(uow, roles=("project_manager",))

        with auth_session_context(manager_user):
            reply = client.get(f"/projects/{seeded_project.id}/edit")

        assert reply.status_code == status.HTTP_200_OK
        rendered_template = reply.template
        assert rendered_template.name == "projects/form.html"