feat: implement admin bootstrap settings and ensure default roles and admin account

This commit is contained in:
2025-11-09 23:43:13 +01:00
parent 118657491c
commit 24cb3c2f57
5 changed files with 326 additions and 0 deletions

View File

@@ -27,3 +27,4 @@
- Extended authorization helper layer with project/scenario ownership lookups, integrated them into FastAPI dependencies, refreshed pytest fixtures to keep the suite authenticated, and documented the new patterns across RBAC plan and security guides. - Extended authorization helper layer with project/scenario ownership lookups, integrated them into FastAPI dependencies, refreshed pytest fixtures to keep the suite authenticated, and documented the new patterns across RBAC plan and security guides.
- Added dedicated pytest coverage for guard dependencies, exercising success plus failure paths (missing session, inactive user, missing roles, project/scenario access errors) via `tests/test_dependencies_guards.py`. - Added dedicated pytest coverage for guard dependencies, exercising success plus failure paths (missing session, inactive user, missing roles, project/scenario access errors) via `tests/test_dependencies_guards.py`.
- Added integration tests in `tests/test_authorization_integration.py` verifying anonymous 401 responses, role-based 403s, and authorized project manager flows across API and UI endpoints. - Added integration tests in `tests/test_authorization_integration.py` verifying anonymous 401 responses, role-based 403s, and authorized project manager flows across API and UI endpoints.
- Implemented environment-driven admin bootstrap settings, wired the `bootstrap_admin` helper into FastAPI startup, added pytest coverage for creation/idempotency/reset logic, and documented operational guidance in the RBAC plan and security concept.

View File

@@ -10,6 +10,17 @@ from typing import Optional
from services.security import JWTSettings from services.security import JWTSettings
@dataclass(frozen=True, slots=True)
class AdminBootstrapSettings:
"""Default administrator bootstrap configuration."""
email: str
username: str
password: str
roles: tuple[str, ...]
force_reset: bool
@dataclass(frozen=True, slots=True) @dataclass(frozen=True, slots=True)
class SessionSettings: class SessionSettings:
"""Cookie and header configuration for session token transport.""" """Cookie and header configuration for session token transport."""
@@ -40,6 +51,11 @@ class Settings:
session_header_name: str = "Authorization" session_header_name: str = "Authorization"
session_header_prefix: str = "Bearer" session_header_prefix: str = "Bearer"
session_allow_header_fallback: bool = True session_allow_header_fallback: bool = True
admin_email: str = "admin@calminer.local"
admin_username: str = "admin"
admin_password: str = "ChangeMe123!"
admin_roles: tuple[str, ...] = ("admin",)
admin_force_reset: bool = False
@classmethod @classmethod
def from_environment(cls) -> "Settings": def from_environment(cls) -> "Settings":
@@ -74,6 +90,21 @@ class Settings:
session_allow_header_fallback=cls._bool_from_env( session_allow_header_fallback=cls._bool_from_env(
"CALMINER_SESSION_ALLOW_HEADER_FALLBACK", True "CALMINER_SESSION_ALLOW_HEADER_FALLBACK", True
), ),
admin_email=os.getenv(
"CALMINER_SEED_ADMIN_EMAIL", "admin@calminer.local"
),
admin_username=os.getenv(
"CALMINER_SEED_ADMIN_USERNAME", "admin"
),
admin_password=os.getenv(
"CALMINER_SEED_ADMIN_PASSWORD", "ChangeMe123!"
),
admin_roles=cls._parse_admin_roles(
os.getenv("CALMINER_SEED_ADMIN_ROLES")
),
admin_force_reset=cls._bool_from_env(
"CALMINER_SEED_FORCE", False
),
) )
@staticmethod @staticmethod
@@ -98,6 +129,22 @@ class Settings:
return False return False
return default return default
@staticmethod
def _parse_admin_roles(raw_value: str | None) -> tuple[str, ...]:
if not raw_value:
return ("admin",)
parts = [segment.strip()
for segment in raw_value.split(",") if segment.strip()]
if "admin" not in parts:
parts.insert(0, "admin")
seen: set[str] = set()
ordered: list[str] = []
for role_name in parts:
if role_name not in seen:
ordered.append(role_name)
seen.add(role_name)
return tuple(ordered)
def jwt_settings(self) -> JWTSettings: def jwt_settings(self) -> JWTSettings:
"""Build runtime JWT settings compatible with token helpers.""" """Build runtime JWT settings compatible with token helpers."""
@@ -122,6 +169,17 @@ class Settings:
allow_header_fallback=self.session_allow_header_fallback, allow_header_fallback=self.session_allow_header_fallback,
) )
def admin_bootstrap_settings(self) -> AdminBootstrapSettings:
"""Return configured admin bootstrap settings."""
return AdminBootstrapSettings(
email=self.admin_email,
username=self.admin_username,
password=self.admin_password,
roles=self.admin_roles,
force_reset=self.admin_force_reset,
)
@lru_cache(maxsize=1) @lru_cache(maxsize=1)
def get_settings() -> Settings: def get_settings() -> Settings:

