feat: Add database initialization, reset, and verification scripts
This commit is contained in:
673
scripts/init_db.py
Normal file
673
scripts/init_db.py
Normal file
@@ -0,0 +1,673 @@
|
||||
"""Idempotent DB initialization and seeding using Pydantic validation and raw SQL.
|
||||
|
||||
Usage:
|
||||
from scripts.init_db import init_db
|
||||
init_db()
|
||||
|
||||
This module creates PostgreSQL ENUM types if missing, creates minimal tables
|
||||
required for bootstrapping (roles, users, user_roles, pricing_settings and
|
||||
ancillary pricing tables), and seeds initial rows using INSERT ... ON CONFLICT
|
||||
DO NOTHING so it's safe to run multiple times.
|
||||
|
||||
Notes:
|
||||
- This module avoids importing application models at import time to prevent
|
||||
side-effects. Database connections are created inside functions.
|
||||
- It intentionally performs non-destructive operations only (CREATE IF NOT
|
||||
EXISTS, INSERT ... ON CONFLICT).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Optional
|
||||
import os
|
||||
import logging
|
||||
from decimal import Decimal
|
||||
|
||||
from pydantic import BaseModel, Field, validator
|
||||
from sqlalchemy import create_engine, text
|
||||
from sqlalchemy.engine import Engine
|
||||
from passlib.context import CryptContext
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
password_context = CryptContext(schemes=["argon2"], deprecated="auto")
|
||||
|
||||
# ENUM definitions matching previous schema
|
||||
ENUM_DEFINITIONS = {
|
||||
"miningoperationtype": [
|
||||
"open_pit",
|
||||
"underground",
|
||||
"in_situ_leach",
|
||||
"placer",
|
||||
"quarry",
|
||||
"mountaintop_removal",
|
||||
"other",
|
||||
],
|
||||
"scenariostatus": ["draft", "active", "archived"],
|
||||
"financialcategory": ["capex", "opex", "revenue", "contingency", "other"],
|
||||
"costbucket": [
|
||||
"capital_initial",
|
||||
"capital_sustaining",
|
||||
"operating_fixed",
|
||||
"operating_variable",
|
||||
"maintenance",
|
||||
"reclamation",
|
||||
"royalties",
|
||||
"general_admin",
|
||||
],
|
||||
"distributiontype": ["normal", "triangular", "uniform", "lognormal", "custom"],
|
||||
"stochasticvariable": [
|
||||
"ore_grade",
|
||||
"recovery_rate",
|
||||
"metal_price",
|
||||
"operating_cost",
|
||||
"capital_cost",
|
||||
"discount_rate",
|
||||
"throughput",
|
||||
],
|
||||
"resourcetype": [
|
||||
"diesel",
|
||||
"electricity",
|
||||
"water",
|
||||
"explosives",
|
||||
"reagents",
|
||||
"labor",
|
||||
"equipment_hours",
|
||||
"tailings_capacity",
|
||||
],
|
||||
}
|
||||
|
||||
# Minimal DDL for tables we seed / that bootstrap relies on
|
||||
TABLE_DDLS = [
|
||||
# roles
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS roles (
|
||||
id INTEGER PRIMARY KEY,
|
||||
name VARCHAR(64) NOT NULL,
|
||||
display_name VARCHAR(128) NOT NULL,
|
||||
description TEXT,
|
||||
created_at TIMESTAMPTZ DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ DEFAULT now(),
|
||||
CONSTRAINT uq_roles_name UNIQUE (name)
|
||||
);
|
||||
""",
|
||||
# users
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS users (
|
||||
id SERIAL PRIMARY KEY,
|
||||
email VARCHAR(255) NOT NULL,
|
||||
username VARCHAR(128) NOT NULL,
|
||||
password_hash VARCHAR(255) NOT NULL,
|
||||
is_active BOOLEAN NOT NULL DEFAULT true,
|
||||
is_superuser BOOLEAN NOT NULL DEFAULT false,
|
||||
last_login_at TIMESTAMPTZ,
|
||||
created_at TIMESTAMPTZ DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ DEFAULT now(),
|
||||
CONSTRAINT uq_users_email UNIQUE (email),
|
||||
CONSTRAINT uq_users_username UNIQUE (username)
|
||||
);
|
||||
""",
|
||||
# user_roles
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS user_roles (
|
||||
user_id INTEGER NOT NULL,
|
||||
role_id INTEGER NOT NULL,
|
||||
granted_at TIMESTAMPTZ DEFAULT now(),
|
||||
granted_by INTEGER,
|
||||
PRIMARY KEY (user_id, role_id),
|
||||
CONSTRAINT uq_user_roles_user_role UNIQUE (user_id, role_id)
|
||||
);
|
||||
""",
|
||||
# pricing_settings
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS pricing_settings (
|
||||
id SERIAL PRIMARY KEY,
|
||||
name VARCHAR(128) NOT NULL,
|
||||
slug VARCHAR(64) NOT NULL,
|
||||
description TEXT,
|
||||
default_currency VARCHAR(3),
|
||||
default_payable_pct NUMERIC(5,2) DEFAULT 100.00 NOT NULL,
|
||||
moisture_threshold_pct NUMERIC(5,2) DEFAULT 8.00 NOT NULL,
|
||||
moisture_penalty_per_pct NUMERIC(14,4) DEFAULT 0.0000 NOT NULL,
|
||||
metadata JSONB,
|
||||
created_at TIMESTAMPTZ DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ DEFAULT now(),
|
||||
CONSTRAINT uq_pricing_settings_slug UNIQUE (slug),
|
||||
CONSTRAINT uq_pricing_settings_name UNIQUE (name)
|
||||
);
|
||||
""",
|
||||
# pricing_metal_settings
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS pricing_metal_settings (
|
||||
id SERIAL PRIMARY KEY,
|
||||
pricing_settings_id INTEGER NOT NULL REFERENCES pricing_settings(id) ON DELETE CASCADE,
|
||||
metal_code VARCHAR(32) NOT NULL,
|
||||
payable_pct NUMERIC(5,2),
|
||||
moisture_threshold_pct NUMERIC(5,2),
|
||||
moisture_penalty_per_pct NUMERIC(14,4),
|
||||
data JSONB,
|
||||
created_at TIMESTAMPTZ DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ DEFAULT now(),
|
||||
CONSTRAINT uq_pricing_metal_settings_code UNIQUE (pricing_settings_id, metal_code)
|
||||
);
|
||||
""",
|
||||
# pricing_impurity_settings
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS pricing_impurity_settings (
|
||||
id SERIAL PRIMARY KEY,
|
||||
pricing_settings_id INTEGER NOT NULL REFERENCES pricing_settings(id) ON DELETE CASCADE,
|
||||
impurity_code VARCHAR(32) NOT NULL,
|
||||
threshold_ppm NUMERIC(14,4) DEFAULT 0.0000 NOT NULL,
|
||||
penalty_per_ppm NUMERIC(14,4) DEFAULT 0.0000 NOT NULL,
|
||||
notes TEXT,
|
||||
created_at TIMESTAMPTZ DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ DEFAULT now(),
|
||||
CONSTRAINT uq_pricing_impurity_settings_code UNIQUE (pricing_settings_id, impurity_code)
|
||||
);
|
||||
""",
|
||||
# core domain tables: projects, scenarios, financial_inputs, simulation_parameters
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS projects (
|
||||
id SERIAL PRIMARY KEY,
|
||||
name VARCHAR(255) NOT NULL,
|
||||
location VARCHAR(255),
|
||||
operation_type miningoperationtype NOT NULL,
|
||||
description TEXT,
|
||||
pricing_settings_id INTEGER REFERENCES pricing_settings(id) ON DELETE SET NULL,
|
||||
created_at TIMESTAMPTZ DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ DEFAULT now(),
|
||||
CONSTRAINT uq_projects_name UNIQUE (name)
|
||||
);
|
||||
""",
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS scenarios (
|
||||
id SERIAL PRIMARY KEY,
|
||||
project_id INTEGER NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
|
||||
name VARCHAR(255) NOT NULL,
|
||||
description TEXT,
|
||||
status scenariostatus NOT NULL,
|
||||
start_date DATE,
|
||||
end_date DATE,
|
||||
discount_rate NUMERIC(5,2),
|
||||
currency VARCHAR(3),
|
||||
primary_resource resourcetype,
|
||||
created_at TIMESTAMPTZ DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ DEFAULT now()
|
||||
);
|
||||
""",
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS financial_inputs (
|
||||
id SERIAL PRIMARY KEY,
|
||||
scenario_id INTEGER NOT NULL REFERENCES scenarios(id) ON DELETE CASCADE,
|
||||
name VARCHAR(255) NOT NULL,
|
||||
category financialcategory NOT NULL,
|
||||
cost_bucket costbucket,
|
||||
amount NUMERIC(18,2) NOT NULL,
|
||||
currency VARCHAR(3),
|
||||
effective_date DATE,
|
||||
notes TEXT,
|
||||
created_at TIMESTAMPTZ DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ DEFAULT now()
|
||||
);
|
||||
""",
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS simulation_parameters (
|
||||
id SERIAL PRIMARY KEY,
|
||||
scenario_id INTEGER NOT NULL REFERENCES scenarios(id) ON DELETE CASCADE,
|
||||
name VARCHAR(255) NOT NULL,
|
||||
distribution distributiontype NOT NULL,
|
||||
variable stochasticvariable,
|
||||
resource_type resourcetype,
|
||||
mean_value NUMERIC(18,4),
|
||||
standard_deviation NUMERIC(18,4),
|
||||
minimum_value NUMERIC(18,4),
|
||||
maximum_value NUMERIC(18,4),
|
||||
unit VARCHAR(32),
|
||||
configuration JSONB,
|
||||
created_at TIMESTAMPTZ DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ DEFAULT now()
|
||||
);
|
||||
""",
|
||||
]
|
||||
|
||||
# Seeds
|
||||
DEFAULT_ROLES = [
|
||||
{"id": 1, "name": "admin", "display_name": "Administrator",
|
||||
"description": "Full platform access with user management rights."},
|
||||
{"id": 2, "name": "project_manager", "display_name": "Project Manager",
|
||||
"description": "Manage projects, scenarios, and associated data."},
|
||||
{"id": 3, "name": "analyst", "display_name": "Analyst",
|
||||
"description": "Review dashboards and scenario outputs."},
|
||||
{"id": 4, "name": "viewer", "display_name": "Viewer",
|
||||
"description": "Read-only access to assigned projects and reports."},
|
||||
]
|
||||
|
||||
DEFAULT_ADMIN = {"id": 1, "email": "admin@calminer.local", "username": "admin",
|
||||
"password": "ChangeMe123!", "is_active": True, "is_superuser": True}
|
||||
DEFAULT_PRICING = {
|
||||
"slug": "default",
|
||||
"name": "Default Pricing",
|
||||
"description": "Automatically generated default pricing settings.",
|
||||
"default_currency": "USD",
|
||||
"default_payable_pct": 100.0,
|
||||
"moisture_threshold_pct": 8.0,
|
||||
"moisture_penalty_per_pct": 0.0,
|
||||
}
|
||||
|
||||
|
||||
class ProjectSeed(BaseModel):
|
||||
name: str
|
||||
location: str | None = None
|
||||
operation_type: str
|
||||
description: str | None = None
|
||||
|
||||
|
||||
class ScenarioSeed(BaseModel):
|
||||
project_name: str
|
||||
name: str
|
||||
description: str | None = None
|
||||
status: str = "active"
|
||||
discount_rate: float | None = Field(default=None)
|
||||
currency: str | None = Field(default="USD")
|
||||
primary_resource: str | None = Field(default=None)
|
||||
|
||||
|
||||
class FinancialInputSeed(BaseModel):
|
||||
scenario_name: str
|
||||
project_name: str
|
||||
name: str
|
||||
category: str
|
||||
cost_bucket: str | None = None
|
||||
amount: Decimal
|
||||
currency: str = "USD"
|
||||
notes: str | None = None
|
||||
|
||||
|
||||
class RoleSeed(BaseModel):
|
||||
id: int
|
||||
name: str
|
||||
display_name: str
|
||||
description: Optional[str]
|
||||
|
||||
|
||||
class UserSeed(BaseModel):
|
||||
id: int
|
||||
email: str
|
||||
username: str
|
||||
password: str
|
||||
is_active: bool = True
|
||||
is_superuser: bool = False
|
||||
|
||||
@validator("password")
|
||||
def password_min_len(cls, v: str) -> str:
|
||||
if not v or len(v) < 8:
|
||||
raise ValueError("password must be at least 8 characters")
|
||||
return v
|
||||
|
||||
|
||||
class PricingSeed(BaseModel):
|
||||
slug: str
|
||||
name: str
|
||||
description: Optional[str]
|
||||
default_currency: Optional[str]
|
||||
default_payable_pct: float
|
||||
moisture_threshold_pct: float
|
||||
moisture_penalty_per_pct: float
|
||||
|
||||
|
||||
DEFAULT_PROJECTS: list[ProjectSeed] = [
|
||||
ProjectSeed(
|
||||
name="Helios Copper",
|
||||
location="Chile",
|
||||
operation_type="open_pit",
|
||||
description="Flagship open pit copper operation used for demos",
|
||||
),
|
||||
ProjectSeed(
|
||||
name="Luna Nickel",
|
||||
location="Australia",
|
||||
operation_type="underground",
|
||||
description="Underground nickel sulphide project with stochastic modelling",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
DEFAULT_SCENARIOS: list[ScenarioSeed] = [
|
||||
ScenarioSeed(
|
||||
project_name="Helios Copper",
|
||||
name="Base Case",
|
||||
description="Deterministic base case for Helios",
|
||||
status="active",
|
||||
discount_rate=8.0,
|
||||
primary_resource="diesel",
|
||||
),
|
||||
ScenarioSeed(
|
||||
project_name="Helios Copper",
|
||||
name="Expansion Case",
|
||||
description="Expansion scenario with increased throughput",
|
||||
status="draft",
|
||||
discount_rate=9.0,
|
||||
primary_resource="electricity",
|
||||
),
|
||||
ScenarioSeed(
|
||||
project_name="Luna Nickel",
|
||||
name="Feasibility",
|
||||
description="Feasibility scenario targeting steady state",
|
||||
status="active",
|
||||
discount_rate=10.0,
|
||||
primary_resource="electricity",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
DEFAULT_FINANCIAL_INPUTS: list[FinancialInputSeed] = [
|
||||
FinancialInputSeed(
|
||||
project_name="Helios Copper",
|
||||
scenario_name="Base Case",
|
||||
name="Initial Capital",
|
||||
category="capex",
|
||||
cost_bucket="capital_initial",
|
||||
amount=Decimal("450000000"),
|
||||
notes="Initial mine development costs",
|
||||
),
|
||||
FinancialInputSeed(
|
||||
project_name="Helios Copper",
|
||||
scenario_name="Base Case",
|
||||
name="Processing Opex",
|
||||
category="opex",
|
||||
cost_bucket="operating_variable",
|
||||
amount=Decimal("75000000"),
|
||||
notes="Annual processing operating expenditure",
|
||||
),
|
||||
FinancialInputSeed(
|
||||
project_name="Helios Copper",
|
||||
scenario_name="Expansion Case",
|
||||
name="Expansion Capital",
|
||||
category="capex",
|
||||
cost_bucket="capital_sustaining",
|
||||
amount=Decimal("120000000"),
|
||||
),
|
||||
FinancialInputSeed(
|
||||
project_name="Luna Nickel",
|
||||
scenario_name="Feasibility",
|
||||
name="Nickel Revenue",
|
||||
category="revenue",
|
||||
cost_bucket=None,
|
||||
amount=Decimal("315000000"),
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def _get_database_url() -> str:
|
||||
# Prefer the same DATABASE_URL used by the application
|
||||
from config.database import DATABASE_URL
|
||||
|
||||
return DATABASE_URL
|
||||
|
||||
|
||||
def _create_engine(database_url: Optional[str] = None) -> Engine:
|
||||
database_url = database_url or _get_database_url()
|
||||
engine = create_engine(database_url, future=True)
|
||||
return engine
|
||||
|
||||
|
||||
def _create_enum_if_missing_sql(type_name: str, values: List[str]) -> str:
|
||||
# Use a DO block to safely create the enum only if it is missing
|
||||
vals = ", ".join(f"'{v}'" for v in values)
|
||||
sql = (
|
||||
"DO $$ BEGIN "
|
||||
f"IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = '{type_name}') THEN "
|
||||
f"CREATE TYPE {type_name} AS ENUM ({vals}); "
|
||||
"END IF; END $$;"
|
||||
)
|
||||
return sql
|
||||
|
||||
|
||||
def ensure_enums(engine: Engine) -> None:
|
||||
with engine.begin() as conn:
|
||||
for name, vals in ENUM_DEFINITIONS.items():
|
||||
sql = _create_enum_if_missing_sql(name, vals)
|
||||
logger.debug("Ensuring enum %s: %s", name, sql)
|
||||
conn.execute(text(sql))
|
||||
|
||||
|
||||
def ensure_tables(engine: Engine) -> None:
|
||||
with engine.begin() as conn:
|
||||
for ddl in TABLE_DDLS:
|
||||
logger.debug("Executing DDL:\n%s", ddl)
|
||||
conn.execute(text(ddl))
|
||||
|
||||
|
||||
def seed_roles(engine: Engine) -> None:
|
||||
with engine.begin() as conn:
|
||||
for r in DEFAULT_ROLES:
|
||||
seed = RoleSeed(**r)
|
||||
conn.execute(
|
||||
text(
|
||||
"INSERT INTO roles (id, name, display_name, description) VALUES (:id, :name, :display_name, :description) "
|
||||
"ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, display_name = EXCLUDED.display_name, description = EXCLUDED.description"
|
||||
),
|
||||
dict(id=seed.id, name=seed.name,
|
||||
display_name=seed.display_name, description=seed.description),
|
||||
)
|
||||
|
||||
|
||||
def seed_admin_user(engine: Engine) -> None:
|
||||
with engine.begin() as conn:
|
||||
# Use environment-configured admin settings when present so initializer
|
||||
# aligns with the application's bootstrap configuration.
|
||||
admin_email = os.getenv(
|
||||
"CALMINER_SEED_ADMIN_EMAIL", DEFAULT_ADMIN["email"])
|
||||
admin_username = os.getenv(
|
||||
"CALMINER_SEED_ADMIN_USERNAME", DEFAULT_ADMIN["username"])
|
||||
admin_password = os.getenv(
|
||||
"CALMINER_SEED_ADMIN_PASSWORD", DEFAULT_ADMIN["password"])
|
||||
u = UserSeed(
|
||||
id=DEFAULT_ADMIN.get("id", 1),
|
||||
email=admin_email,
|
||||
username=admin_username,
|
||||
password=admin_password,
|
||||
is_active=DEFAULT_ADMIN.get("is_active", True),
|
||||
is_superuser=DEFAULT_ADMIN.get("is_superuser", True),
|
||||
)
|
||||
password_hash = password_context.hash(u.password)
|
||||
# Upsert by username to avoid conflicting with different admin email configs
|
||||
conn.execute(
|
||||
text(
|
||||
"INSERT INTO users (email, username, password_hash, is_active, is_superuser) "
|
||||
"VALUES (:email, :username, :password_hash, :is_active, :is_superuser) "
|
||||
"ON CONFLICT (username) DO UPDATE SET email = EXCLUDED.email, password_hash = EXCLUDED.password_hash, is_active = EXCLUDED.is_active, is_superuser = EXCLUDED.is_superuser"
|
||||
),
|
||||
dict(email=u.email, username=u.username, password_hash=password_hash,
|
||||
is_active=u.is_active, is_superuser=u.is_superuser),
|
||||
)
|
||||
# ensure admin has admin role
|
||||
# Resolve user_id for role assignment: select by username
|
||||
row = conn.execute(text("SELECT id FROM users WHERE username = :username"), dict(
|
||||
username=u.username)).fetchone()
|
||||
if row is not None:
|
||||
user_id = row.id
|
||||
else:
|
||||
user_id = None
|
||||
if user_id is not None:
|
||||
conn.execute(
|
||||
text(
|
||||
"INSERT INTO user_roles (user_id, role_id, granted_by) VALUES (:user_id, :role_id, :granted_by) "
|
||||
"ON CONFLICT (user_id, role_id) DO NOTHING"
|
||||
),
|
||||
dict(user_id=user_id, role_id=1, granted_by=user_id),
|
||||
)
|
||||
|
||||
|
||||
def ensure_default_pricing(engine: Engine) -> None:
|
||||
with engine.begin() as conn:
|
||||
p = PricingSeed(**DEFAULT_PRICING)
|
||||
# Try insert on slug conflict
|
||||
conn.execute(
|
||||
text(
|
||||
"INSERT INTO pricing_settings (slug, name, description, default_currency, default_payable_pct, moisture_threshold_pct, moisture_penalty_per_pct) "
|
||||
"VALUES (:slug, :name, :description, :default_currency, :default_payable_pct, :moisture_threshold_pct, :moisture_penalty_per_pct) "
|
||||
"ON CONFLICT (slug) DO UPDATE SET name = EXCLUDED.name"
|
||||
),
|
||||
dict(
|
||||
slug=p.slug,
|
||||
name=p.name,
|
||||
description=p.description,
|
||||
default_currency=p.default_currency,
|
||||
default_payable_pct=p.default_payable_pct,
|
||||
moisture_threshold_pct=p.moisture_threshold_pct,
|
||||
moisture_penalty_per_pct=p.moisture_penalty_per_pct,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def _project_id_by_name(conn, project_name: str) -> Optional[int]:
|
||||
row = conn.execute(
|
||||
text("SELECT id FROM projects WHERE name = :name"),
|
||||
{"name": project_name},
|
||||
).fetchone()
|
||||
return row.id if row else None
|
||||
|
||||
|
||||
def ensure_default_projects(engine: Engine) -> None:
|
||||
with engine.begin() as conn:
|
||||
for project in DEFAULT_PROJECTS:
|
||||
conn.execute(
|
||||
text(
|
||||
"""
|
||||
INSERT INTO projects (name, location, operation_type, description)
|
||||
VALUES (:name, :location, :operation_type, :description)
|
||||
ON CONFLICT (name) DO UPDATE SET
|
||||
location = EXCLUDED.location,
|
||||
operation_type = EXCLUDED.operation_type,
|
||||
description = EXCLUDED.description
|
||||
"""
|
||||
),
|
||||
project.model_dump(),
|
||||
)
|
||||
|
||||
|
||||
def ensure_default_scenarios(engine: Engine) -> None:
|
||||
with engine.begin() as conn:
|
||||
for scenario in DEFAULT_SCENARIOS:
|
||||
project_id = _project_id_by_name(conn, scenario.project_name)
|
||||
if project_id is None:
|
||||
logger.warning(
|
||||
"Skipping scenario seed '%s' because project '%s' does not exist",
|
||||
scenario.name,
|
||||
scenario.project_name,
|
||||
)
|
||||
continue
|
||||
|
||||
payload = scenario.model_dump(exclude={"project_name"})
|
||||
payload.update({"project_id": project_id})
|
||||
conn.execute(
|
||||
text(
|
||||
"""
|
||||
INSERT INTO scenarios (
|
||||
project_id, name, description, status, discount_rate,
|
||||
currency, primary_resource
|
||||
)
|
||||
VALUES (
|
||||
:project_id, :name, :description, CAST(:status AS scenariostatus),
|
||||
:discount_rate, :currency,
|
||||
CASE WHEN :primary_resource IS NULL
|
||||
THEN NULL
|
||||
ELSE CAST(:primary_resource AS resourcetype)
|
||||
END
|
||||
)
|
||||
ON CONFLICT (project_id, name) DO UPDATE SET
|
||||
description = EXCLUDED.description,
|
||||
status = EXCLUDED.status,
|
||||
discount_rate = EXCLUDED.discount_rate,
|
||||
currency = EXCLUDED.currency,
|
||||
primary_resource = EXCLUDED.primary_resource
|
||||
"""
|
||||
),
|
||||
payload,
|
||||
)
|
||||
|
||||
|
||||
def ensure_default_financial_inputs(engine: Engine) -> None:
|
||||
with engine.begin() as conn:
|
||||
for item in DEFAULT_FINANCIAL_INPUTS:
|
||||
project_id = _project_id_by_name(conn, item.project_name)
|
||||
if project_id is None:
|
||||
logger.warning(
|
||||
"Skipping financial input '%s'; project '%s' missing",
|
||||
item.name,
|
||||
item.project_name,
|
||||
)
|
||||
continue
|
||||
|
||||
scenario_row = conn.execute(
|
||||
text(
|
||||
"SELECT id FROM scenarios WHERE project_id = :project_id AND name = :name"
|
||||
),
|
||||
{"project_id": project_id, "name": item.scenario_name},
|
||||
).fetchone()
|
||||
if scenario_row is None:
|
||||
logger.warning(
|
||||
"Skipping financial input '%s'; scenario '%s' missing for project '%s'",
|
||||
item.name,
|
||||
item.scenario_name,
|
||||
item.project_name,
|
||||
)
|
||||
continue
|
||||
|
||||
payload = item.model_dump(
|
||||
exclude={"project_name", "scenario_name"},
|
||||
)
|
||||
payload.update({"scenario_id": scenario_row.id})
|
||||
conn.execute(
|
||||
text(
|
||||
"""
|
||||
INSERT INTO financial_inputs (
|
||||
scenario_id, name, category, cost_bucket, amount, currency, notes
|
||||
)
|
||||
VALUES (
|
||||
:scenario_id, :name, CAST(:category AS financialcategory),
|
||||
CASE WHEN :cost_bucket IS NULL THEN NULL
|
||||
ELSE CAST(:cost_bucket AS costbucket)
|
||||
END,
|
||||
:amount,
|
||||
:currency,
|
||||
:notes
|
||||
)
|
||||
ON CONFLICT (scenario_id, name) DO UPDATE SET
|
||||
category = EXCLUDED.category,
|
||||
cost_bucket = EXCLUDED.cost_bucket,
|
||||
amount = EXCLUDED.amount,
|
||||
currency = EXCLUDED.currency,
|
||||
notes = EXCLUDED.notes
|
||||
"""
|
||||
),
|
||||
payload,
|
||||
)
|
||||
|
||||
|
||||
def init_db(database_url: Optional[str] = None) -> None:
|
||||
"""Run the idempotent initialization sequence.
|
||||
|
||||
Steps:
|
||||
- Ensure enum types exist.
|
||||
- Ensure required tables exist.
|
||||
- Seed roles and admin user.
|
||||
- Ensure default pricing settings record exists.
|
||||
- Seed sample projects, scenarios, and financial inputs.
|
||||
"""
|
||||
engine = _create_engine(database_url)
|
||||
logger.info("Starting DB initialization using engine=%s", engine)
|
||||
ensure_enums(engine)
|
||||
ensure_tables(engine)
|
||||
seed_roles(engine)
|
||||
seed_admin_user(engine)
|
||||
ensure_default_pricing(engine)
|
||||
ensure_default_projects(engine)
|
||||
ensure_default_scenarios(engine)
|
||||
ensure_default_financial_inputs(engine)
|
||||
logger.info("DB initialization complete")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Allow running manually: python -m scripts.init_db
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
init_db()
|
||||
91
scripts/reset_db.py
Normal file
91
scripts/reset_db.py
Normal file
@@ -0,0 +1,91 @@
|
||||
"""Utility to reset development Postgres schema artifacts.
|
||||
|
||||
This script drops managed tables and enum types created by `scripts.init_db`.
|
||||
It is intended for local development only; it refuses to run if CALMINER_ENV
|
||||
indicates production or staging. The operation is idempotent: missing objects
|
||||
are ignored. Use with caution.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from typing import Iterable
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.engine import Engine
|
||||
|
||||
from config.database import DATABASE_URL
|
||||
from scripts.init_db import ENUM_DEFINITIONS, _create_engine
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class ResetOptions:
|
||||
drop_tables: bool = True
|
||||
drop_enums: bool = True
|
||||
|
||||
|
||||
MANAGED_TABLES: tuple[str, ...] = (
|
||||
"simulation_parameters",
|
||||
"financial_inputs",
|
||||
"scenarios",
|
||||
"projects",
|
||||
"pricing_impurity_settings",
|
||||
"pricing_metal_settings",
|
||||
"pricing_settings",
|
||||
"user_roles",
|
||||
"users",
|
||||
"roles",
|
||||
)
|
||||
|
||||
|
||||
FORBIDDEN_ENVIRONMENTS: set[str] = {"production", "staging", "prod", "stage"}
|
||||
|
||||
|
||||
def _ensure_safe_environment() -> None:
|
||||
env = os.getenv("CALMINER_ENV", "development").lower()
|
||||
if env in FORBIDDEN_ENVIRONMENTS:
|
||||
raise RuntimeError(
|
||||
f"Refusing to reset database in environment '{env}'. "
|
||||
"Set CALMINER_ENV to 'development' to proceed."
|
||||
)
|
||||
|
||||
|
||||
def _drop_tables(engine: Engine, tables: Iterable[str]) -> None:
|
||||
if not tables:
|
||||
return
|
||||
with engine.begin() as conn:
|
||||
for table in tables:
|
||||
logger.info("Dropping table if exists: %s", table)
|
||||
conn.execute(text(f"DROP TABLE IF EXISTS {table} CASCADE"))
|
||||
|
||||
|
||||
def _drop_enums(engine: Engine, enum_names: Iterable[str]) -> None:
|
||||
if not enum_names:
|
||||
return
|
||||
with engine.begin() as conn:
|
||||
for enum_name in enum_names:
|
||||
logger.info("Dropping enum type if exists: %s", enum_name)
|
||||
conn.execute(text(f"DROP TYPE IF EXISTS {enum_name} CASCADE"))
|
||||
|
||||
|
||||
def reset_database(*, options: ResetOptions | None = None, database_url: str | None = None) -> None:
|
||||
"""Drop managed tables and enums for a clean slate."""
|
||||
_ensure_safe_environment()
|
||||
opts = options or ResetOptions()
|
||||
engine = _create_engine(database_url or DATABASE_URL)
|
||||
|
||||
if opts.drop_tables:
|
||||
_drop_tables(engine, MANAGED_TABLES)
|
||||
|
||||
if opts.drop_enums:
|
||||
_drop_enums(engine, ENUM_DEFINITIONS.keys())
|
||||
|
||||
logger.info("Database reset complete")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
reset_database()
|
||||
86
scripts/verify_db.py
Normal file
86
scripts/verify_db.py
Normal file
@@ -0,0 +1,86 @@
|
||||
"""Verify DB initialization results: enums, roles, admin user, pricing_settings."""
|
||||
from __future__ import annotations
|
||||
import logging
|
||||
from sqlalchemy import create_engine, text
|
||||
from config.database import DATABASE_URL
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
ENUMS = [
|
||||
'miningoperationtype',
|
||||
'scenariostatus',
|
||||
'financialcategory',
|
||||
'costbucket',
|
||||
'distributiontype',
|
||||
'stochasticvariable',
|
||||
'resourcetype',
|
||||
]
|
||||
|
||||
SQL_CHECK_ENUM = "SELECT typname FROM pg_type WHERE typname = ANY(:names)"
|
||||
SQL_ROLES = "SELECT id, name, display_name FROM roles ORDER BY id"
|
||||
SQL_ADMIN = "SELECT id, email, username, is_active, is_superuser FROM users WHERE id = 1"
|
||||
SQL_USER_ROLES = "SELECT user_id, role_id, granted_by FROM user_roles WHERE user_id = 1"
|
||||
SQL_PRICING = "SELECT id, slug, name, default_currency FROM pricing_settings WHERE slug = 'default'"
|
||||
|
||||
|
||||
def run():
|
||||
engine = create_engine(DATABASE_URL, future=True)
|
||||
with engine.connect() as conn:
|
||||
print('Using DATABASE_URL:', DATABASE_URL)
|
||||
# enums
|
||||
res = conn.execute(text(SQL_CHECK_ENUM), dict(names=ENUMS)).fetchall()
|
||||
found = [r[0] for r in res]
|
||||
print('\nEnums found:')
|
||||
for name in ENUMS:
|
||||
print(f' {name}:', 'YES' if name in found else 'NO')
|
||||
|
||||
# roles
|
||||
try:
|
||||
roles = conn.execute(text(SQL_ROLES)).fetchall()
|
||||
print('\nRoles:')
|
||||
if roles:
|
||||
for r in roles:
|
||||
print(f' id={r.id} name={r.name} display_name={r.display_name}')
|
||||
else:
|
||||
print(' (no roles found)')
|
||||
except Exception as e:
|
||||
print('\nRoles query failed:', e)
|
||||
|
||||
# admin user
|
||||
try:
|
||||
admin = conn.execute(text(SQL_ADMIN)).fetchone()
|
||||
print('\nAdmin user:')
|
||||
if admin:
|
||||
print(f' id={admin.id} email={admin.email} username={admin.username} is_active={admin.is_active} is_superuser={admin.is_superuser}')
|
||||
else:
|
||||
print(' (admin user not found)')
|
||||
except Exception as e:
|
||||
print('\nAdmin query failed:', e)
|
||||
|
||||
# user_roles
|
||||
try:
|
||||
ur = conn.execute(text(SQL_USER_ROLES)).fetchall()
|
||||
print('\nUser roles for user_id=1:')
|
||||
if ur:
|
||||
for row in ur:
|
||||
print(f' user_id={row.user_id} role_id={row.role_id} granted_by={row.granted_by}')
|
||||
else:
|
||||
print(' (no user_roles rows for user_id=1)')
|
||||
except Exception as e:
|
||||
print('\nUser_roles query failed:', e)
|
||||
|
||||
# pricing settings
|
||||
try:
|
||||
p = conn.execute(text(SQL_PRICING)).fetchone()
|
||||
print('\nPricing settings (slug=default):')
|
||||
if p:
|
||||
print(f' id={p.id} slug={p.slug} name={p.name} default_currency={p.default_currency}')
|
||||
else:
|
||||
print(' (default pricing settings not found)')
|
||||
except Exception as e:
|
||||
print('\nPricing query failed:', e)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
run()
|
||||
Reference in New Issue
Block a user