- Updated form template to prefill currency input with default value and added help text for clarity. - Modified integration tests to assert more descriptive error messages for invalid currency codes. - Introduced new tests for currency normalization and validation in various scenarios, including imports and exports. - Added comprehensive tests for pricing calculations, ensuring defaults are respected and overrides function correctly. - Implemented unit tests for pricing settings repository, ensuring CRUD operations and default settings are handled properly. - Enhanced scenario pricing evaluation tests to validate currency handling and metadata defaults. - Added simulation tests to ensure Monte Carlo runs are accurate and handle various distribution scenarios.
232 lines
7.2 KiB
Python
232 lines
7.2 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
from dataclasses import dataclass
|
|
from typing import Callable, Iterable
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
from config.settings import Settings
|
|
from models import Role, User
|
|
from services.repositories import (
|
|
DEFAULT_ROLE_DEFINITIONS,
|
|
PricingSettingsSeedResult,
|
|
RoleRepository,
|
|
UserRepository,
|
|
ensure_default_pricing_settings,
|
|
)
|
|
from services.unit_of_work import UnitOfWork
|
|
|
|
|
|
@dataclass
|
|
class SeedConfig:
|
|
admin_email: str
|
|
admin_username: str
|
|
admin_password: str
|
|
admin_roles: tuple[str, ...]
|
|
force_reset: bool
|
|
|
|
|
|
@dataclass
|
|
class RoleSeedResult:
|
|
created: int
|
|
updated: int
|
|
total: int
|
|
|
|
|
|
@dataclass
|
|
class AdminSeedResult:
|
|
created_user: bool
|
|
updated_user: bool
|
|
password_rotated: bool
|
|
roles_granted: int
|
|
|
|
|
|
def parse_bool(value: str | None) -> bool:
|
|
if value is None:
|
|
return False
|
|
return value.strip().lower() in {"1", "true", "yes", "on"}
|
|
|
|
|
|
def normalise_role_list(raw_value: str | None) -> tuple[str, ...]:
|
|
if not raw_value:
|
|
return ("admin",)
|
|
parts = [segment.strip()
|
|
for segment in raw_value.split(",") if segment.strip()]
|
|
if "admin" not in parts:
|
|
parts.insert(0, "admin")
|
|
seen: set[str] = set()
|
|
ordered: list[str] = []
|
|
for role_name in parts:
|
|
if role_name not in seen:
|
|
ordered.append(role_name)
|
|
seen.add(role_name)
|
|
return tuple(ordered)
|
|
|
|
|
|
def load_config() -> SeedConfig:
|
|
load_dotenv()
|
|
admin_email = os.getenv("CALMINER_SEED_ADMIN_EMAIL",
|
|
"admin@calminer.local")
|
|
admin_username = os.getenv("CALMINER_SEED_ADMIN_USERNAME", "admin")
|
|
admin_password = os.getenv("CALMINER_SEED_ADMIN_PASSWORD", "ChangeMe123!")
|
|
admin_roles = normalise_role_list(os.getenv("CALMINER_SEED_ADMIN_ROLES"))
|
|
force_reset = parse_bool(os.getenv("CALMINER_SEED_FORCE"))
|
|
return SeedConfig(
|
|
admin_email=admin_email,
|
|
admin_username=admin_username,
|
|
admin_password=admin_password,
|
|
admin_roles=admin_roles,
|
|
force_reset=force_reset,
|
|
)
|
|
|
|
|
|
def ensure_default_roles(
|
|
role_repo: RoleRepository,
|
|
definitions: Iterable[dict[str, str]] = DEFAULT_ROLE_DEFINITIONS,
|
|
) -> RoleSeedResult:
|
|
created = 0
|
|
updated = 0
|
|
total = 0
|
|
for definition in definitions:
|
|
total += 1
|
|
existing = role_repo.get_by_name(definition["name"])
|
|
if existing is None:
|
|
role_repo.create(Role(**definition))
|
|
created += 1
|
|
continue
|
|
changed = False
|
|
if existing.display_name != definition["display_name"]:
|
|
existing.display_name = definition["display_name"]
|
|
changed = True
|
|
if existing.description != definition["description"]:
|
|
existing.description = definition["description"]
|
|
changed = True
|
|
if changed:
|
|
updated += 1
|
|
role_repo.session.flush()
|
|
return RoleSeedResult(created=created, updated=updated, total=total)
|
|
|
|
|
|
def ensure_admin_user(
|
|
user_repo: UserRepository,
|
|
role_repo: RoleRepository,
|
|
config: SeedConfig,
|
|
) -> AdminSeedResult:
|
|
created_user = False
|
|
updated_user = False
|
|
password_rotated = False
|
|
roles_granted = 0
|
|
|
|
user = user_repo.get_by_email(config.admin_email, with_roles=True)
|
|
if user is None:
|
|
user = User(
|
|
email=config.admin_email,
|
|
username=config.admin_username,
|
|
password_hash=User.hash_password(config.admin_password),
|
|
is_active=True,
|
|
is_superuser=True,
|
|
)
|
|
user_repo.create(user)
|
|
created_user = True
|
|
else:
|
|
if user.username != config.admin_username:
|
|
user.username = config.admin_username
|
|
updated_user = True
|
|
if not user.is_active:
|
|
user.is_active = True
|
|
updated_user = True
|
|
if not user.is_superuser:
|
|
user.is_superuser = True
|
|
updated_user = True
|
|
if config.force_reset:
|
|
user.password_hash = User.hash_password(config.admin_password)
|
|
password_rotated = True
|
|
updated_user = True
|
|
user_repo.session.flush()
|
|
|
|
for role_name in config.admin_roles:
|
|
role = role_repo.get_by_name(role_name)
|
|
if role is None:
|
|
logging.warning(
|
|
"Role '%s' is not defined and will be skipped", role_name)
|
|
continue
|
|
already_assigned = any(assignment.role_id ==
|
|
role.id for assignment in user.role_assignments)
|
|
if already_assigned:
|
|
continue
|
|
user_repo.assign_role(
|
|
user_id=user.id, role_id=role.id, granted_by=user.id)
|
|
roles_granted += 1
|
|
|
|
return AdminSeedResult(
|
|
created_user=created_user,
|
|
updated_user=updated_user,
|
|
password_rotated=password_rotated,
|
|
roles_granted=roles_granted,
|
|
)
|
|
|
|
|
|
def seed_initial_data(
|
|
config: SeedConfig,
|
|
*,
|
|
unit_of_work_factory: Callable[[], UnitOfWork] | None = None,
|
|
) -> None:
|
|
logging.info("Starting initial data seeding")
|
|
factory = unit_of_work_factory or UnitOfWork
|
|
with factory() as uow:
|
|
assert (
|
|
uow.roles is not None
|
|
and uow.users is not None
|
|
and uow.pricing_settings is not None
|
|
and uow.projects is not None
|
|
)
|
|
role_result = ensure_default_roles(uow.roles)
|
|
admin_result = ensure_admin_user(uow.users, uow.roles, config)
|
|
pricing_metadata = uow.get_pricing_metadata()
|
|
metadata_source = "database"
|
|
if pricing_metadata is None:
|
|
pricing_metadata = Settings.from_environment().pricing_metadata()
|
|
metadata_source = "environment"
|
|
pricing_result: PricingSettingsSeedResult = ensure_default_pricing_settings(
|
|
uow.pricing_settings,
|
|
metadata=pricing_metadata,
|
|
)
|
|
|
|
projects_without_pricing = [
|
|
project
|
|
for project in uow.projects.list(with_pricing=True)
|
|
if project.pricing_settings is None
|
|
]
|
|
assigned_projects = 0
|
|
for project in projects_without_pricing:
|
|
uow.set_project_pricing_settings(project, pricing_result.settings)
|
|
assigned_projects += 1
|
|
logging.info(
|
|
"Roles processed: %s total, %s created, %s updated",
|
|
role_result.total,
|
|
role_result.created,
|
|
role_result.updated,
|
|
)
|
|
logging.info(
|
|
"Admin user: created=%s updated=%s password_rotated=%s roles_granted=%s",
|
|
admin_result.created_user,
|
|
admin_result.updated_user,
|
|
admin_result.password_rotated,
|
|
admin_result.roles_granted,
|
|
)
|
|
logging.info(
|
|
"Pricing settings ensured (source=%s): slug=%s created=%s updated_fields=%s impurity_upserts=%s",
|
|
metadata_source,
|
|
pricing_result.settings.slug,
|
|
pricing_result.created,
|
|
pricing_result.updated_fields,
|
|
pricing_result.impurity_upserts,
|
|
)
|
|
logging.info(
|
|
"Projects updated with default pricing settings: %s",
|
|
assigned_projects,
|
|
)
|
|
logging.info("Initial data seeding completed successfully")
|