22
main.py
View File

@@ -1,9 +1,11 @@
import logging
from typing import Awaitable, Callable from typing import Awaitable, Callable
from fastapi import FastAPI, Request, Response from fastapi import FastAPI, Request, Response
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from config.database import Base, engine from config.database import Base, engine
from config.settings import get_settings
from middleware.auth_session import AuthSessionMiddleware from middleware.auth_session import AuthSessionMiddleware
from middleware.validation import validate_json from middleware.validation import validate_json
from models import ( from models import (
@@ -16,6 +18,7 @@ from routes.auth import router as auth_router
from routes.dashboard import router as dashboard_router from routes.dashboard import router as dashboard_router
from routes.projects import router as projects_router from routes.projects import router as projects_router
from routes.scenarios import router as scenarios_router from routes.scenarios import router as scenarios_router
from services.bootstrap import bootstrap_admin
# Initialize database schema (imports above ensure models are registered) # Initialize database schema (imports above ensure models are registered)
Base.metadata.create_all(bind=engine) Base.metadata.create_all(bind=engine)
@@ -24,6 +27,8 @@ app = FastAPI()
app.add_middleware(AuthSessionMiddleware) app.add_middleware(AuthSessionMiddleware)
logger = logging.getLogger(__name__)
@app.middleware("http") @app.middleware("http")
async def json_validation( async def json_validation(
@@ -37,6 +42,23 @@ async def health() -> dict[str, str]:
return {"status": "ok"} return {"status": "ok"}
@app.on_event("startup")
async def ensure_admin_bootstrap() -> None:
settings = get_settings().admin_bootstrap_settings()
try:
role_result, admin_result = bootstrap_admin(settings=settings)
logger.info(
"Admin bootstrap completed: roles=%s created=%s updated=%s rotated=%s assigned=%s",
role_result.ensured,
admin_result.created_user,
admin_result.updated_user,
admin_result.password_rotated,
admin_result.roles_granted,
)
except Exception: # pragma: no cover - defensive logging
logger.exception("Failed to bootstrap administrator account")
app.include_router(dashboard_router) app.include_router(dashboard_router)
app.include_router(auth_router) app.include_router(auth_router)
app.include_router(projects_router) app.include_router(projects_router)

129
services/bootstrap.py Normal file
View File

@@ -0,0 +1,129 @@
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Callable
from config.settings import AdminBootstrapSettings
from models import User
from services.repositories import ensure_default_roles
from services.unit_of_work import UnitOfWork
logger = logging.getLogger(__name__)
@dataclass(slots=True)
class RoleBootstrapResult:
created: int
ensured: int
@dataclass(slots=True)
class AdminBootstrapResult:
created_user: bool
updated_user: bool
password_rotated: bool
roles_granted: int
def bootstrap_admin(
*,
settings: AdminBootstrapSettings,
unit_of_work_factory: Callable[[], UnitOfWork] = UnitOfWork,
) -> tuple[RoleBootstrapResult, AdminBootstrapResult]:
"""Ensure default roles and administrator account exist."""
with unit_of_work_factory() as uow:
assert uow.roles is not None and uow.users is not None
role_result = _bootstrap_roles(uow)
admin_result = _bootstrap_admin_user(uow, settings)
logger.info(
"Admin bootstrap result: created_user=%s updated_user=%s password_rotated=%s roles_granted=%s",
admin_result.created_user,
admin_result.updated_user,
admin_result.password_rotated,
admin_result.roles_granted,
)
return role_result, admin_result
def _bootstrap_roles(uow: UnitOfWork) -> RoleBootstrapResult:
assert uow.roles is not None
before = {role.name for role in uow.roles.list()}
ensure_default_roles(uow.roles)
after = {role.name for role in uow.roles.list()}
created = len(after - before)
return RoleBootstrapResult(created=created, ensured=len(after))
def _bootstrap_admin_user(
uow: UnitOfWork,
settings: AdminBootstrapSettings,
) -> AdminBootstrapResult:
assert uow.users is not None and uow.roles is not None
created_user = False
updated_user = False
password_rotated = False
roles_granted = 0
user = uow.users.get_by_email(settings.email, with_roles=True)
if user is None:
user = User(
email=settings.email,
username=settings.username,
password_hash=User.hash_password(settings.password),
is_active=True,
is_superuser=True,
)
uow.users.create(user)
created_user = True
else:
if user.username != settings.username:
user.username = settings.username
updated_user = True
if not user.is_active:
user.is_active = True
updated_user = True
if not user.is_superuser:
user.is_superuser = True
updated_user = True
if settings.force_reset:
user.password_hash = User.hash_password(settings.password)
password_rotated = True
updated_user = True
uow.users.session.flush()
user = uow.users.get(user.id, with_roles=True)
assert user is not None
existing_roles = {role.name for role in user.roles}
for role_name in settings.roles:
role = uow.roles.get_by_name(role_name)
if role is None:
logger.warning(
"Bootstrap admin role '%s' is not defined; skipping assignment",
role_name,
)
continue
if role.name in existing_roles:
continue
uow.users.assign_role(
user_id=user.id,
role_id=role.id,
granted_by=user.id,
)
roles_granted += 1
existing_roles.add(role.name)
uow.users.session.flush()
return AdminBootstrapResult(
created_user=created_user,
updated_user=updated_user,
password_rotated=password_rotated,
roles_granted=roles_granted,
)

116
tests/test_bootstrap.py Normal file
View File

@@ -0,0 +1,116 @@
from __future__ import annotations
from collections.abc import Callable
from typing import Any
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from config.database import Base
from config.settings import AdminBootstrapSettings
from services.bootstrap import AdminBootstrapResult, RoleBootstrapResult, bootstrap_admin
from services.unit_of_work import UnitOfWork
@pytest.fixture()
def session_factory() -> Callable[[], Session]:
engine = create_engine("sqlite+pysqlite:///:memory:", future=True)
Base.metadata.create_all(engine)
factory = sessionmaker(bind=engine, expire_on_commit=False, future=True)
def _factory() -> Session:
return factory()
return _factory
@pytest.fixture()
def unit_of_work_factory(session_factory: Callable[[], Session]) -> Callable[[], UnitOfWork]:
def _factory() -> UnitOfWork:
return UnitOfWork(session_factory=session_factory)
return _factory
def _settings(**overrides: Any) -> AdminBootstrapSettings:
defaults: dict[str, Any] = {
"email": "admin@example.com",
"username": "admin",
"password": "changeme",
"roles": ("admin", "viewer"),
"force_reset": False,
}
defaults.update(overrides)
return AdminBootstrapSettings(
email=str(defaults["email"]),
username=str(defaults["username"]),
password=str(defaults["password"]),
roles=tuple(defaults["roles"]),
force_reset=bool(defaults["force_reset"]),
)
def test_bootstrap_creates_admin_and_roles(unit_of_work_factory: Callable[[], UnitOfWork]) -> None:
settings = _settings()
role_result, admin_result = bootstrap_admin(
settings=settings,
unit_of_work_factory=unit_of_work_factory,
)
assert role_result == RoleBootstrapResult(created=4, ensured=4)
assert admin_result == AdminBootstrapResult(
created_user=True,
updated_user=False,
password_rotated=False,
roles_granted=2,
)
with unit_of_work_factory() as uow:
users_repo = uow.users
assert users_repo is not None
user = users_repo.get_by_email(settings.email, with_roles=True)
assert user is not None
assert user.is_superuser is True
assert {role.name for role in user.roles} == {"admin", "viewer"}
def test_bootstrap_is_idempotent(unit_of_work_factory: Callable[[], UnitOfWork]) -> None:
settings = _settings()
bootstrap_admin(settings=settings,
unit_of_work_factory=unit_of_work_factory)
role_result, admin_result = bootstrap_admin(
settings=settings,
unit_of_work_factory=unit_of_work_factory,
)
assert role_result.created == 0
assert role_result.ensured == 4
assert admin_result.created_user is False
assert admin_result.updated_user is False
assert admin_result.roles_granted == 0
def test_bootstrap_respects_force_reset(unit_of_work_factory: Callable[[], UnitOfWork]) -> None:
base_settings = _settings(password="initial")
bootstrap_admin(settings=base_settings,
unit_of_work_factory=unit_of_work_factory)
rotated_settings = _settings(password="rotated", force_reset=True)
_, admin_result = bootstrap_admin(
settings=rotated_settings,
unit_of_work_factory=unit_of_work_factory,
)
assert admin_result.password_rotated is True
assert admin_result.updated_user is True
with unit_of_work_factory() as uow:
users_repo = uow.users
assert users_repo is not None
user = users_repo.get_by_email(rotated_settings.email)
assert user is not None
assert user.verify_password("rotated")