- Implemented role-based access control for project and scenario routes. - Added authorization checks to ensure users have appropriate roles for viewing and managing projects and scenarios. - Introduced utility functions for ensuring project and scenario access based on user roles. - Refactored project and scenario routes to utilize new authorization helpers. - Created initial data seeding script to set up default roles and an admin user. - Added tests for authorization helpers and initial data seeding functionality. - Updated exception handling to include authorization errors.
105 lines
3.0 KiB
Python
105 lines
3.0 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Iterable
|
|
|
|
from models import Project, Role, Scenario, User
|
|
from services.exceptions import AuthorizationError, EntityNotFoundError
|
|
from services.repositories import ProjectRepository, ScenarioRepository
|
|
from services.unit_of_work import UnitOfWork
|
|
|
|
READ_ROLES: frozenset[str] = frozenset(
|
|
{"viewer", "analyst", "project_manager", "admin"}
|
|
)
|
|
MANAGE_ROLES: frozenset[str] = frozenset({"project_manager", "admin"})
|
|
|
|
|
|
def _user_role_names(user: User) -> set[str]:
|
|
roles: Iterable[Role] = getattr(user, "roles", []) or []
|
|
return {role.name for role in roles}
|
|
|
|
|
|
def _require_project_repo(uow: UnitOfWork) -> ProjectRepository:
|
|
if not uow.projects:
|
|
raise RuntimeError("Project repository not initialised")
|
|
return uow.projects
|
|
|
|
|
|
def _require_scenario_repo(uow: UnitOfWork) -> ScenarioRepository:
|
|
if not uow.scenarios:
|
|
raise RuntimeError("Scenario repository not initialised")
|
|
return uow.scenarios
|
|
|
|
|
|
def _assert_user_can_access(user: User, *, require_manage: bool) -> None:
|
|
if not user.is_active:
|
|
raise AuthorizationError("User account is disabled.")
|
|
if user.is_superuser:
|
|
return
|
|
|
|
allowed = MANAGE_ROLES if require_manage else READ_ROLES
|
|
if not _user_role_names(user) & allowed:
|
|
raise AuthorizationError(
|
|
"Insufficient role permissions for this action.")
|
|
|
|
|
|
def ensure_project_access(
|
|
uow: UnitOfWork,
|
|
*,
|
|
project_id: int,
|
|
user: User,
|
|
require_manage: bool = False,
|
|
) -> Project:
|
|
"""Resolve a project and ensure the user holds the required permissions."""
|
|
|
|
repo = _require_project_repo(uow)
|
|
project = repo.get(project_id)
|
|
_assert_user_can_access(user, require_manage=require_manage)
|
|
return project
|
|
|
|
|
|
def ensure_scenario_access(
|
|
uow: UnitOfWork,
|
|
*,
|
|
scenario_id: int,
|
|
user: User,
|
|
require_manage: bool = False,
|
|
with_children: bool = False,
|
|
) -> Scenario:
|
|
"""Resolve a scenario and ensure the user holds the required permissions."""
|
|
|
|
repo = _require_scenario_repo(uow)
|
|
scenario = repo.get(scenario_id, with_children=with_children)
|
|
_assert_user_can_access(user, require_manage=require_manage)
|
|
return scenario
|
|
|
|
|
|
def ensure_scenario_in_project(
|
|
uow: UnitOfWork,
|
|
*,
|
|
project_id: int,
|
|
scenario_id: int,
|
|
user: User,
|
|
require_manage: bool = False,
|
|
with_children: bool = False,
|
|
) -> Scenario:
|
|
"""Resolve a scenario ensuring it belongs to the project and the user may access it."""
|
|
|
|
project = ensure_project_access(
|
|
uow,
|
|
project_id=project_id,
|
|
user=user,
|
|
require_manage=require_manage,
|
|
)
|
|
scenario = ensure_scenario_access(
|
|
uow,
|
|
scenario_id=scenario_id,
|
|
user=user,
|
|
require_manage=require_manage,
|
|
with_children=with_children,
|
|
)
|
|
if scenario.project_id != project.id:
|
|
raise EntityNotFoundError(
|
|
f"Scenario {scenario_id} does not belong to project {project_id}."
|
|
)
|
|
return scenario
|