"""Seed initial data: default roles, the admin user and pricing settings."""

from __future__ import annotations

import logging
import os
from dataclasses import dataclass
from typing import Callable, Iterable

from dotenv import load_dotenv

from config.settings import Settings
from models import Role, User
from services.repositories import (
    DEFAULT_ROLE_DEFINITIONS,
    PricingSettingsSeedResult,
    RoleRepository,
    UserRepository,
    ensure_default_pricing_settings,
)
from services.unit_of_work import UnitOfWork


@dataclass
class SeedConfig:
    """Settings that drive the admin-user part of the seeding run.

    Attributes:
        admin_email: Email used to look up (or create) the admin user.
        admin_username: Username the admin user should carry.
        admin_password: Plain-text password hashed on creation or forced reset.
        admin_roles: Names of the roles to grant to the admin user.
        force_reset: When true, an existing admin's password hash is replaced.
    """

    admin_email: str
    admin_username: str
    admin_password: str
    admin_roles: tuple[str, ...]
    force_reset: bool


@dataclass
class RoleSeedResult:
    """Counters reported by :func:`ensure_default_roles`.

    Attributes:
        created: Number of role definitions that had no existing role.
        updated: Number of existing roles whose fields were changed.
        total: Number of role definitions processed.
    """

    created: int
    updated: int
    total: int


@dataclass
class AdminSeedResult:
    """Outcome flags reported by :func:`ensure_admin_user`.

    Attributes:
        created_user: True when the admin user did not exist and was created.
        updated_user: True when any field of an existing admin was changed.
        password_rotated: True when an existing admin's password was reset.
        roles_granted: Number of role assignments added during this run.
    """

    created_user: bool
    updated_user: bool
    password_rotated: bool
    roles_granted: int


def parse_bool(value: str | None) -> bool:
    """Return True when *value* is one of ``1``, ``true``, ``yes`` or ``on``.

    Matching ignores case and surrounding whitespace; ``None`` and every
    other string yield False.
    """
    if value is None:
        return False
    return value.strip().lower() in {"1", "true", "yes", "on"}


def normalise_role_list(raw_value: str | None) -> tuple[str, ...]:
    """Parse a comma-separated role list into an ordered, de-duplicated tuple.

    Blank segments are dropped. ``"admin"`` is always part of the result: it
    is inserted at the front when missing, and an empty or ``None`` input
    yields ``("admin",)``.
    """
    if not raw_value:
        return ("admin",)

    parts = [segment.strip() for segment in raw_value.split(",") if segment.strip()]
    if "admin" not in parts:
        parts.insert(0, "admin")

    # dict.fromkeys keeps the first occurrence of each name in order.
    return tuple(dict.fromkeys(parts))


def _env_or_default(name: str, default: str) -> str:
    """Return environment variable *name*, or *default* when unset or blank."""
    value = os.getenv(name)
    if value is None or not value.strip():
        return default
    return value


def load_config() -> SeedConfig:
    """Build a :class:`SeedConfig` from ``CALMINER_SEED_*`` environment variables.

    ``load_dotenv()`` is called first. Variables that are unset, empty or
    whitespace-only fall back to the built-in defaults, so a blank entry
    cannot produce an admin account with an empty email, username or password.
    """
    load_dotenv()

    # NOTE(review): the fallback password is a well-known literal; deployments
    # should always set CALMINER_SEED_ADMIN_PASSWORD explicitly.
    return SeedConfig(
        admin_email=_env_or_default(
            "CALMINER_SEED_ADMIN_EMAIL", "admin@calminer.local"
        ),
        admin_username=_env_or_default("CALMINER_SEED_ADMIN_USERNAME", "admin"),
        admin_password=_env_or_default(
            "CALMINER_SEED_ADMIN_PASSWORD", "ChangeMe123!"
        ),
        admin_roles=normalise_role_list(os.getenv("CALMINER_SEED_ADMIN_ROLES")),
        force_reset=parse_bool(os.getenv("CALMINER_SEED_FORCE")),
    )


def ensure_default_roles(
    role_repo: RoleRepository,
    definitions: Iterable[dict[str, str]] = DEFAULT_ROLE_DEFINITIONS,
) -> RoleSeedResult:
    """Create missing roles and sync display name/description of existing ones.

    Args:
        role_repo: Repository used to look up and create roles; its session
            is flushed once after all definitions were processed.
        definitions: Role definitions; each must provide ``name``,
            ``display_name`` and ``description``.

    Returns:
        Counts of created, updated and processed definitions.
    """
    created = 0
    updated = 0
    total = 0

    for definition in definitions:
        total += 1
        existing = role_repo.get_by_name(definition["name"])
        if existing is None:
            role_repo.create(Role(**definition))
            created += 1
            continue

        changed = False
        if existing.display_name != definition["display_name"]:
            existing.display_name = definition["display_name"]
            changed = True
        if existing.description != definition["description"]:
            existing.description = definition["description"]
            changed = True
        if changed:
            updated += 1

    role_repo.session.flush()
    return RoleSeedResult(created=created, updated=updated, total=total)


