feat: enhance project and scenario management with role-based access control

- Implemented role-based access control for project and scenario routes.
- Added authorization checks to ensure users have appropriate roles for viewing and managing projects and scenarios.
- Introduced utility functions for ensuring project and scenario access based on user roles.
- Refactored project and scenario routes to utilize new authorization helpers.
- Created initial data seeding script to set up default roles and an admin user.
- Added tests for authorization helpers and initial data seeding functionality.
- Updated exception handling to include authorization errors.
This commit is contained in:
2025-11-09 23:14:54 +01:00
parent 27262bdfa3
commit 0f79864188
16 changed files with 997 additions and 132 deletions

View File

@@ -3,7 +3,7 @@ from __future__ import annotations
from collections.abc import Callable, Iterator
import pytest
from fastapi import FastAPI
from fastapi import FastAPI, Request
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
@@ -11,12 +11,14 @@ from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from config.database import Base
from dependencies import get_unit_of_work
from dependencies import get_auth_session, get_unit_of_work
from models import User
from routes.auth import router as auth_router
from routes.dashboard import router as dashboard_router
from routes.projects import router as projects_router
from routes.scenarios import router as scenarios_router
from services.unit_of_work import UnitOfWork
from services.session import AuthSession, SessionTokens
@pytest.fixture()
@@ -55,6 +57,28 @@ def app(session_factory: sessionmaker) -> FastAPI:
yield uow
application.dependency_overrides[get_unit_of_work] = _override_uow
with UnitOfWork(session_factory=session_factory) as uow:
assert uow.users is not None
uow.ensure_default_roles()
user = User(
email="test-superuser@example.com",
username="test-superuser",
password_hash=User.hash_password("test-password"),
is_active=True,
is_superuser=True,
)
uow.users.create(user)
user = uow.users.get(user.id, with_roles=True)
def _override_auth_session(request: Request) -> AuthSession:
session = AuthSession(tokens=SessionTokens(
access_token="test", refresh_token="test"))
session.user = user
request.state.auth_session = session
return session
application.dependency_overrides[get_auth_session] = _override_auth_session
return application

View File

