Files
calminer/services/authorization.py
zwitschi 0f79864188 feat: enhance project and scenario management with role-based access control
- Implemented role-based access control for project and scenario routes.
- Added authorization checks to ensure users have appropriate roles for viewing and managing projects and scenarios.
- Introduced utility functions for ensuring project and scenario access based on user roles.
- Refactored project and scenario routes to utilize new authorization helpers.
- Created initial data seeding script to set up default roles and an admin user.
- Added tests for authorization helpers and initial data seeding functionality.
- Updated exception handling to include authorization errors.
2025-11-09 23:14:54 +01:00

105 lines
3.0 KiB
Python

from __future__ import annotations
from typing import Iterable
from models import Project, Role, Scenario, User
from services.exceptions import AuthorizationError, EntityNotFoundError
from services.repositories import ProjectRepository, ScenarioRepository
from services.unit_of_work import UnitOfWork
# Role names that satisfy a manage-level access check.
MANAGE_ROLES: frozenset[str] = frozenset(("project_manager", "admin"))

# Role names that satisfy a read-level access check. Every managing role is
# also a reading role, so the set is built on top of MANAGE_ROLES.
READ_ROLES: frozenset[str] = MANAGE_ROLES | frozenset(("viewer", "analyst"))
def _user_role_names(user: User) -> set[str]:
    """Collect the distinct names of the roles attached to ``user``.

    A user that has no ``roles`` attribute, or whose ``roles`` value is falsy
    (``None`` or an empty collection), yields an empty set.
    """
    assigned = getattr(user, "roles", None)
    if not assigned:
        return set()

    names: set[str] = set()
    for role in assigned:
        names.add(role.name)
    return names
def _require_project_repo(uow: UnitOfWork) -> ProjectRepository:
    """Return the project repository exposed by ``uow``.

    Raises:
        RuntimeError: If ``uow.projects`` is ``None``, i.e. the unit of work
            has no project repository available.
    """
    repo = uow.projects
    # Compare against None explicitly: a truthiness test would also reject a
    # live repository whose class defines ``__len__`` or ``__bool__`` and
    # happens to evaluate falsy.
    if repo is None:
        raise RuntimeError("Project repository not initialised")
    return repo
def _require_scenario_repo(uow: UnitOfWork) -> ScenarioRepository:
    """Return the scenario repository exposed by ``uow``.

    Raises:
        RuntimeError: If ``uow.scenarios`` is ``None``, i.e. the unit of work
            has no scenario repository available.
    """
    repo = uow.scenarios
    # Compare against None explicitly: a truthiness test would also reject a
    # live repository whose class defines ``__len__`` or ``__bool__`` and
    # happens to evaluate falsy.
    if repo is None:
        raise RuntimeError("Scenario repository not initialised")
    return repo
def _assert_user_can_access(user: User, *, require_manage: bool) -> None:
    """Raise unless ``user`` may perform the requested kind of action.

    Args:
        user: The account being checked.
        require_manage: When true the user must hold one of ``MANAGE_ROLES``;
            otherwise any of ``READ_ROLES`` is sufficient.

    Raises:
        AuthorizationError: If the account is inactive, or if it is not a
            superuser and holds none of the roles required for the action.
    """
    # An inactive account is rejected before anything else, superuser or not.
    if not user.is_active:
        raise AuthorizationError("User account is disabled.")

    # Superusers bypass the role comparison entirely.
    if not user.is_superuser:
        if require_manage:
            permitted = MANAGE_ROLES
        else:
            permitted = READ_ROLES
        if _user_role_names(user).isdisjoint(permitted):
            raise AuthorizationError(
                "Insufficient role permissions for this action.")
def ensure_project_access(
    uow: UnitOfWork,
    *,
    project_id: int,
    user: User,
    require_manage: bool = False,
) -> Project:
    """Resolve a project and ensure the user holds the required permissions.

    Args:
        uow: Unit of work providing the project repository.
        project_id: Identifier passed to the repository's ``get``.
        user: The account requesting access.
        require_manage: Require a managing role instead of a read role.

    Returns:
        Whatever the project repository's ``get`` returns for ``project_id``.

    Raises:
        RuntimeError: If the unit of work has no project repository.
        AuthorizationError: If the user is inactive or lacks a suitable role.

    NOTE(review): ``repo.get`` presumably raises ``EntityNotFoundError`` for an
    unknown id -- confirm against the repository implementation.
    """
    repo = _require_project_repo(uow)
    # The role check does not depend on the project itself, so run it before
    # the lookup: callers lacking the role get the same AuthorizationError
    # whether or not the id exists, and the repository is never consulted on
    # their behalf.
    _assert_user_can_access(user, require_manage=require_manage)
    return repo.get(project_id)
def ensure_scenario_access(
    uow: UnitOfWork,
    *,
    scenario_id: int,
    user: User,
    require_manage: bool = False,
    with_children: bool = False,
) -> Scenario:
    """Resolve a scenario and ensure the user holds the required permissions.

    Args:
        uow: Unit of work providing the scenario repository.
        scenario_id: Identifier passed to the repository's ``get``.
        user: The account requesting access.
        require_manage: Require a managing role instead of a read role.
        with_children: Forwarded unchanged to the repository's ``get``.

    Returns:
        Whatever the scenario repository's ``get`` returns for ``scenario_id``.

    Raises:
        RuntimeError: If the unit of work has no scenario repository.
        AuthorizationError: If the user is inactive or lacks a suitable role.

    NOTE(review): ``repo.get`` presumably raises ``EntityNotFoundError`` for an
    unknown id -- confirm against the repository implementation.
    """
    repo = _require_scenario_repo(uow)
    # The role check does not depend on the scenario itself, so run it before
    # the lookup: callers lacking the role get the same AuthorizationError
    # whether or not the id exists, and the repository is never consulted on
    # their behalf.
    _assert_user_can_access(user, require_manage=require_manage)
    return repo.get(scenario_id, with_children=with_children)
def ensure_scenario_in_project(
    uow: UnitOfWork,
    *,
    project_id: int,
    scenario_id: int,
    user: User,
    require_manage: bool = False,
    with_children: bool = False,
) -> Scenario:
    """Return a scenario after confirming it is owned by the given project.

    Both the project and the scenario are resolved through
    ``ensure_project_access`` and ``ensure_scenario_access``, so any error
    either of them raises propagates unchanged.

    Args:
        uow: Unit of work handed to both access helpers.
        project_id: Identifier of the project the scenario must belong to.
        scenario_id: Identifier of the scenario to resolve.
        user: The account requesting access.
        require_manage: Require a managing role instead of a read role.
        with_children: Forwarded to the scenario lookup.

    Raises:
        EntityNotFoundError: If the scenario's ``project_id`` differs from the
            resolved project's ``id``.
    """
    # The project is resolved first, so its failures surface before the
    # scenario is looked at.
    owning_project = ensure_project_access(
        uow, project_id=project_id, user=user, require_manage=require_manage
    )
    candidate = ensure_scenario_access(
        uow,
        scenario_id=scenario_id,
        user=user,
        require_manage=require_manage,
        with_children=with_children,
    )

    if candidate.project_id == owning_project.id:
        return candidate

    # A scenario that lives under another project is reported as not found.
    raise EntityNotFoundError(
        f"Scenario {scenario_id} does not belong to project {project_id}."
    )