def ensure_admin_user(
    user_repo: UserRepository,
    role_repo: RoleRepository,
    config: SeedConfig,
) -> AdminSeedResult:
    """Create or repair the admin user and grant it the configured roles.

    A missing user is created as an active superuser. An existing user gets
    its username, active flag and superuser flag corrected, and its password
    hash replaced when ``config.force_reset`` is set. Role names that cannot
    be resolved are logged and skipped.

    Args:
        user_repo: Repository used to look up, create and update the user.
        role_repo: Repository used to resolve role names.
        config: Seed configuration describing the desired admin account.

    Returns:
        Flags describing what was changed plus the number of roles granted.
    """
    created_user = False
    updated_user = False
    password_rotated = False
    roles_granted = 0

    user = user_repo.get_by_email(config.admin_email, with_roles=True)
    if user is None:
        user = User(
            email=config.admin_email,
            username=config.admin_username,
            password_hash=User.hash_password(config.admin_password),
            is_active=True,
            is_superuser=True,
        )
        user_repo.create(user)
        created_user = True
    else:
        if user.username != config.admin_username:
            user.username = config.admin_username
            updated_user = True
        if not user.is_active:
            user.is_active = True
            updated_user = True
        if not user.is_superuser:
            user.is_superuser = True
            updated_user = True
        if config.force_reset:
            user.password_hash = User.hash_password(config.admin_password)
            password_rotated = True
            updated_user = True

    # Flush before user.id is used for the role assignments below.
    user_repo.session.flush()

    # Track granted role ids locally so a role repeated in config.admin_roles
    # is never assigned twice, whether or not assign_role refreshes
    # user.role_assignments.
    assigned_role_ids = {
        assignment.role_id for assignment in user.role_assignments
    }
    for role_name in config.admin_roles:
        role = role_repo.get_by_name(role_name)
        if role is None:
            logging.warning(
                "Role '%s' is not defined and will be skipped", role_name)
            continue
        if role.id in assigned_role_ids:
            continue

        user_repo.assign_role(
            user_id=user.id, role_id=role.id, granted_by=user.id)
        assigned_role_ids.add(role.id)
        roles_granted += 1

    return AdminSeedResult(
        created_user=created_user,
        updated_user=updated_user,
        password_rotated=password_rotated,
        roles_granted=roles_granted,
    )


def seed_initial_data(
    config: SeedConfig,
    *,
    unit_of_work_factory: Callable[[], UnitOfWork] | None = None,
) -> None:
    """Run the full seeding routine inside a single unit of work.

    Ensures the default roles, the admin user and the default pricing
    settings exist, then attaches those pricing settings to every project
    that has none. A summary of each step is logged.

    Args:
        config: Admin-user seeding configuration.
        unit_of_work_factory: Callable returning the unit of work to use;
            defaults to :class:`UnitOfWork`.
    """
    logging.info("Starting initial data seeding")
    factory = unit_of_work_factory or UnitOfWork

    with factory() as uow:
        # NOTE(review): this assert is skipped under ``python -O``; it is kept
        # so the raised exception type stays AssertionError for callers.
        assert (
            uow.roles is not None
            and uow.users is not None
            and uow.pricing_settings is not None
            and uow.projects is not None
        )

        role_result = ensure_default_roles(uow.roles)
        admin_result = ensure_admin_user(uow.users, uow.roles, config)

        # Prefer pricing metadata already stored; otherwise fall back to the
        # values derived from the environment settings.
        pricing_metadata = uow.get_pricing_metadata()
        metadata_source = "database"
        if pricing_metadata is None:
            pricing_metadata = Settings.from_environment().pricing_metadata()
            metadata_source = "environment"

        pricing_result: PricingSettingsSeedResult = ensure_default_pricing_settings(
            uow.pricing_settings,
            metadata=pricing_metadata,
        )

        projects_without_pricing = [
            project
            for project in uow.projects.list(with_pricing=True)
            if project.pricing_settings is None
        ]
        for project in projects_without_pricing:
            uow.set_project_pricing_settings(project, pricing_result.settings)
        assigned_projects = len(projects_without_pricing)

        # Summaries are logged while the unit of work is still open because
        # they read attributes of objects it manages.
        logging.info(
            "Roles processed: %s total, %s created, %s updated",
            role_result.total,
            role_result.created,
            role_result.updated,
        )
        logging.info(
            "Admin user: created=%s updated=%s password_rotated=%s roles_granted=%s",
            admin_result.created_user,
            admin_result.updated_user,
            admin_result.password_rotated,
            admin_result.roles_granted,
        )
        logging.info(
            "Pricing settings ensured (source=%s): slug=%s created=%s updated_fields=%s impurity_upserts=%s",
            metadata_source,
            pricing_result.settings.slug,
            pricing_result.created,
            pricing_result.updated_fields,
            pricing_result.impurity_upserts,
        )
        logging.info(
            "Projects updated with default pricing settings: %s",
            assigned_projects,
        )

    # Reported only after the unit-of-work context exited without raising.
    logging.info("Initial data seeding completed successfully")