@@ -0,0 +1,156 @@
from __future__ import annotations
import logging
from collections.abc import Callable
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from config.database import Base
from scripts import initial_data
from scripts.initial_data import AdminSeedResult, RoleSeedResult, SeedConfig
from services.repositories import DEFAULT_ROLE_DEFINITIONS
from services.unit_of_work import UnitOfWork
@pytest.fixture()
def in_memory_session_factory() -> Callable[[], Session]:
engine = create_engine("sqlite+pysqlite:///:memory:", future=True)
Base.metadata.create_all(engine)
factory = sessionmaker(bind=engine, autoflush=False, autocommit=False, future=True)
def _session_factory() -> Session:
return factory()
return _session_factory
@pytest.fixture()
def uow(in_memory_session_factory: Callable[[], Session]) -> UnitOfWork:
return UnitOfWork(session_factory=in_memory_session_factory)
def test_ensure_default_roles_idempotent(uow: UnitOfWork) -> None:
with uow as working:
assert working.roles is not None
result_first = initial_data.ensure_default_roles(working.roles, DEFAULT_ROLE_DEFINITIONS)
assert result_first == RoleSeedResult(created=4, updated=0, total=4)
with uow as working:
assert working.roles is not None
result_second = initial_data.ensure_default_roles(working.roles, DEFAULT_ROLE_DEFINITIONS)
assert result_second == RoleSeedResult(created=0, updated=0, total=4)
def test_ensure_admin_user_creates_and_assigns_roles(uow: UnitOfWork) -> None:
config = SeedConfig(
admin_email="admin@example.com",
admin_username="admin",
admin_password="secret",
admin_roles=("admin", "viewer"),
force_reset=False,
)
with uow as working:
assert working.roles is not None
assert working.users is not None
initial_data.ensure_default_roles(working.roles, DEFAULT_ROLE_DEFINITIONS)
result = initial_data.ensure_admin_user(working.users, working.roles, config)
assert result == AdminSeedResult(
created_user=True,
updated_user=False,
password_rotated=False,
roles_granted=2,
)
with uow as working:
assert working.roles is not None
assert working.users is not None
result_again = initial_data.ensure_admin_user(working.users, working.roles, config)
assert result_again == AdminSeedResult(
created_user=False,
updated_user=False,
password_rotated=False,
roles_granted=0,
)
with uow as working:
assert working.users is not None
user = working.users.get_by_email("admin@example.com", with_roles=True)
assert user is not None
assert user.is_active is True
assert user.is_superuser is True
role_names = {role.name for role in user.roles}
assert role_names == {"admin", "viewer"}
def test_ensure_admin_user_force_reset_rotates_password(uow: UnitOfWork) -> None:
base_config = SeedConfig(
admin_email="admin@example.com",
admin_username="admin",
admin_password="first",
admin_roles=("admin",),
force_reset=False,
)
with uow as working:
assert working.roles is not None
assert working.users is not None
initial_data.ensure_default_roles(working.roles, DEFAULT_ROLE_DEFINITIONS)
initial_data.ensure_admin_user(working.users, working.roles, base_config)
rotate_config = SeedConfig(
admin_email="admin@example.com",
admin_username="admin",
admin_password="second",
admin_roles=("admin",),
force_reset=True,
)
with uow as working:
assert working.users is not None
user_before = working.users.get_by_email("admin@example.com")
assert user_before is not None
old_hash = user_before.password_hash
with uow as working:
assert working.roles is not None
assert working.users is not None
result = initial_data.ensure_admin_user(working.users, working.roles, rotate_config)
assert result.password_rotated is True
with uow as working:
assert working.users is not None
user_after = working.users.get_by_email("admin@example.com")
assert user_after is not None
assert user_after.password_hash != old_hash
def test_seed_initial_data_logs_results(
caplog,
in_memory_session_factory: Callable[[], Session],
) -> None:
caplog.set_level(logging.INFO)
config = SeedConfig(
admin_email="seed@example.com",
admin_username="seed",
admin_password="seed-pass",
admin_roles=("admin",),
force_reset=False,
)
initial_data.seed_initial_data(
config,
unit_of_work_factory=lambda: UnitOfWork(session_factory=in_memory_session_factory),
)
assert "Starting initial data seeding" in caplog.text
assert "Initial data seeding completed successfully" in caplog.text
with UnitOfWork(session_factory=in_memory_session_factory) as check_uow:
assert check_uow.users is not None
assert check_uow.roles is not None
user = check_uow.users.get_by_email("seed@example.com")
assert user is not None
assert check_uow.roles.get_by_name("admin") is not None

View File

