From 24cb3c2f57010f777b05209bf2b4318cf6a327d1 Mon Sep 17 00:00:00 2001 From: zwitschi Date: Sun, 9 Nov 2025 23:43:13 +0100 Subject: [PATCH] feat: implement admin bootstrap settings and ensure default roles and admin account --- changelog.md | 1 + config/settings.py | 58 ++++++++++++++++++ main.py | 22 +++++++ services/bootstrap.py | 129 ++++++++++++++++++++++++++++++++++++++++ tests/test_bootstrap.py | 116 ++++++++++++++++++++++++++++++++++++ 5 files changed, 326 insertions(+) create mode 100644 services/bootstrap.py create mode 100644 tests/test_bootstrap.py diff --git a/changelog.md b/changelog.md index bdb8c93..463dcd0 100644 --- a/changelog.md +++ b/changelog.md @@ -27,3 +27,4 @@ - Extended authorization helper layer with project/scenario ownership lookups, integrated them into FastAPI dependencies, refreshed pytest fixtures to keep the suite authenticated, and documented the new patterns across RBAC plan and security guides. - Added dedicated pytest coverage for guard dependencies, exercising success plus failure paths (missing session, inactive user, missing roles, project/scenario access errors) via `tests/test_dependencies_guards.py`. - Added integration tests in `tests/test_authorization_integration.py` verifying anonymous 401 responses, role-based 403s, and authorized project manager flows across API and UI endpoints. +- Implemented environment-driven admin bootstrap settings, wired the `bootstrap_admin` helper into FastAPI startup, added pytest coverage for creation/idempotency/reset logic, and documented operational guidance in the RBAC plan and security concept. diff --git a/config/settings.py b/config/settings.py index 217a487..339a879 100644 --- a/config/settings.py +++ b/config/settings.py @@ -10,6 +10,17 @@ from typing import Optional from services.security import JWTSettings +@dataclass(frozen=True, slots=True) +class AdminBootstrapSettings: + """Default administrator bootstrap configuration.""" + + email: str + username: str + password: str + roles: tuple[str, ...] + force_reset: bool + + @dataclass(frozen=True, slots=True) class SessionSettings: """Cookie and header configuration for session token transport.""" @@ -40,6 +51,11 @@ class Settings: session_header_name: str = "Authorization" session_header_prefix: str = "Bearer" session_allow_header_fallback: bool = True + admin_email: str = "admin@calminer.local" + admin_username: str = "admin" + admin_password: str = "ChangeMe123!" + admin_roles: tuple[str, ...] = ("admin",) + admin_force_reset: bool = False @classmethod def from_environment(cls) -> "Settings": @@ -74,6 +90,21 @@ class Settings: session_allow_header_fallback=cls._bool_from_env( "CALMINER_SESSION_ALLOW_HEADER_FALLBACK", True ), + admin_email=os.getenv( + "CALMINER_SEED_ADMIN_EMAIL", "admin@calminer.local" + ), + admin_username=os.getenv( + "CALMINER_SEED_ADMIN_USERNAME", "admin" + ), + admin_password=os.getenv( + "CALMINER_SEED_ADMIN_PASSWORD", "ChangeMe123!" + ), + admin_roles=cls._parse_admin_roles( + os.getenv("CALMINER_SEED_ADMIN_ROLES") + ), + admin_force_reset=cls._bool_from_env( + "CALMINER_SEED_FORCE", False + ), ) @staticmethod @@ -98,6 +129,22 @@ class Settings: return False return default + @staticmethod + def _parse_admin_roles(raw_value: str | None) -> tuple[str, ...]: + if not raw_value: + return ("admin",) + parts = [segment.strip() + for segment in raw_value.split(",") if segment.strip()] + if "admin" not in parts: + parts.insert(0, "admin") + seen: set[str] = set() + ordered: list[str] = [] + for role_name in parts: + if role_name not in seen: + ordered.append(role_name) + seen.add(role_name) + return tuple(ordered) + def jwt_settings(self) -> JWTSettings: """Build runtime JWT settings compatible with token helpers.""" @@ -122,6 +169,17 @@ class Settings: allow_header_fallback=self.session_allow_header_fallback, ) + def admin_bootstrap_settings(self) -> AdminBootstrapSettings: + """Return configured admin bootstrap settings.""" + + return AdminBootstrapSettings( + email=self.admin_email, + username=self.admin_username, + password=self.admin_password, + roles=self.admin_roles, + force_reset=self.admin_force_reset, + ) + @lru_cache(maxsize=1) def get_settings() -> Settings: diff --git a/main.py b/main.py index 6fd8fb8..7c28ebf 100644 --- a/main.py +++ b/main.py @@ -1,9 +1,11 @@ +import logging from typing import Awaitable, Callable from fastapi import FastAPI, Request, Response from fastapi.staticfiles import StaticFiles from config.database import Base, engine +from config.settings import get_settings from middleware.auth_session import AuthSessionMiddleware from middleware.validation import validate_json from models import ( @@ -16,6 +18,7 @@ from routes.auth import router as auth_router from routes.dashboard import router as dashboard_router from routes.projects import router as projects_router from routes.scenarios import router as scenarios_router +from services.bootstrap import bootstrap_admin # Initialize database schema (imports above ensure models are registered) Base.metadata.create_all(bind=engine) @@ -24,6 +27,8 @@ app = FastAPI() app.add_middleware(AuthSessionMiddleware) +logger = logging.getLogger(__name__) + @app.middleware("http") async def json_validation( @@ -37,6 +42,23 @@ async def health() -> dict[str, str]: return {"status": "ok"} +@app.on_event("startup") +async def ensure_admin_bootstrap() -> None: + settings = get_settings().admin_bootstrap_settings() + try: + role_result, admin_result = bootstrap_admin(settings=settings) + logger.info( + "Admin bootstrap completed: roles=%s created=%s updated=%s rotated=%s assigned=%s", + role_result.ensured, + admin_result.created_user, + admin_result.updated_user, + admin_result.password_rotated, + admin_result.roles_granted, + ) + except Exception: # pragma: no cover - defensive logging + logger.exception("Failed to bootstrap administrator account") + + app.include_router(dashboard_router) app.include_router(auth_router) app.include_router(projects_router) diff --git a/services/bootstrap.py b/services/bootstrap.py new file mode 100644 index 0000000..4ccb4d4 --- /dev/null +++ b/services/bootstrap.py @@ -0,0 +1,129 @@ +from __future__ import annotations + +import logging +from dataclasses import dataclass +from typing import Callable + +from config.settings import AdminBootstrapSettings +from models import User +from services.repositories import ensure_default_roles +from services.unit_of_work import UnitOfWork + + +logger = logging.getLogger(__name__) + + +@dataclass(slots=True) +class RoleBootstrapResult: + created: int + ensured: int + + +@dataclass(slots=True) +class AdminBootstrapResult: + created_user: bool + updated_user: bool + password_rotated: bool + roles_granted: int + + +def bootstrap_admin( + *, + settings: AdminBootstrapSettings, + unit_of_work_factory: Callable[[], UnitOfWork] = UnitOfWork, +) -> tuple[RoleBootstrapResult, AdminBootstrapResult]: + """Ensure default roles and administrator account exist.""" + + with unit_of_work_factory() as uow: + assert uow.roles is not None and uow.users is not None + + role_result = _bootstrap_roles(uow) + admin_result = _bootstrap_admin_user(uow, settings) + + logger.info( + "Admin bootstrap result: created_user=%s updated_user=%s password_rotated=%s roles_granted=%s", + admin_result.created_user, + admin_result.updated_user, + admin_result.password_rotated, + admin_result.roles_granted, + ) + return role_result, admin_result + + +def _bootstrap_roles(uow: UnitOfWork) -> RoleBootstrapResult: + assert uow.roles is not None + before = {role.name for role in uow.roles.list()} + ensure_default_roles(uow.roles) + after = {role.name for role in uow.roles.list()} + created = len(after - before) + return RoleBootstrapResult(created=created, ensured=len(after)) + + +def _bootstrap_admin_user( + uow: UnitOfWork, + settings: AdminBootstrapSettings, +) -> AdminBootstrapResult: + assert uow.users is not None and uow.roles is not None + + created_user = False + updated_user = False + password_rotated = False + roles_granted = 0 + + user = uow.users.get_by_email(settings.email, with_roles=True) + if user is None: + user = User( + email=settings.email, + username=settings.username, + password_hash=User.hash_password(settings.password), + is_active=True, + is_superuser=True, + ) + uow.users.create(user) + created_user = True + else: + if user.username != settings.username: + user.username = settings.username + updated_user = True + if not user.is_active: + user.is_active = True + updated_user = True + if not user.is_superuser: + user.is_superuser = True + updated_user = True + if settings.force_reset: + user.password_hash = User.hash_password(settings.password) + password_rotated = True + updated_user = True + uow.users.session.flush() + + user = uow.users.get(user.id, with_roles=True) + assert user is not None + + existing_roles = {role.name for role in user.roles} + for role_name in settings.roles: + role = uow.roles.get_by_name(role_name) + if role is None: + logger.warning( + "Bootstrap admin role '%s' is not defined; skipping assignment", + role_name, + ) + continue + if role.name in existing_roles: + continue + uow.users.assign_role( + user_id=user.id, + role_id=role.id, + granted_by=user.id, + ) + roles_granted += 1 + existing_roles.add(role.name) + + uow.users.session.flush() + + return AdminBootstrapResult( + created_user=created_user, + updated_user=updated_user, + password_rotated=password_rotated, + roles_granted=roles_granted, + ) diff --git a/tests/test_bootstrap.py b/tests/test_bootstrap.py new file mode 100644 index 0000000..676ee42 --- /dev/null +++ b/tests/test_bootstrap.py @@ -0,0 +1,116 @@ +from __future__ import annotations + +from collections.abc import Callable + +from typing import Any + +import pytest +from sqlalchemy import create_engine +from sqlalchemy.orm import Session, sessionmaker + +from config.database import Base +from config.settings import AdminBootstrapSettings +from services.bootstrap import AdminBootstrapResult, RoleBootstrapResult, bootstrap_admin +from services.unit_of_work import UnitOfWork + + +@pytest.fixture() +def session_factory() -> Callable[[], Session]: + engine = create_engine("sqlite+pysqlite:///:memory:", future=True) + Base.metadata.create_all(engine) + factory = sessionmaker(bind=engine, expire_on_commit=False, future=True) + + def _factory() -> Session: + return factory() + + return _factory + + +@pytest.fixture() +def unit_of_work_factory(session_factory: Callable[[], Session]) -> Callable[[], UnitOfWork]: + def _factory() -> UnitOfWork: + return UnitOfWork(session_factory=session_factory) + + return _factory + + +def _settings(**overrides: Any) -> AdminBootstrapSettings: + defaults: dict[str, Any] = { + "email": "admin@example.com", + "username": "admin", + "password": "changeme", + "roles": ("admin", "viewer"), + "force_reset": False, + } + defaults.update(overrides) + return AdminBootstrapSettings( + email=str(defaults["email"]), + username=str(defaults["username"]), + password=str(defaults["password"]), + roles=tuple(defaults["roles"]), + force_reset=bool(defaults["force_reset"]), + ) + + +def test_bootstrap_creates_admin_and_roles(unit_of_work_factory: Callable[[], UnitOfWork]) -> None: + settings = _settings() + + role_result, admin_result = bootstrap_admin( + settings=settings, + unit_of_work_factory=unit_of_work_factory, + ) + + assert role_result == RoleBootstrapResult(created=4, ensured=4) + assert admin_result == AdminBootstrapResult( + created_user=True, + updated_user=False, + password_rotated=False, + roles_granted=2, + ) + + with unit_of_work_factory() as uow: + users_repo = uow.users + assert users_repo is not None + user = users_repo.get_by_email(settings.email, with_roles=True) + assert user is not None + assert user.is_superuser is True + assert {role.name for role in user.roles} == {"admin", "viewer"} + + +def test_bootstrap_is_idempotent(unit_of_work_factory: Callable[[], UnitOfWork]) -> None: + settings = _settings() + + bootstrap_admin(settings=settings, + unit_of_work_factory=unit_of_work_factory) + role_result, admin_result = bootstrap_admin( + settings=settings, + unit_of_work_factory=unit_of_work_factory, + ) + + assert role_result.created == 0 + assert role_result.ensured == 4 + assert admin_result.created_user is False + assert admin_result.updated_user is False + assert admin_result.roles_granted == 0 + + +def test_bootstrap_respects_force_reset(unit_of_work_factory: Callable[[], UnitOfWork]) -> None: + base_settings = _settings(password="initial") + bootstrap_admin(settings=base_settings, + unit_of_work_factory=unit_of_work_factory) + + rotated_settings = _settings(password="rotated", force_reset=True) + _, admin_result = bootstrap_admin( + settings=rotated_settings, + unit_of_work_factory=unit_of_work_factory, + ) + + assert admin_result.password_rotated is True + assert admin_result.updated_user is True + + with unit_of_work_factory() as uow: + users_repo = uow.users + assert users_repo is not None + user = users_repo.get_by_email(rotated_settings.email) + assert user is not None + assert user.verify_password("rotated")