@@ -0,0 +1,165 @@
from __future__ import annotations
import pytest
from models import MiningOperationType, Project, Scenario, ScenarioStatus, User
from services.authorization import (
ensure_project_access,
ensure_scenario_access,
ensure_scenario_in_project,
)
from services.exceptions import AuthorizationError, EntityNotFoundError
from services.unit_of_work import UnitOfWork
def _create_user_with_roles(
uow: UnitOfWork,
*,
email: str,
username: str,
roles: set[str],
) -> User:
assert uow.users is not None
assert uow.roles is not None
user = User(
email=email,
username=username,
password_hash=User.hash_password("secret"),
is_active=True,
)
uow.users.create(user)
uow.ensure_default_roles()
for role_name in roles:
role = uow.roles.get_by_name(role_name)
assert role is not None, f"Role {role_name} should exist"
uow.users.assign_role(user_id=user.id, role_id=role.id)
return uow.users.get(user.id, with_roles=True)
def _create_project(uow: UnitOfWork, name: str) -> Project:
assert uow.projects is not None
project = Project(
name=name,
location=None,
operation_type=MiningOperationType.OTHER,
description=None,
)
uow.projects.create(project)
return project
def _create_scenario(uow: UnitOfWork, project: Project, name: str) -> Scenario:
assert uow.scenarios is not None
scenario = Scenario(
project_id=project.id,
name=name,
description=None,
status=ScenarioStatus.DRAFT,
)
uow.scenarios.create(scenario)
return scenario
def test_ensure_project_access_allows_view_roles(unit_of_work_factory) -> None:
with unit_of_work_factory() as uow:
project = _create_project(uow, "Project A")
user = _create_user_with_roles(
uow,
email="viewer@example.com",
username="viewer",
roles={"viewer"},
)
resolved = ensure_project_access(
uow,
project_id=project.id,
user=user,
)
assert resolved.id == project.id
with pytest.raises(AuthorizationError):
ensure_project_access(
uow,
project_id=project.id,
user=user,
require_manage=True,
)
def test_ensure_project_access_allows_manage_roles(unit_of_work_factory) -> None:
with unit_of_work_factory() as uow:
project = _create_project(uow, "Project B")
user = _create_user_with_roles(
uow,
email="manager@example.com",
username="manager",
roles={"project_manager"},
)
resolved = ensure_project_access(
uow,
project_id=project.id,
user=user,
require_manage=True,
)
assert resolved.id == project.id
def test_ensure_scenario_access(unit_of_work_factory) -> None:
with unit_of_work_factory() as uow:
project = _create_project(uow, "Project C")
scenario = _create_scenario(uow, project, "Scenario C1")
user = _create_user_with_roles(
uow,
email="analyst@example.com",
username="analyst",
roles={"analyst"},
)
resolved = ensure_scenario_access(
uow,
scenario_id=scenario.id,
user=user,
)
assert resolved.id == scenario.id
with pytest.raises(AuthorizationError):
ensure_scenario_access(
uow,
scenario_id=scenario.id,
user=user,
require_manage=True,
)
def test_ensure_scenario_in_project_validates_membership(unit_of_work_factory) -> None:
with unit_of_work_factory() as uow:
project_one = _create_project(uow, "Project D")
project_two = _create_project(uow, "Project E")
scenario = _create_scenario(uow, project_one, "Scenario D1")
user = _create_user_with_roles(
uow,
email="manager2@example.com",
username="manager2",
roles={"project_manager"},
)
resolved = ensure_scenario_in_project(
uow,
project_id=project_one.id,
scenario_id=scenario.id,
user=user,
require_manage=True,
)
assert resolved.id == scenario.id
with pytest.raises(EntityNotFoundError):
ensure_scenario_in_project(
uow,
project_id=project_two.id,
scenario_id=scenario.id,
user=user,
)

View File

@@ -6,7 +6,7 @@ from typing import cast
from uuid import uuid4
import pytest
from fastapi import FastAPI
from fastapi import FastAPI, Request
from fastapi.testclient import TestClient
from pydantic import ValidationError
from sqlalchemy import create_engine
@@ -15,13 +15,14 @@ from sqlalchemy.engine import Engine
from sqlalchemy.pool import StaticPool
from config.database import Base
from dependencies import get_unit_of_work
from dependencies import get_auth_session, get_unit_of_work
from models import (
MiningOperationType,
Project,
ResourceType,
Scenario,
ScenarioStatus,
User,
)
from schemas.scenario import (
ScenarioComparisonRequest,
@@ -30,6 +31,7 @@ from schemas.scenario import (
from services.exceptions import ScenarioValidationError
from services.scenario_validation import ScenarioComparisonValidator
from services.unit_of_work import UnitOfWork
from services.session import AuthSession, SessionTokens
from routes.scenarios import router as scenarios_router
@@ -159,6 +161,28 @@ def api_client(session_factory) -> Iterator[TestClient]:
yield uow
app.dependency_overrides[get_unit_of_work] = _override_uow
with UnitOfWork(session_factory=session_factory) as uow:
assert uow.users is not None
uow.ensure_default_roles()
user = User(
email="test-scenarios@example.com",
username="scenario-tester",
password_hash=User.hash_password("password"),
is_active=True,
is_superuser=True,
)
uow.users.create(user)
user = uow.users.get(user.id, with_roles=True)
def _override_auth_session(request: Request) -> AuthSession:
session = AuthSession(tokens=SessionTokens(
access_token="test", refresh_token="test"))
session.user = user
request.state.auth_session = session
return session
app.dependency_overrides[get_auth_session] = _override_auth_session
client = TestClient(app)
try:
yield client
@@ -171,6 +195,8 @@ def _create_project_with_scenarios(
scenario_overrides: list[dict[str, object]],
) -> tuple[int, list[int]]:
with UnitOfWork(session_factory=session_factory) as uow:
assert uow.projects is not None
assert uow.scenarios is not None
project_name = f"Project {uuid4()}"
project = Project(name=project_name,
operation_type=MiningOperationType.OPEN_PIT)