13 Commits

Author SHA1 Message Date
acf6f50bbd feat: Add NPV comparison and distribution charts to reporting
Some checks failed
CI / lint (push) Successful in 15s
CI / build (push) Has been skipped
CI / test (push) Failing after 17s
CI / deploy (push) Has been skipped
- Implemented NPV comparison chart generation using Plotly in ReportingService.
- Added distribution histogram for Monte Carlo results.
- Updated reporting templates to include new charts and improved layout.
- Created new settings and currencies management pages.
- Enhanced sidebar navigation with dynamic URL handling.
- Improved CSS styles for chart containers and overall layout.
- Added new simulation and theme settings pages with placeholders for future features.
2025-11-12 19:39:27 +01:00
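A minimal illustrative sketch of the charting approach this commit describes, assuming hypothetical helper names; the actual `ReportingService` methods (`_generate_npv_comparison_chart`, `_generate_distribution_histogram`) are not shown in this diff:

```python
# Hypothetical sketch only; not the actual ReportingService implementation.
import plotly.graph_objects as go


def npv_comparison_chart(scenario_names: list[str], npv_values: list[float]) -> str:
    """Bar chart comparing NPV across scenarios, returned as Plotly JSON."""
    fig = go.Figure(go.Bar(x=scenario_names, y=npv_values))
    fig.update_layout(title="NPV Comparison by Scenario", yaxis_title="NPV")
    return fig.to_json()


def distribution_histogram(npv_samples: list[float], bins: int = 50) -> str:
    """Histogram of Monte Carlo NPV results, returned as Plotly JSON."""
    fig = go.Figure(go.Histogram(x=npv_samples, nbinsx=bins))
    fig.update_layout(title="NPV Distribution", xaxis_title="NPV", yaxis_title="Frequency")
    return fig.to_json()
```

Per the commit notes, the resulting chart JSON is embedded in the reporting templates and rendered client-side inside the new chart containers.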
ad306bd0aa feat: Refactor database initialization for SQLite compatibility 2025-11-12 18:30:35 +01:00
ed4187970c feat: Implement SQLite support with environment-driven backend switching 2025-11-12 18:29:49 +01:00
0fbe9f543e fix: Update .gitignore to include additional SQLite database files 2025-11-12 18:21:39 +01:00
80825c2c5d chore: Update changelog with recent verification and documentation updates 2025-11-12 18:17:09 +01:00
44a3bfc1bf fix: Remove unnecessary 'uvicorn' command from docker-compose.override.yml 2025-11-12 18:17:04 +01:00
1f892ebdbb feat: Implement SQLAlchemy enum helper and normalize enum values in database initialization 2025-11-12 18:11:19 +01:00
bcdc9e861e feat: Enhance CSS with custom properties for theming and layout adjustments 2025-11-12 18:11:02 +01:00
23523f70f1 feat: Add comprehensive tests for database initialization and seeding 2025-11-12 16:38:20 +01:00
8ef6724960 feat: Add database initialization, reset, and verification scripts 2025-11-12 16:30:17 +01:00
6e466a3fd2 Refactor database initialization and remove Alembic migrations
- Removed legacy Alembic migration files and consolidated schema management into a new Pydantic-backed initializer (`scripts/init_db.py`).
- Updated `main.py` to ensure the new DB initializer runs on startup, maintaining idempotency.
- Adjusted session management in `config/database.py` to prevent DetachedInstanceError.
- Introduced new enums in `models/enums.py` for better organization and clarity.
- Refactored various models to utilize the new enums, improving code maintainability.
- Enhanced middleware to handle JSON validation more robustly, ensuring non-JSON requests do not trigger JSON errors.
- Added tests for middleware and enums to ensure expected behavior and consistency.
- Updated changelog to reflect significant changes and improvements.
2025-11-12 16:29:44 +01:00
9d4c807475 feat: Update logo images in footer and header templates 2025-11-12 16:00:11 +01:00
9cd555e134 feat: Add pre-commit configuration for code quality tools 2025-11-12 12:07:39 +01:00
54 changed files with 2737 additions and 1736 deletions

2
.gitignore vendored

@@ -47,8 +47,10 @@ htmlcov/
logs/
# SQLite database
data/
*.sqlite3
test*.db
local*.db
# Act runner files
.runner

13
.pre-commit-config.yaml Normal file

@@ -0,0 +1,13 @@
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.6.1
    hooks:
      - id: ruff
  - repo: https://github.com/psf/black-pre-commit-mirror
    rev: 24.8.0
    hooks:
      - id: black
  - repo: https://github.com/PyCQA/bandit
    rev: 1.7.9
    hooks:
      - id: bandit


@@ -102,13 +102,12 @@ RUN pip install --upgrade pip \
COPY . /app
RUN chown -R appuser:app /app \
&& chmod +x /app/scripts/docker-entrypoint.sh
RUN chown -R appuser:app /app
USER appuser
EXPOSE 8003
ENTRYPOINT ["/app/scripts/docker-entrypoint.sh"]
ENTRYPOINT ["uvicorn"]
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8003", "--workers", "4"]
CMD ["main:app", "--host", "0.0.0.0", "--port", "8003", "--workers", "4"]


@@ -8,4 +8,6 @@ The system is designed to help mining companies make informed decisions by simul
## Documentation & quickstart
This repository contains only code. See detailed developer and architecture documentation in the [Docs](https://git.allucanget.biz/allucanget/calminer-docs) repository.
- Detailed developer, architecture, and operations guides live in the companion [calminer-docs](../calminer-docs/) repository.
- For a local run, create a `.env` (see `.env.example`), install requirements, then execute `python -m scripts.init_db` followed by `uvicorn main:app --reload`. The initializer is safe to rerun and seeds demo data automatically.
- To wipe and recreate the schema in development, run `CALMINER_ENV=development python -m scripts.reset_db` before invoking the initializer again.


@@ -1,35 +0,0 @@
[alembic]
script_location = alembic
sqlalchemy.url = %(DATABASE_URL)s
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s


@@ -1,62 +0,0 @@
from __future__ import annotations
from logging.config import fileConfig
from alembic import context
from sqlalchemy import engine_from_config, pool
from config.database import Base, DATABASE_URL
from models import * # noqa: F401,F403 - ensure models are imported for metadata registration
# this is the Alembic Config object, which provides access to the values within the .ini file.
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Interpret the config file for Python logging.
# This line sets up loggers basically.
config.set_main_option("sqlalchemy.url", DATABASE_URL)
target_metadata = Base.metadata
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode."""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode."""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
def run_migrations() -> None:
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
run_migrations()


@@ -1,17 +0,0 @@
"""${message}"""
revision = ${repr(revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
from alembic import op
import sqlalchemy as sa
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}


@@ -1,718 +0,0 @@
"""Combined initial schema"""
from __future__ import annotations
from datetime import datetime, timezone
from alembic import op
import sqlalchemy as sa
from passlib.context import CryptContext
from sqlalchemy.sql import column, table
# revision identifiers, used by Alembic.
revision = "20251111_00"
down_revision = None
branch_labels = None
depends_on = None
password_context = CryptContext(schemes=["argon2"], deprecated="auto")
mining_operation_type = sa.Enum(
"open_pit",
"underground",
"in_situ_leach",
"placer",
"quarry",
"mountaintop_removal",
"other",
name="miningoperationtype",
)
scenario_status = sa.Enum(
"draft",
"active",
"archived",
name="scenariostatus",
)
financial_category = sa.Enum(
"capex",
"opex",
"revenue",
"contingency",
"other",
name="financialcategory",
)
cost_bucket = sa.Enum(
"capital_initial",
"capital_sustaining",
"operating_fixed",
"operating_variable",
"maintenance",
"reclamation",
"royalties",
"general_admin",
name="costbucket",
)
distribution_type = sa.Enum(
"normal",
"triangular",
"uniform",
"lognormal",
"custom",
name="distributiontype",
)
stochastic_variable = sa.Enum(
"ore_grade",
"recovery_rate",
"metal_price",
"operating_cost",
"capital_cost",
"discount_rate",
"throughput",
name="stochasticvariable",
)
resource_type = sa.Enum(
"diesel",
"electricity",
"water",
"explosives",
"reagents",
"labor",
"equipment_hours",
"tailings_capacity",
name="resourcetype",
)
DEFAULT_PRICING_SLUG = "default"
def _ensure_default_pricing_settings(connection) -> int:
settings_table = table(
"pricing_settings",
column("id", sa.Integer()),
column("slug", sa.String()),
column("name", sa.String()),
column("description", sa.Text()),
column("default_currency", sa.String()),
column("default_payable_pct", sa.Numeric()),
column("moisture_threshold_pct", sa.Numeric()),
column("moisture_penalty_per_pct", sa.Numeric()),
column("created_at", sa.DateTime(timezone=True)),
column("updated_at", sa.DateTime(timezone=True)),
)
existing = connection.execute(
sa.select(settings_table.c.id).where(
settings_table.c.slug == DEFAULT_PRICING_SLUG
)
).scalar_one_or_none()
if existing is not None:
return existing
now = datetime.now(timezone.utc)
insert_stmt = settings_table.insert().values(
slug=DEFAULT_PRICING_SLUG,
name="Default Pricing",
description="Automatically generated default pricing settings.",
default_currency="USD",
default_payable_pct=100.0,
moisture_threshold_pct=8.0,
moisture_penalty_per_pct=0.0,
created_at=now,
updated_at=now,
)
result = connection.execute(insert_stmt)
default_id = result.inserted_primary_key[0]
if default_id is None:
default_id = connection.execute(
sa.select(settings_table.c.id).where(
settings_table.c.slug == DEFAULT_PRICING_SLUG
)
).scalar_one()
return default_id
def upgrade() -> None:
bind = op.get_bind()
# Enumerations
mining_operation_type.create(bind, checkfirst=True)
scenario_status.create(bind, checkfirst=True)
financial_category.create(bind, checkfirst=True)
cost_bucket.create(bind, checkfirst=True)
distribution_type.create(bind, checkfirst=True)
stochastic_variable.create(bind, checkfirst=True)
resource_type.create(bind, checkfirst=True)
# Pricing settings core tables
op.create_table(
"pricing_settings",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column("name", sa.String(length=128), nullable=False),
sa.Column("slug", sa.String(length=64), nullable=False),
sa.Column("description", sa.Text(), nullable=True),
sa.Column("default_currency", sa.String(length=3), nullable=True),
sa.Column(
"default_payable_pct",
sa.Numeric(precision=5, scale=2),
nullable=False,
server_default=sa.text("100.00"),
),
sa.Column(
"moisture_threshold_pct",
sa.Numeric(precision=5, scale=2),
nullable=False,
server_default=sa.text("8.00"),
),
sa.Column(
"moisture_penalty_per_pct",
sa.Numeric(precision=14, scale=4),
nullable=False,
server_default=sa.text("0.0000"),
),
sa.Column("metadata", sa.JSON(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
sa.UniqueConstraint("name", name="uq_pricing_settings_name"),
sa.UniqueConstraint("slug", name="uq_pricing_settings_slug"),
)
op.create_index(
op.f("ix_pricing_settings_id"),
"pricing_settings",
["id"],
unique=False,
)
op.create_table(
"pricing_metal_settings",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column(
"pricing_settings_id",
sa.Integer(),
sa.ForeignKey("pricing_settings.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("metal_code", sa.String(length=32), nullable=False),
sa.Column("payable_pct", sa.Numeric(
precision=5, scale=2), nullable=True),
sa.Column(
"moisture_threshold_pct",
sa.Numeric(precision=5, scale=2),
nullable=True,
),
sa.Column(
"moisture_penalty_per_pct",
sa.Numeric(precision=14, scale=4),
nullable=True,
),
sa.Column("data", sa.JSON(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
sa.UniqueConstraint(
"pricing_settings_id",
"metal_code",
name="uq_pricing_metal_settings_code",
),
)
op.create_index(
op.f("ix_pricing_metal_settings_id"),
"pricing_metal_settings",
["id"],
unique=False,
)
op.create_index(
op.f("ix_pricing_metal_settings_pricing_settings_id"),
"pricing_metal_settings",
["pricing_settings_id"],
unique=False,
)
op.create_table(
"pricing_impurity_settings",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column(
"pricing_settings_id",
sa.Integer(),
sa.ForeignKey("pricing_settings.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("impurity_code", sa.String(length=32), nullable=False),
sa.Column(
"threshold_ppm",
sa.Numeric(precision=14, scale=4),
nullable=False,
server_default=sa.text("0.0000"),
),
sa.Column(
"penalty_per_ppm",
sa.Numeric(precision=14, scale=4),
nullable=False,
server_default=sa.text("0.0000"),
),
sa.Column("notes", sa.Text(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
sa.UniqueConstraint(
"pricing_settings_id",
"impurity_code",
name="uq_pricing_impurity_settings_code",
),
)
op.create_index(
op.f("ix_pricing_impurity_settings_id"),
"pricing_impurity_settings",
["id"],
unique=False,
)
op.create_index(
op.f("ix_pricing_impurity_settings_pricing_settings_id"),
"pricing_impurity_settings",
["pricing_settings_id"],
unique=False,
)
# Core domain tables
op.create_table(
"projects",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("name", sa.String(length=255), nullable=False),
sa.Column("location", sa.String(length=255), nullable=True),
sa.Column("operation_type", mining_operation_type, nullable=False),
sa.Column("description", sa.Text(), nullable=True),
sa.Column(
"pricing_settings_id",
sa.Integer(),
sa.ForeignKey("pricing_settings.id", ondelete="SET NULL"),
nullable=True,
),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("name"),
)
op.create_index(op.f("ix_projects_id"), "projects", ["id"], unique=False)
op.create_index(
"ix_projects_pricing_settings_id",
"projects",
["pricing_settings_id"],
unique=False,
)
op.create_table(
"scenarios",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("project_id", sa.Integer(), nullable=False),
sa.Column("name", sa.String(length=255), nullable=False),
sa.Column("description", sa.Text(), nullable=True),
sa.Column("status", scenario_status, nullable=False),
sa.Column("start_date", sa.Date(), nullable=True),
sa.Column("end_date", sa.Date(), nullable=True),
sa.Column("discount_rate", sa.Numeric(
precision=5, scale=2), nullable=True),
sa.Column("currency", sa.String(length=3), nullable=True),
sa.Column("primary_resource", resource_type, nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.ForeignKeyConstraint(
["project_id"], ["projects.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_scenarios_id"), "scenarios", ["id"], unique=False)
op.create_index(
op.f("ix_scenarios_project_id"),
"scenarios",
["project_id"],
unique=False,
)
op.create_table(
"financial_inputs",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("scenario_id", sa.Integer(), nullable=False),
sa.Column("name", sa.String(length=255), nullable=False),
sa.Column("category", financial_category, nullable=False),
sa.Column("cost_bucket", cost_bucket, nullable=True),
sa.Column("amount", sa.Numeric(precision=18, scale=2), nullable=False),
sa.Column("currency", sa.String(length=3), nullable=True),
sa.Column("effective_date", sa.Date(), nullable=True),
sa.Column("notes", sa.Text(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.ForeignKeyConstraint(
["scenario_id"], ["scenarios.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_financial_inputs_id"),
"financial_inputs",
["id"],
unique=False,
)
op.create_index(
op.f("ix_financial_inputs_scenario_id"),
"financial_inputs",
["scenario_id"],
unique=False,
)
op.create_table(
"simulation_parameters",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("scenario_id", sa.Integer(), nullable=False),
sa.Column("name", sa.String(length=255), nullable=False),
sa.Column("distribution", distribution_type, nullable=False),
sa.Column("variable", stochastic_variable, nullable=True),
sa.Column("resource_type", resource_type, nullable=True),
sa.Column("mean_value", sa.Numeric(
precision=18, scale=4), nullable=True),
sa.Column(
"standard_deviation",
sa.Numeric(precision=18, scale=4),
nullable=True,
),
sa.Column(
"minimum_value",
sa.Numeric(precision=18, scale=4),
nullable=True,
),
sa.Column(
"maximum_value",
sa.Numeric(precision=18, scale=4),
nullable=True,
),
sa.Column("unit", sa.String(length=32), nullable=True),
sa.Column("configuration", sa.JSON(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.ForeignKeyConstraint(
["scenario_id"], ["scenarios.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_simulation_parameters_id"),
"simulation_parameters",
["id"],
unique=False,
)
op.create_index(
op.f("ix_simulation_parameters_scenario_id"),
"simulation_parameters",
["scenario_id"],
unique=False,
)
# Authentication and RBAC tables
op.create_table(
"users",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column("email", sa.String(length=255), nullable=False),
sa.Column("username", sa.String(length=128), nullable=False),
sa.Column("password_hash", sa.String(length=255), nullable=False),
sa.Column("is_active", sa.Boolean(),
nullable=False, server_default=sa.true()),
sa.Column(
"is_superuser",
sa.Boolean(),
nullable=False,
server_default=sa.false(),
),
sa.Column("last_login_at", sa.DateTime(timezone=True), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
sa.UniqueConstraint("email", name="uq_users_email"),
sa.UniqueConstraint("username", name="uq_users_username"),
)
op.create_index(
"ix_users_active_superuser",
"users",
["is_active", "is_superuser"],
unique=False,
)
op.create_table(
"roles",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column("name", sa.String(length=64), nullable=False),
sa.Column("display_name", sa.String(length=128), nullable=False),
sa.Column("description", sa.Text(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
sa.UniqueConstraint("name", name="uq_roles_name"),
)
op.create_table(
"user_roles",
sa.Column("user_id", sa.Integer(), nullable=False),
sa.Column("role_id", sa.Integer(), nullable=False),
sa.Column(
"granted_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
sa.Column("granted_by", sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"),
sa.ForeignKeyConstraint(["role_id"], ["roles.id"], ondelete="CASCADE"),
sa.ForeignKeyConstraint(
["granted_by"], ["users.id"], ondelete="SET NULL"),
sa.PrimaryKeyConstraint("user_id", "role_id"),
sa.UniqueConstraint("user_id", "role_id",
name="uq_user_roles_user_role"),
)
op.create_index(
"ix_user_roles_role_id",
"user_roles",
["role_id"],
unique=False,
)
# Seed roles and default admin
roles_table = table(
"roles",
column("id", sa.Integer()),
column("name", sa.String()),
column("display_name", sa.String()),
column("description", sa.Text()),
)
op.bulk_insert(
roles_table,
[
{
"id": 1,
"name": "admin",
"display_name": "Administrator",
"description": "Full platform access with user management rights.",
},
{
"id": 2,
"name": "project_manager",
"display_name": "Project Manager",
"description": "Manage projects, scenarios, and associated data.",
},
{
"id": 3,
"name": "analyst",
"display_name": "Analyst",
"description": "Review dashboards and scenario outputs.",
},
{
"id": 4,
"name": "viewer",
"display_name": "Viewer",
"description": "Read-only access to assigned projects and reports.",
},
],
)
admin_password_hash = password_context.hash("ChangeMe123!")
users_table = table(
"users",
column("id", sa.Integer()),
column("email", sa.String()),
column("username", sa.String()),
column("password_hash", sa.String()),
column("is_active", sa.Boolean()),
column("is_superuser", sa.Boolean()),
)
op.bulk_insert(
users_table,
[
{
"id": 1,
"email": "admin@calminer.local",
"username": "admin",
"password_hash": admin_password_hash,
"is_active": True,
"is_superuser": True,
}
],
)
user_roles_table = table(
"user_roles",
column("user_id", sa.Integer()),
column("role_id", sa.Integer()),
column("granted_by", sa.Integer()),
)
op.bulk_insert(
user_roles_table,
[
{
"user_id": 1,
"role_id": 1,
"granted_by": 1,
}
],
)
# Ensure a default pricing settings record exists for future project linkage
_ensure_default_pricing_settings(bind)
def downgrade() -> None:
# Drop RBAC
op.drop_index("ix_user_roles_role_id", table_name="user_roles")
op.drop_table("user_roles")
op.drop_table("roles")
op.drop_index("ix_users_active_superuser", table_name="users")
op.drop_table("users")
# Drop domain tables
op.drop_index(
op.f("ix_simulation_parameters_scenario_id"),
table_name="simulation_parameters",
)
op.drop_index(op.f("ix_simulation_parameters_id"),
table_name="simulation_parameters")
op.drop_table("simulation_parameters")
op.drop_index(
op.f("ix_financial_inputs_scenario_id"), table_name="financial_inputs"
)
op.drop_index(op.f("ix_financial_inputs_id"),
table_name="financial_inputs")
op.drop_table("financial_inputs")
op.drop_index(op.f("ix_scenarios_project_id"), table_name="scenarios")
op.drop_index(op.f("ix_scenarios_id"), table_name="scenarios")
op.drop_table("scenarios")
op.drop_index("ix_projects_pricing_settings_id", table_name="projects")
op.drop_index(op.f("ix_projects_id"), table_name="projects")
op.drop_table("projects")
# Drop pricing settings ancillary tables
op.drop_index(
op.f("ix_pricing_impurity_settings_pricing_settings_id"),
table_name="pricing_impurity_settings",
)
op.drop_index(
op.f("ix_pricing_impurity_settings_id"),
table_name="pricing_impurity_settings",
)
op.drop_table("pricing_impurity_settings")
op.drop_index(
op.f("ix_pricing_metal_settings_pricing_settings_id"),
table_name="pricing_metal_settings",
)
op.drop_index(
op.f("ix_pricing_metal_settings_id"),
table_name="pricing_metal_settings",
)
op.drop_table("pricing_metal_settings")
op.drop_index(op.f("ix_pricing_settings_id"),
table_name="pricing_settings")
op.drop_table("pricing_settings")
# Drop enumerations
resource_type.drop(op.get_bind(), checkfirst=True)
stochastic_variable.drop(op.get_bind(), checkfirst=True)
distribution_type.drop(op.get_bind(), checkfirst=True)
cost_bucket.drop(op.get_bind(), checkfirst=True)
financial_category.drop(op.get_bind(), checkfirst=True)
scenario_status.drop(op.get_bind(), checkfirst=True)
mining_operation_type.drop(op.get_bind(), checkfirst=True)


@@ -1,38 +0,0 @@
"""Add performance_metrics table"""
from __future__ import annotations
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "20251111_01"
down_revision = "20251111_00"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.create_table(
"performance_metrics",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("timestamp", sa.DateTime(), nullable=True),
sa.Column("metric_name", sa.String(), nullable=True),
sa.Column("value", sa.Float(), nullable=True),
sa.Column("labels", sa.String(), nullable=True),
sa.Column("endpoint", sa.String(), nullable=True),
sa.Column("method", sa.String(), nullable=True),
sa.Column("status_code", sa.Integer(), nullable=True),
sa.Column("duration_seconds", sa.Float(), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_performance_metrics_timestamp"), "performance_metrics", ["timestamp"], unique=False)
op.create_index(op.f("ix_performance_metrics_metric_name"), "performance_metrics", ["metric_name"], unique=False)
op.create_index(op.f("ix_performance_metrics_endpoint"), "performance_metrics", ["endpoint"], unique=False)
def downgrade() -> None:
op.drop_index(op.f("ix_performance_metrics_endpoint"), table_name="performance_metrics")
op.drop_index(op.f("ix_performance_metrics_metric_name"), table_name="performance_metrics")
op.drop_index(op.f("ix_performance_metrics_timestamp"), table_name="performance_metrics")
op.drop_table("performance_metrics")


@@ -1,134 +0,0 @@
"""Add metadata columns to roles table"""
from __future__ import annotations
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "20251112_00_add_roles_metadata_columns"
down_revision = "20251111_01"
branch_labels = None
depends_on = None
ROLE_BACKFILL = (
("admin", "Administrator", "Full platform access with user management rights."),
(
"project_manager",
"Project Manager",
"Manage projects, scenarios, and associated data.",
),
("analyst", "Analyst", "Review dashboards and scenario outputs."),
(
"viewer",
"Viewer",
"Read-only access to assigned projects and reports.",
),
)
def upgrade() -> None:
op.add_column(
"roles",
sa.Column("display_name", sa.String(length=128), nullable=True),
)
op.add_column(
"roles",
sa.Column("description", sa.Text(), nullable=True),
)
op.add_column(
"roles",
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=True,
server_default=sa.text("timezone('UTC', now())"),
),
)
op.add_column(
"roles",
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
nullable=True,
server_default=sa.text("timezone('UTC', now())"),
),
)
connection = op.get_bind()
for name, display_name, description in ROLE_BACKFILL:
connection.execute(
sa.text(
"""
UPDATE roles
SET display_name = :display_name,
description = COALESCE(description, :description)
WHERE name = :name
AND display_name IS NULL
"""
),
{
"name": name,
"display_name": display_name,
"description": description,
},
)
connection.execute(
sa.text(
"""
UPDATE roles
SET display_name = INITCAP(REPLACE(name, '_', ' '))
WHERE display_name IS NULL
"""
)
)
connection.execute(
sa.text(
"""
UPDATE roles
SET created_at = timezone('UTC', now())
WHERE created_at IS NULL
"""
)
)
connection.execute(
sa.text(
"""
UPDATE roles
SET updated_at = timezone('UTC', now())
WHERE updated_at IS NULL
"""
)
)
op.alter_column(
"roles",
"display_name",
existing_type=sa.String(length=128),
nullable=False,
)
op.alter_column(
"roles",
"created_at",
existing_type=sa.DateTime(timezone=True),
nullable=False,
server_default=sa.text("timezone('UTC', now())"),
)
op.alter_column(
"roles",
"updated_at",
existing_type=sa.DateTime(timezone=True),
nullable=False,
server_default=sa.text("timezone('UTC', now())"),
)
def downgrade() -> None:
op.drop_column("roles", "updated_at")
op.drop_column("roles", "created_at")
op.drop_column("roles", "description")
op.drop_column("roles", "display_name")


@@ -2,7 +2,17 @@
## 2025-11-12
- Fixed critical 500 error in reporting dashboard by correcting route reference in reporting.html template - changed 'reports.project_list_page' to 'projects.project_list_page' to resolve NoMatchFound error when accessing /ui/reporting.
- Completed navigation validation by inventorying all sidebar navigation links, identifying missing routes for simulations, reporting, settings, themes, and currencies, created new UI routes in routes/ui.py with proper authentication guards, built corresponding templates (simulations.html, reporting.html, settings.html, theme_settings.html, currencies.html), registered the UI router in main.py, updated sidebar navigation to use route names instead of hardcoded URLs, and enhanced navigation.js to use dynamic URL resolution for proper route handling.
- Fixed critical template rendering error in sidebar_nav.html where URL objects from request.url_for() were being used with string methods, causing TypeError. Added |string filters to convert URL objects to strings for proper template rendering.
- Integrated Plotly charting for interactive visualizations in reporting templates, added chart generation methods to ReportingService (`_generate_npv_comparison_chart`, `_generate_distribution_histogram`), updated project summary and scenario distribution contexts to include chart JSON data, enhanced templates with chart containers and JavaScript rendering, added chart-container CSS styling, and validated all reporting tests pass.
- Completed local run verification: started application with `uvicorn main:app --reload` without errors, verified authenticated routes (/login, /, /projects/ui, /projects) load correctly with seeded data, and summarized findings for deployment pipeline readiness.
- Fixed docker-compose.override.yml command array to remove duplicate "uvicorn" entry, enabling successful container startup with uvicorn reload in development mode.
- Completed deployment pipeline verification: built Docker image without errors, validated docker-compose configuration, deployed locally with docker-compose (app and postgres containers started successfully), and confirmed application startup logs showing database bootstrap and seeded data initialization.
- Completed documentation of current data models: updated `calminer-docs/architecture/08_concepts/02_data_model.md` with comprehensive SQLAlchemy model schemas, enumerations, Pydantic API schemas, and analysis of discrepancies between models and schemas.
- Switched `models/performance_metric.py` to reuse the shared declarative base from `config.database`, clearing the SQLAlchemy 2.0 `declarative_base` deprecation warning and verifying repository tests still pass.
- Replaced the Alembic migration workflow with the idempotent Pydantic-backed initializer (`scripts/init_db.py`), added a guarded reset utility (`scripts/reset_db.py`), removed migration artifacts/tooling (Alembic directory, config, Docker entrypoint), refreshed the container entrypoint to invoke `uvicorn` directly, and updated installation/architecture docs plus the README to direct developers to the new seeding/reset flow.
- Eliminated Bandit hardcoded-secret findings by replacing literal JWT tokens and passwords across auth/security tests with randomized helpers drawn from `tests/utils/security.py`, ensuring fixtures still assert expected behaviours.
- Centralized Bandit configuration in `pyproject.toml`, reran `bandit -c pyproject.toml -r calminer tests`, and verified the scan now reports zero issues.
- Updated `.github/instructions/TODO.md` and `.github/instructions/DONE.md` to reflect the completed security scan remediation workflow.
@@ -10,9 +20,11 @@
- Resolved Ruff E402 warnings by moving module docstrings ahead of `from __future__ import annotations` across currency and pricing service modules, dropped the unused `HTTPException` import in `monitoring/__init__.py`, and confirmed a clean `ruff check .` run.
- Enhanced the deploy job in `.gitea/workflows/cicache.yml` to capture Kubernetes pod, deployment, and container logs into `/logs/deployment/` for staging/production rollouts and publish them via a `deployment-logs` artifact, updating CI/CD documentation with retrieval instructions.
- Fixed CI dashboard template lookup failures by renaming `templates/Dashboard.html` to `templates/dashboard.html` and verifying `tests/test_dashboard_route.py` locally to ensure TemplateNotFound no longer occurs on case-sensitive filesystems.
- Implemented SQLite support as primary local database with environment-driven backend switching (`CALMINER_USE_SQLITE=true`), updated `scripts/init_db.py` for database-agnostic DDL generation (PostgreSQL enums vs SQLite CHECK constraints), tested compatibility with both backends, and verified application startup and seeded data initialization work seamlessly across SQLite and PostgreSQL.
## 2025-11-11
- Collapsed legacy Alembic revisions into `alembic/versions/00_initial.py`, removed superseded migration files, and verified the consolidated schema via SQLite upgrade and Postgres version stamping.
- Implemented base URL routing to redirect unauthenticated users to login and authenticated users to dashboard.
- Added comprehensive end-to-end tests for login flow, including redirects, session handling, and error messaging for invalid/inactive accounts.
- Updated header and footer templates to consistently use `logo_big.png` image instead of text logo, with appropriate CSS styling for sizing.


@@ -11,12 +11,21 @@ def _build_database_url() -> str:
"""Construct the SQLAlchemy database URL from granular environment vars.
Falls back to `DATABASE_URL` for backward compatibility.
Supports SQLite when CALMINER_USE_SQLITE is set.
"""
legacy_url = os.environ.get("DATABASE_URL", "")
if legacy_url and legacy_url.strip() != "":
return legacy_url
use_sqlite = os.environ.get("CALMINER_USE_SQLITE", "").lower() in ("true", "1", "yes")
if use_sqlite:
# Use SQLite database
db_path = os.environ.get("DATABASE_PATH", "./data/calminer.db")
# Ensure the directory exists
os.makedirs(os.path.dirname(db_path), exist_ok=True)
return f"sqlite:///{db_path}"
driver = os.environ.get("DATABASE_DRIVER", "postgresql")
host = os.environ.get("DATABASE_HOST")
port = os.environ.get("DATABASE_PORT", "5432")
@@ -54,7 +63,15 @@ def _build_database_url() -> str:
DATABASE_URL = _build_database_url()
engine = create_engine(DATABASE_URL, echo=True, future=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Avoid expiring ORM objects on commit so that objects returned from UnitOfWork
# remain usable for the duration of the request cycle without causing
# DetachedInstanceError when accessed after the session commits.
SessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=engine,
expire_on_commit=False,
)
Base = declarative_base()


@@ -31,7 +31,6 @@ services:
# Override command for development with reload
command:
[
"uvicorn",
"main:app",
"--host",
"0.0.0.0",

42
main.py

@@ -15,8 +15,10 @@ from routes.exports import router as exports_router
from routes.projects import router as projects_router
from routes.reports import router as reports_router
from routes.scenarios import router as scenarios_router
from routes.ui import router as ui_router
from monitoring import router as monitoring_router
from services.bootstrap import bootstrap_admin, bootstrap_pricing_settings
from scripts.init_db import init_db as init_db_script
app = FastAPI()
@@ -44,6 +46,14 @@ async def ensure_admin_bootstrap() -> None:
admin_settings = settings.admin_bootstrap_settings()
pricing_metadata = settings.pricing_metadata()
try:
# Ensure DB schema/types/seeds required for bootstrapping exist.
# The initializer is idempotent and safe to run on every startup.
try:
init_db_script()
except Exception:
logger.exception(
"DB initializer failed; continuing to bootstrap (non-fatal)")
role_result, admin_result = bootstrap_admin(settings=admin_settings)
pricing_result = bootstrap_pricing_settings(metadata=pricing_metadata)
logger.info(
@@ -54,14 +64,29 @@ async def ensure_admin_bootstrap() -> None:
admin_result.password_rotated,
admin_result.roles_granted,
)
logger.info(
"Pricing settings bootstrap completed: slug=%s created=%s updated_fields=%s impurity_upserts=%s projects_assigned=%s",
pricing_result.seed.settings.slug,
pricing_result.seed.created,
pricing_result.seed.updated_fields,
pricing_result.seed.impurity_upserts,
pricing_result.projects_assigned,
)
# Avoid accessing ORM-managed attributes that may be detached outside
# of the UnitOfWork/session scope. Attempt a safe extraction and
# fall back to minimal logging if attributes are unavailable.
try:
seed = pricing_result.seed
slug = getattr(seed.settings, "slug", None) if seed and getattr(
seed, "settings", None) else None
created = getattr(seed, "created", None)
updated_fields = getattr(seed, "updated_fields", None)
impurity_upserts = getattr(seed, "impurity_upserts", None)
logger.info(
"Pricing settings bootstrap completed: slug=%s created=%s updated_fields=%s impurity_upserts=%s projects_assigned=%s",
slug,
created,
updated_fields,
impurity_upserts,
pricing_result.projects_assigned,
)
except Exception:
logger.info(
"Pricing settings bootstrap completed (partial): projects_assigned=%s",
pricing_result.projects_assigned,
)
except Exception: # pragma: no cover - defensive logging
logger.exception(
"Failed to bootstrap administrator or pricing settings")
@@ -74,6 +99,7 @@ app.include_router(exports_router)
app.include_router(projects_router)
app.include_router(scenarios_router)
app.include_router(reports_router)
app.include_router(ui_router)
app.include_router(monitoring_router)
app.mount("/static", StaticFiles(directory="static"), name="static")


@@ -8,6 +8,7 @@ from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoin
from starlette.types import ASGIApp
from config.settings import Settings, get_settings
from sqlalchemy.orm.exc import DetachedInstanceError
from models import User
from monitoring.metrics import ACTIVE_CONNECTIONS
from services.exceptions import EntityNotFoundError
@@ -66,21 +67,42 @@ class AuthSessionMiddleware(BaseHTTPMiddleware):
resolved = self._resolve_session(request)
# Track active sessions for authenticated users
if resolved.session.user and resolved.session.user.is_active:
try:
user_active = bool(resolved.session.user and getattr(
resolved.session.user, "is_active", False))
except DetachedInstanceError:
user_active = False
if user_active:
AuthSessionMiddleware._active_sessions += 1
ACTIVE_CONNECTIONS.set(AuthSessionMiddleware._active_sessions)
response: Response | None = None
try:
response = await call_next(request)
return response
finally:
# Decrement on response
if resolved.session.user and resolved.session.user.is_active:
# Always decrement the active sessions counter if we incremented it.
if user_active:
AuthSessionMiddleware._active_sessions = max(
0, AuthSessionMiddleware._active_sessions - 1)
ACTIVE_CONNECTIONS.set(AuthSessionMiddleware._active_sessions)
self._apply_session(response, resolved)
# Only apply session cookies if a response was produced by downstream
# application. If an exception occurred before a response was created
# we avoid raising another error here.
import logging
if response is not None:
try:
self._apply_session(response, resolved)
except Exception:
logging.getLogger(__name__).exception(
"Failed to apply session cookies to response"
)
else:
logging.getLogger(__name__).debug(
"AuthSessionMiddleware: no response produced by downstream app (response is None)"
)
def _resolve_session(self, request: Request) -> _ResolutionResult:
settings = self._settings_provider()


@@ -10,10 +10,14 @@ async def validate_json(
) -> Response:
# Only validate JSON for requests with a body
if request.method in ("POST", "PUT", "PATCH"):
try:
# attempt to parse json body
await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON payload")
# Only attempt JSON parsing when the client indicates a JSON content type.
content_type = (request.headers.get("content-type") or "").lower()
if "json" in content_type:
try:
# attempt to parse json body
await request.json()
except Exception:
raise HTTPException(
status_code=400, detail="Invalid JSON payload")
response = await call_next(request)
return response


@@ -1,14 +1,11 @@
"""Database models and shared metadata for the CalMiner domain."""
from .financial_input import FinancialCategory, FinancialInput
from .financial_input import FinancialInput
from .metadata import (
COST_BUCKET_METADATA,
RESOURCE_METADATA,
STOCHASTIC_VARIABLE_METADATA,
CostBucket,
ResourceDescriptor,
ResourceType,
StochasticVariable,
StochasticVariableDescriptor,
)
from .performance_metric import PerformanceMetric
@@ -17,9 +14,18 @@ from .pricing_settings import (
PricingMetalSettings,
PricingSettings,
)
from .project import MiningOperationType, Project
from .scenario import Scenario, ScenarioStatus
from .simulation_parameter import DistributionType, SimulationParameter
from .enums import (
CostBucket,
DistributionType,
FinancialCategory,
MiningOperationType,
ResourceType,
ScenarioStatus,
StochasticVariable,
)
from .project import Project
from .scenario import Scenario
from .simulation_parameter import SimulationParameter
from .user import Role, User, UserRole, password_context
__all__ = [

96
models/enums.py Normal file

@@ -0,0 +1,96 @@
from __future__ import annotations
from enum import Enum
from typing import Type
from sqlalchemy import Enum as SQLEnum
def sql_enum(enum_cls: Type[Enum], *, name: str) -> SQLEnum:
"""Build a SQLAlchemy Enum that maps using the enum member values."""
return SQLEnum(
enum_cls,
name=name,
create_type=False,
validate_strings=True,
values_callable=lambda enum_cls: [member.value for member in enum_cls],
)
class MiningOperationType(str, Enum):
"""Supported mining operation categories."""
OPEN_PIT = "open_pit"
UNDERGROUND = "underground"
IN_SITU_LEACH = "in_situ_leach"
PLACER = "placer"
QUARRY = "quarry"
MOUNTAINTOP_REMOVAL = "mountaintop_removal"
OTHER = "other"
class ScenarioStatus(str, Enum):
"""Lifecycle states for project scenarios."""
DRAFT = "draft"
ACTIVE = "active"
ARCHIVED = "archived"
class FinancialCategory(str, Enum):
"""Enumeration of cost and revenue classifications."""
CAPITAL_EXPENDITURE = "capex"
OPERATING_EXPENDITURE = "opex"
REVENUE = "revenue"
CONTINGENCY = "contingency"
OTHER = "other"
class DistributionType(str, Enum):
"""Supported stochastic distribution families for simulations."""
NORMAL = "normal"
TRIANGULAR = "triangular"
UNIFORM = "uniform"
LOGNORMAL = "lognormal"
CUSTOM = "custom"
class ResourceType(str, Enum):
"""Primary consumables and resources used in mining operations."""
DIESEL = "diesel"
ELECTRICITY = "electricity"
WATER = "water"
EXPLOSIVES = "explosives"
REAGENTS = "reagents"
LABOR = "labor"
EQUIPMENT_HOURS = "equipment_hours"
TAILINGS_CAPACITY = "tailings_capacity"
class CostBucket(str, Enum):
"""Granular cost buckets aligned with project accounting."""
CAPITAL_INITIAL = "capital_initial"
CAPITAL_SUSTAINING = "capital_sustaining"
OPERATING_FIXED = "operating_fixed"
OPERATING_VARIABLE = "operating_variable"
MAINTENANCE = "maintenance"
RECLAMATION = "reclamation"
ROYALTIES = "royalties"
GENERAL_ADMIN = "general_admin"
class StochasticVariable(str, Enum):
"""Domain variables that typically require probabilistic modelling."""
ORE_GRADE = "ore_grade"
RECOVERY_RATE = "recovery_rate"
METAL_PRICE = "metal_price"
OPERATING_COST = "operating_cost"
CAPITAL_COST = "capital_cost"
DISCOUNT_RATE = "discount_rate"
THROUGHPUT = "throughput"


@@ -1,13 +1,11 @@
from __future__ import annotations
from datetime import date, datetime
from enum import Enum
from typing import TYPE_CHECKING
from sqlalchemy import (
Date,
DateTime,
Enum as SQLEnum,
ForeignKey,
Integer,
Numeric,
@@ -19,23 +17,13 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship, validates
from sqlalchemy.sql import func
from config.database import Base
from .metadata import CostBucket
from .enums import CostBucket, FinancialCategory, sql_enum
from services.currency import normalise_currency
if TYPE_CHECKING: # pragma: no cover
from .scenario import Scenario
class FinancialCategory(str, Enum):
"""Enumeration of cost and revenue classifications."""
CAPITAL_EXPENDITURE = "capex"
OPERATING_EXPENDITURE = "opex"
REVENUE = "revenue"
CONTINGENCY = "contingency"
OTHER = "other"
class FinancialInput(Base):
"""Line-item financial assumption attached to a scenario."""
@@ -47,10 +35,10 @@ class FinancialInput(Base):
)
name: Mapped[str] = mapped_column(String(255), nullable=False)
category: Mapped[FinancialCategory] = mapped_column(
SQLEnum(FinancialCategory), nullable=False
sql_enum(FinancialCategory, name="financialcategory"), nullable=False
)
cost_bucket: Mapped[CostBucket | None] = mapped_column(
SQLEnum(CostBucket), nullable=True
sql_enum(CostBucket, name="costbucket"), nullable=True
)
amount: Mapped[float] = mapped_column(Numeric(18, 2), nullable=False)
currency: Mapped[str | None] = mapped_column(String(3), nullable=True)


@@ -1,45 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
class ResourceType(str, Enum):
"""Primary consumables and resources used in mining operations."""
DIESEL = "diesel"
ELECTRICITY = "electricity"
WATER = "water"
EXPLOSIVES = "explosives"
REAGENTS = "reagents"
LABOR = "labor"
EQUIPMENT_HOURS = "equipment_hours"
TAILINGS_CAPACITY = "tailings_capacity"
class CostBucket(str, Enum):
"""Granular cost buckets aligned with project accounting."""
CAPITAL_INITIAL = "capital_initial"
CAPITAL_SUSTAINING = "capital_sustaining"
OPERATING_FIXED = "operating_fixed"
OPERATING_VARIABLE = "operating_variable"
MAINTENANCE = "maintenance"
RECLAMATION = "reclamation"
ROYALTIES = "royalties"
GENERAL_ADMIN = "general_admin"
class StochasticVariable(str, Enum):
"""Domain variables that typically require probabilistic modelling."""
ORE_GRADE = "ore_grade"
RECOVERY_RATE = "recovery_rate"
METAL_PRICE = "metal_price"
OPERATING_COST = "operating_cost"
CAPITAL_COST = "capital_cost"
DISCOUNT_RATE = "discount_rate"
THROUGHPUT = "throughput"
from .enums import ResourceType, CostBucket, StochasticVariable
@dataclass(frozen=True)


@@ -1,10 +1,11 @@
from __future__ import annotations
from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING, List
from sqlalchemy import DateTime, Enum as SQLEnum, ForeignKey, Integer, String, Text
from .enums import MiningOperationType, sql_enum
from sqlalchemy import DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
@@ -15,18 +16,6 @@ if TYPE_CHECKING: # pragma: no cover
from .pricing_settings import PricingSettings
class MiningOperationType(str, Enum):
"""Supported mining operation categories."""
OPEN_PIT = "open_pit"
UNDERGROUND = "underground"
IN_SITU_LEACH = "in_situ_leach"
PLACER = "placer"
QUARRY = "quarry"
MOUNTAINTOP_REMOVAL = "mountaintop_removal"
OTHER = "other"
class Project(Base):
"""Top-level mining project grouping multiple scenarios."""
@@ -36,7 +25,9 @@ class Project(Base):
name: Mapped[str] = mapped_column(String(255), nullable=False, unique=True)
location: Mapped[str | None] = mapped_column(String(255), nullable=True)
operation_type: Mapped[MiningOperationType] = mapped_column(
SQLEnum(MiningOperationType), nullable=False, default=MiningOperationType.OTHER
sql_enum(MiningOperationType, name="miningoperationtype"),
nullable=False,
default=MiningOperationType.OTHER,
)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
pricing_settings_id: Mapped[int | None] = mapped_column(


@@ -1,25 +1,24 @@
from __future__ import annotations
from datetime import date, datetime
from enum import Enum
from typing import TYPE_CHECKING, List
from sqlalchemy import (
Date,
DateTime,
Enum as SQLEnum,
ForeignKey,
Integer,
Numeric,
String,
Text,
UniqueConstraint,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship, validates
from sqlalchemy.sql import func
from config.database import Base
from services.currency import normalise_currency
from .metadata import ResourceType
from .enums import ResourceType, ScenarioStatus, sql_enum
if TYPE_CHECKING: # pragma: no cover
from .financial_input import FinancialInput
@@ -27,18 +26,14 @@ if TYPE_CHECKING: # pragma: no cover
from .simulation_parameter import SimulationParameter
class ScenarioStatus(str, Enum):
"""Lifecycle states for project scenarios."""
DRAFT = "draft"
ACTIVE = "active"
ARCHIVED = "archived"
class Scenario(Base):
"""A specific configuration of assumptions for a project."""
__tablename__ = "scenarios"
__table_args__ = (
UniqueConstraint("project_id", "name",
name="uq_scenarios_project_name"),
)
id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
project_id: Mapped[int] = mapped_column(
@@ -47,7 +42,9 @@ class Scenario(Base):
name: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
status: Mapped[ScenarioStatus] = mapped_column(
SQLEnum(ScenarioStatus), nullable=False, default=ScenarioStatus.DRAFT
sql_enum(ScenarioStatus, name="scenariostatus"),
nullable=False,
default=ScenarioStatus.DRAFT,
)
start_date: Mapped[date | None] = mapped_column(Date, nullable=True)
end_date: Mapped[date | None] = mapped_column(Date, nullable=True)
@@ -55,7 +52,7 @@ class Scenario(Base):
Numeric(5, 2), nullable=True)
currency: Mapped[str | None] = mapped_column(String(3), nullable=True)
primary_resource: Mapped[ResourceType | None] = mapped_column(
SQLEnum(ResourceType), nullable=True
sql_enum(ResourceType, name="resourcetype"), nullable=True
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()


@@ -1,13 +1,13 @@
from __future__ import annotations
from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING
from .enums import DistributionType, ResourceType, StochasticVariable, sql_enum
from sqlalchemy import (
JSON,
DateTime,
Enum as SQLEnum,
ForeignKey,
Integer,
Numeric,
@@ -17,22 +17,11 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from config.database import Base
from .metadata import ResourceType, StochasticVariable
if TYPE_CHECKING: # pragma: no cover
from .scenario import Scenario
class DistributionType(str, Enum):
"""Supported stochastic distribution families for simulations."""
NORMAL = "normal"
TRIANGULAR = "triangular"
UNIFORM = "uniform"
LOGNORMAL = "lognormal"
CUSTOM = "custom"
class SimulationParameter(Base):
"""Probability distribution settings for scenario simulations."""
@@ -44,13 +33,13 @@ class SimulationParameter(Base):
)
name: Mapped[str] = mapped_column(String(255), nullable=False)
distribution: Mapped[DistributionType] = mapped_column(
SQLEnum(DistributionType), nullable=False
sql_enum(DistributionType, name="distributiontype"), nullable=False
)
variable: Mapped[StochasticVariable | None] = mapped_column(
SQLEnum(StochasticVariable), nullable=True
sql_enum(StochasticVariable, name="stochasticvariable"), nullable=True
)
resource_type: Mapped[ResourceType | None] = mapped_column(
SQLEnum(ResourceType), nullable=True
sql_enum(ResourceType, name="resourcetype"), nullable=True
)
mean_value: Mapped[float | None] = mapped_column(
Numeric(18, 4), nullable=True)


@@ -27,7 +27,6 @@ branch = true
source = ["."]
omit = [
"tests/*",
"alembic/*",
"scripts/*",
"main.py",
"routes/reports.py",
@@ -39,6 +38,6 @@ skip_empty = true
show_missing = true
[tool.bandit]
exclude_dirs = ["alembic", "scripts"]
exclude_dirs = ["scripts"]
skips = ["B101", "B601"] # B101: assert_used, B601: shell_injection (may be false positives)


@@ -1,2 +1 @@
-r requirements.txt
alembic
-r requirements.txt

109
routes/ui.py Normal file

@@ -0,0 +1,109 @@
from __future__ import annotations
from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from dependencies import require_any_role, require_roles
from models import User
router = APIRouter(tags=["UI"])
templates = Jinja2Templates(directory="templates")
READ_ROLES = ("viewer", "analyst", "project_manager", "admin")
MANAGE_ROLES = ("project_manager", "admin")
@router.get(
"/ui/simulations",
response_class=HTMLResponse,
include_in_schema=False,
name="ui.simulations",
)
def simulations_dashboard(
request: Request,
_: User = Depends(require_any_role(*READ_ROLES)),
) -> HTMLResponse:
return templates.TemplateResponse(
request,
"simulations.html",
{
"title": "Simulations",
},
)
@router.get(
"/ui/reporting",
response_class=HTMLResponse,
include_in_schema=False,
name="ui.reporting",
)
def reporting_dashboard(
request: Request,
_: User = Depends(require_any_role(*READ_ROLES)),
) -> HTMLResponse:
return templates.TemplateResponse(
request,
"reporting.html",
{
"title": "Reporting",
},
)
@router.get(
"/ui/settings",
response_class=HTMLResponse,
include_in_schema=False,
name="ui.settings",
)
def settings_page(
request: Request,
_: User = Depends(require_any_role(*READ_ROLES)),
) -> HTMLResponse:
return templates.TemplateResponse(
request,
"settings.html",
{
"title": "Settings",
},
)
@router.get(
"/theme-settings",
response_class=HTMLResponse,
include_in_schema=False,
name="ui.theme_settings",
)
def theme_settings_page(
request: Request,
_: User = Depends(require_any_role(*READ_ROLES)),
) -> HTMLResponse:
return templates.TemplateResponse(
request,
"theme_settings.html",
{
"title": "Theme Settings",
},
)
@router.get(
"/ui/currencies",
response_class=HTMLResponse,
include_in_schema=False,
name="ui.currencies",
)
def currencies_page(
request: Request,
_: User = Depends(require_roles(*MANAGE_ROLES)),
) -> HTMLResponse:
return templates.TemplateResponse(
request,
"currencies.html",
{
"title": "Currency Management",
},
)


@@ -1,9 +0,0 @@
#!/usr/bin/env sh
set -e
PYTHONPATH="/app:${PYTHONPATH}"
export PYTHONPATH
python -m scripts.run_migrations
exec "$@"

972
scripts/init_db.py Normal file

@@ -0,0 +1,972 @@
"""Idempotent DB initialization and seeding using Pydantic validation and raw SQL.
Usage:
from scripts.init_db import init_db
init_db()
This module creates PostgreSQL ENUM types if missing, creates minimal tables
required for bootstrapping (roles, users, user_roles, pricing_settings and
ancillary pricing tables), and seeds initial rows using INSERT ... ON CONFLICT
DO NOTHING so it's safe to run multiple times.
Notes:
- This module avoids importing application models at import time to prevent
side-effects. Database connections are created inside functions.
- It intentionally performs non-destructive operations only (CREATE IF NOT
EXISTS, INSERT ... ON CONFLICT).
"""
from __future__ import annotations
from typing import List, Optional, Set
import os
import logging
from decimal import Decimal
from pydantic import BaseModel, Field, validator
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
from passlib.context import CryptContext
logger = logging.getLogger(__name__)
password_context = CryptContext(schemes=["argon2"], deprecated="auto")
# ENUM definitions matching previous schema
ENUM_DEFINITIONS = {
"miningoperationtype": [
"open_pit",
"underground",
"in_situ_leach",
"placer",
"quarry",
"mountaintop_removal",
"other",
],
"scenariostatus": ["draft", "active", "archived"],
"financialcategory": ["capex", "opex", "revenue", "contingency", "other"],
"costbucket": [
"capital_initial",
"capital_sustaining",
"operating_fixed",
"operating_variable",
"maintenance",
"reclamation",
"royalties",
"general_admin",
],
"distributiontype": ["normal", "triangular", "uniform", "lognormal", "custom"],
"stochasticvariable": [
"ore_grade",
"recovery_rate",
"metal_price",
"operating_cost",
"capital_cost",
"discount_rate",
"throughput",
],
"resourcetype": [
"diesel",
"electricity",
"water",
"explosives",
"reagents",
"labor",
"equipment_hours",
"tailings_capacity",
],
}
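# Illustrative note, not part of the original file: on PostgreSQL the names above
# are materialised as native ENUM types only when missing; one common idempotent
# pattern (the exact SQL used by this script is an assumption) is:
#
#   DO $$ BEGIN
#       CREATE TYPE scenariostatus AS ENUM ('draft', 'active', 'archived');
#   EXCEPTION WHEN duplicate_object THEN NULL;
#   END $$;
#
# On SQLite the same value sets are enforced as CHECK constraints in the DDL below.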
# Minimal DDL for tables we seed / that bootstrap relies on
def _get_table_ddls(is_sqlite: bool) -> List[str]:
if is_sqlite:
return [
# roles
"""
CREATE TABLE IF NOT EXISTS roles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
display_name TEXT NOT NULL,
description TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""",
# users
"""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT NOT NULL UNIQUE,
username TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
is_active INTEGER NOT NULL DEFAULT 1,
is_superuser INTEGER NOT NULL DEFAULT 0,
last_login_at DATETIME,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""",
# user_roles
"""
CREATE TABLE IF NOT EXISTS user_roles (
user_id INTEGER NOT NULL,
role_id INTEGER NOT NULL,
granted_at DATETIME DEFAULT CURRENT_TIMESTAMP,
granted_by INTEGER,
PRIMARY KEY (user_id, role_id)
);
""",
# pricing_settings
"""
CREATE TABLE IF NOT EXISTS pricing_settings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
slug TEXT NOT NULL UNIQUE,
description TEXT,
default_currency TEXT,
default_payable_pct REAL DEFAULT 100.00 NOT NULL,
moisture_threshold_pct REAL DEFAULT 8.00 NOT NULL,
moisture_penalty_per_pct REAL DEFAULT 0.0000 NOT NULL,
metadata TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""",
# pricing_metal_settings
"""
CREATE TABLE IF NOT EXISTS pricing_metal_settings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pricing_settings_id INTEGER NOT NULL REFERENCES pricing_settings(id) ON DELETE CASCADE,
metal_code TEXT NOT NULL,
payable_pct REAL,
moisture_threshold_pct REAL,
moisture_penalty_per_pct REAL,
data TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE (pricing_settings_id, metal_code)
);
""",
# pricing_impurity_settings
"""
CREATE TABLE IF NOT EXISTS pricing_impurity_settings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pricing_settings_id INTEGER NOT NULL REFERENCES pricing_settings(id) ON DELETE CASCADE,
impurity_code TEXT NOT NULL,
threshold_ppm REAL DEFAULT 0.0000 NOT NULL,
penalty_per_ppm REAL DEFAULT 0.0000 NOT NULL,
notes TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE (pricing_settings_id, impurity_code)
);
""",
# core domain tables: projects, scenarios, financial_inputs, simulation_parameters
"""
CREATE TABLE IF NOT EXISTS projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
location TEXT,
operation_type TEXT NOT NULL CHECK (operation_type IN ('open_pit', 'underground', 'in_situ_leach', 'placer', 'quarry', 'mountaintop_removal', 'other')),
description TEXT,
pricing_settings_id INTEGER REFERENCES pricing_settings(id) ON DELETE SET NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""",
"""
CREATE TABLE IF NOT EXISTS scenarios (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id INTEGER NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
name TEXT NOT NULL,
description TEXT,
status TEXT NOT NULL CHECK (status IN ('draft', 'active', 'archived')),
start_date DATE,
end_date DATE,
discount_rate REAL,
currency TEXT,
primary_resource TEXT CHECK (primary_resource IN ('diesel', 'electricity', 'water', 'explosives', 'reagents', 'labor', 'equipment_hours', 'tailings_capacity') OR primary_resource IS NULL),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE (project_id, name)
);
""",
"""
CREATE TABLE IF NOT EXISTS financial_inputs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
scenario_id INTEGER NOT NULL REFERENCES scenarios(id) ON DELETE CASCADE,
name TEXT NOT NULL,
category TEXT NOT NULL CHECK (category IN ('capex', 'opex', 'revenue', 'contingency', 'other')),
cost_bucket TEXT CHECK (cost_bucket IN ('capital_initial', 'capital_sustaining', 'operating_fixed', 'operating_variable', 'maintenance', 'reclamation', 'royalties', 'general_admin') OR cost_bucket IS NULL),
amount REAL NOT NULL,
currency TEXT,
effective_date DATE,
notes TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE (scenario_id, name)
);
""",
"""
CREATE TABLE IF NOT EXISTS simulation_parameters (
id INTEGER PRIMARY KEY AUTOINCREMENT,
scenario_id INTEGER NOT NULL REFERENCES scenarios(id) ON DELETE CASCADE,
name TEXT NOT NULL,
distribution TEXT NOT NULL CHECK (distribution IN ('normal', 'triangular', 'uniform', 'lognormal', 'custom')),
variable TEXT CHECK (variable IN ('ore_grade', 'recovery_rate', 'metal_price', 'operating_cost', 'capital_cost', 'discount_rate', 'throughput') OR variable IS NULL),
resource_type TEXT CHECK (resource_type IN ('diesel', 'electricity', 'water', 'explosives', 'reagents', 'labor', 'equipment_hours', 'tailings_capacity') OR resource_type IS NULL),
mean_value REAL,
standard_deviation REAL,
minimum_value REAL,
maximum_value REAL,
unit TEXT,
configuration TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""",
]
else:
# PostgreSQL DDLs
return [
# roles
"""
CREATE TABLE IF NOT EXISTS roles (
id INTEGER PRIMARY KEY,
name VARCHAR(64) NOT NULL,
display_name VARCHAR(128) NOT NULL,
description TEXT,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
CONSTRAINT uq_roles_name UNIQUE (name)
);
""",
# users
"""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
email VARCHAR(255) NOT NULL,
username VARCHAR(128) NOT NULL,
password_hash VARCHAR(255) NOT NULL,
is_active BOOLEAN NOT NULL DEFAULT true,
is_superuser BOOLEAN NOT NULL DEFAULT false,
last_login_at TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
CONSTRAINT uq_users_email UNIQUE (email),
CONSTRAINT uq_users_username UNIQUE (username)
);
""",
# user_roles
"""
CREATE TABLE IF NOT EXISTS user_roles (
user_id INTEGER NOT NULL,
role_id INTEGER NOT NULL,
granted_at TIMESTAMPTZ DEFAULT now(),
granted_by INTEGER,
PRIMARY KEY (user_id, role_id),
CONSTRAINT uq_user_roles_user_role UNIQUE (user_id, role_id)
);
""",
# pricing_settings
"""
CREATE TABLE IF NOT EXISTS pricing_settings (
id SERIAL PRIMARY KEY,
name VARCHAR(128) NOT NULL,
slug VARCHAR(64) NOT NULL,
description TEXT,
default_currency VARCHAR(3),
default_payable_pct NUMERIC(5,2) DEFAULT 100.00 NOT NULL,
moisture_threshold_pct NUMERIC(5,2) DEFAULT 8.00 NOT NULL,
moisture_penalty_per_pct NUMERIC(14,4) DEFAULT 0.0000 NOT NULL,
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
CONSTRAINT uq_pricing_settings_slug UNIQUE (slug),
CONSTRAINT uq_pricing_settings_name UNIQUE (name)
);
""",
# pricing_metal_settings
"""
CREATE TABLE IF NOT EXISTS pricing_metal_settings (
id SERIAL PRIMARY KEY,
pricing_settings_id INTEGER NOT NULL REFERENCES pricing_settings(id) ON DELETE CASCADE,
metal_code VARCHAR(32) NOT NULL,
payable_pct NUMERIC(5,2),
moisture_threshold_pct NUMERIC(5,2),
moisture_penalty_per_pct NUMERIC(14,4),
data JSONB,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
CONSTRAINT uq_pricing_metal_settings_code UNIQUE (pricing_settings_id, metal_code)
);
""",
# pricing_impurity_settings
"""
CREATE TABLE IF NOT EXISTS pricing_impurity_settings (
id SERIAL PRIMARY KEY,
pricing_settings_id INTEGER NOT NULL REFERENCES pricing_settings(id) ON DELETE CASCADE,
impurity_code VARCHAR(32) NOT NULL,
threshold_ppm NUMERIC(14,4) DEFAULT 0.0000 NOT NULL,
penalty_per_ppm NUMERIC(14,4) DEFAULT 0.0000 NOT NULL,
notes TEXT,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
CONSTRAINT uq_pricing_impurity_settings_code UNIQUE (pricing_settings_id, impurity_code)
);
""",
# core domain tables: projects, scenarios, financial_inputs, simulation_parameters
"""
CREATE TABLE IF NOT EXISTS projects (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
location VARCHAR(255),
operation_type miningoperationtype NOT NULL,
description TEXT,
pricing_settings_id INTEGER REFERENCES pricing_settings(id) ON DELETE SET NULL,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
CONSTRAINT uq_projects_name UNIQUE (name)
);
""",
"""
CREATE TABLE IF NOT EXISTS scenarios (
id SERIAL PRIMARY KEY,
project_id INTEGER NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
name VARCHAR(255) NOT NULL,
description TEXT,
status scenariostatus NOT NULL,
start_date DATE,
end_date DATE,
discount_rate NUMERIC(5,2),
currency VARCHAR(3),
primary_resource resourcetype,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
CONSTRAINT uq_scenarios_project_name UNIQUE (project_id, name)
);
""",
"""
CREATE TABLE IF NOT EXISTS financial_inputs (
id SERIAL PRIMARY KEY,
scenario_id INTEGER NOT NULL REFERENCES scenarios(id) ON DELETE CASCADE,
name VARCHAR(255) NOT NULL,
category financialcategory NOT NULL,
cost_bucket costbucket,
amount NUMERIC(18,2) NOT NULL,
currency VARCHAR(3),
effective_date DATE,
notes TEXT,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
CONSTRAINT uq_financial_inputs_scenario_name UNIQUE (scenario_id, name)
);
""",
"""
CREATE TABLE IF NOT EXISTS simulation_parameters (
id SERIAL PRIMARY KEY,
scenario_id INTEGER NOT NULL REFERENCES scenarios(id) ON DELETE CASCADE,
name VARCHAR(255) NOT NULL,
distribution distributiontype NOT NULL,
variable stochasticvariable,
resource_type resourcetype,
mean_value NUMERIC(18,4),
standard_deviation NUMERIC(18,4),
minimum_value NUMERIC(18,4),
maximum_value NUMERIC(18,4),
unit VARCHAR(32),
configuration JSONB,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
""",
]
# Seeds
DEFAULT_ROLES = [
{"id": 1, "name": "admin", "display_name": "Administrator",
"description": "Full platform access with user management rights."},
{"id": 2, "name": "project_manager", "display_name": "Project Manager",
"description": "Manage projects, scenarios, and associated data."},
{"id": 3, "name": "analyst", "display_name": "Analyst",
"description": "Review dashboards and scenario outputs."},
{"id": 4, "name": "viewer", "display_name": "Viewer",
"description": "Read-only access to assigned projects and reports."},
]
DEFAULT_ADMIN = {"id": 1, "email": "admin@calminer.local", "username": "admin",
"password": "ChangeMe123!", "is_active": True, "is_superuser": True}
DEFAULT_PRICING = {
"slug": "default",
"name": "Default Pricing",
"description": "Automatically generated default pricing settings.",
"default_currency": "USD",
"default_payable_pct": 100.0,
"moisture_threshold_pct": 8.0,
"moisture_penalty_per_pct": 0.0,
}
class ProjectSeed(BaseModel):
name: str
location: str | None = None
operation_type: str
description: str | None = None
class ScenarioSeed(BaseModel):
project_name: str
name: str
description: str | None = None
status: str = "active"
discount_rate: float | None = Field(default=None)
currency: str | None = Field(default="USD")
primary_resource: str | None = Field(default=None)
class FinancialInputSeed(BaseModel):
scenario_name: str
project_name: str
name: str
category: str
cost_bucket: str | None = None
amount: Decimal
currency: str = "USD"
notes: str | None = None
class RoleSeed(BaseModel):
id: int
name: str
display_name: str
description: Optional[str]
class UserSeed(BaseModel):
id: int
email: str
username: str
password: str
is_active: bool = True
is_superuser: bool = False
@validator("password")
def password_min_len(cls, v: str) -> str:
if not v or len(v) < 8:
raise ValueError("password must be at least 8 characters")
return v
class PricingSeed(BaseModel):
slug: str
name: str
description: Optional[str]
default_currency: Optional[str]
default_payable_pct: float
moisture_threshold_pct: float
moisture_penalty_per_pct: float
DEFAULT_PROJECTS: list[ProjectSeed] = [
ProjectSeed(
name="Helios Copper",
location="Chile",
operation_type="open_pit",
description="Flagship open pit copper operation used for demos",
),
ProjectSeed(
name="Luna Nickel",
location="Australia",
operation_type="underground",
description="Underground nickel sulphide project with stochastic modelling",
),
]
DEFAULT_SCENARIOS: list[ScenarioSeed] = [
ScenarioSeed(
project_name="Helios Copper",
name="Base Case",
description="Deterministic base case for Helios",
status="active",
discount_rate=8.0,
primary_resource="diesel",
),
ScenarioSeed(
project_name="Helios Copper",
name="Expansion Case",
description="Expansion scenario with increased throughput",
status="draft",
discount_rate=9.0,
primary_resource="electricity",
),
ScenarioSeed(
project_name="Luna Nickel",
name="Feasibility",
description="Feasibility scenario targeting steady state",
status="active",
discount_rate=10.0,
primary_resource="electricity",
),
]
DEFAULT_FINANCIAL_INPUTS: list[FinancialInputSeed] = [
FinancialInputSeed(
project_name="Helios Copper",
scenario_name="Base Case",
name="Initial Capital",
category="capex",
cost_bucket="capital_initial",
amount=Decimal("450000000"),
notes="Initial mine development costs",
),
FinancialInputSeed(
project_name="Helios Copper",
scenario_name="Base Case",
name="Processing Opex",
category="opex",
cost_bucket="operating_variable",
amount=Decimal("75000000"),
notes="Annual processing operating expenditure",
),
FinancialInputSeed(
project_name="Helios Copper",
scenario_name="Expansion Case",
name="Expansion Capital",
category="capex",
cost_bucket="capital_sustaining",
amount=Decimal("120000000"),
),
FinancialInputSeed(
project_name="Luna Nickel",
scenario_name="Feasibility",
name="Nickel Revenue",
category="revenue",
cost_bucket=None,
amount=Decimal("315000000"),
),
]
def _get_database_url() -> str:
# Prefer the same DATABASE_URL used by the application
from config.database import DATABASE_URL
return DATABASE_URL
def _is_sqlite(database_url: str) -> bool:
return database_url.startswith("sqlite://")
def _create_engine(database_url: Optional[str] = None) -> Engine:
database_url = database_url or _get_database_url()
engine = create_engine(database_url, future=True)
return engine
def _create_enum_if_missing_sql(type_name: str, values: List[str]) -> str:
# Use a DO block to safely create the enum only if it is missing
vals = ", ".join(f"'{v}'" for v in values)
sql = (
"DO $$ BEGIN "
f"IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = '{type_name}') THEN "
f"CREATE TYPE {type_name} AS ENUM ({vals}); "
"END IF; END $$;"
)
return sql
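# For example, for the "scenariostatus" enum defined above, the generated
# statement (emitted as a single line) is:
#
#   DO $$ BEGIN
#   IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'scenariostatus') THEN
#   CREATE TYPE scenariostatus AS ENUM ('draft', 'active', 'archived');
#   END IF; END $$;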
def ensure_enums(engine: Engine, is_sqlite: bool) -> None:
if is_sqlite:
# SQLite doesn't have enums, constraints are in table DDL
logger.debug("Skipping enum creation for SQLite")
return
with engine.begin() as conn:
for name, vals in ENUM_DEFINITIONS.items():
sql = _create_enum_if_missing_sql(name, vals)
logger.debug("Ensuring enum %s: %s", name, sql)
conn.execute(text(sql))
def _fetch_enum_values(conn, type_name: str) -> Set[str]:
rows = conn.execute(
text(
"""
SELECT e.enumlabel
FROM pg_enum e
JOIN pg_type t ON t.oid = e.enumtypid
WHERE t.typname = :type_name
"""
),
{"type_name": type_name},
)
return {row.enumlabel for row in rows}
def normalize_enum_values(engine: Engine, is_sqlite: bool) -> None:
if is_sqlite:
# No enums to normalize in SQLite
logger.debug("Skipping enum normalization for SQLite")
return
with engine.begin() as conn:
for type_name, expected_values in ENUM_DEFINITIONS.items():
try:
existing_values = _fetch_enum_values(conn, type_name)
except Exception as exc: # pragma: no cover - system catalogs missing
logger.debug(
"Skipping enum normalization for %s due to error: %s",
type_name,
exc,
)
continue
expected_set = set(expected_values)
for value in list(existing_values):
if value in expected_set:
continue
normalized = value.lower()
if (
normalized != value
and normalized in expected_set
and normalized not in existing_values
):
logger.info(
"Renaming enum value %s.%s -> %s",
type_name,
value,
normalized,
)
conn.execute(
text(
f"ALTER TYPE {type_name} RENAME VALUE :old_value TO :new_value"
),
{"old_value": value, "new_value": normalized},
)
existing_values.remove(value)
existing_values.add(normalized)
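# Example: a legacy upper-case label such as 'DRAFT' in scenariostatus is
# renamed to 'draft'; labels that already match the expected values, or that
# cannot be mapped to one by lower-casing, are left untouched.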
def ensure_tables(engine: Engine, is_sqlite: bool) -> None:
table_ddls = _get_table_ddls(is_sqlite)
with engine.begin() as conn:
for ddl in table_ddls:
logger.debug("Executing DDL:\n%s", ddl)
conn.execute(text(ddl))
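# PostgreSQL has no ALTER TABLE ... ADD CONSTRAINT IF NOT EXISTS, so the DO
# blocks below check pg_constraint before adding the unique constraints. This
# keeps the step idempotent for databases created before these constraints
# were part of the table DDL.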
CONSTRAINT_DDLS = [
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM pg_constraint
WHERE conname = 'uq_scenarios_project_name'
) THEN
ALTER TABLE scenarios
ADD CONSTRAINT uq_scenarios_project_name UNIQUE (project_id, name);
END IF;
END;
$$;
""",
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM pg_constraint
WHERE conname = 'uq_financial_inputs_scenario_name'
) THEN
ALTER TABLE financial_inputs
ADD CONSTRAINT uq_financial_inputs_scenario_name UNIQUE (scenario_id, name);
END IF;
END;
$$;
""",
]
def ensure_constraints(engine: Engine, is_sqlite: bool) -> None:
if is_sqlite:
# Constraints are already in table DDL for SQLite
logger.debug("Skipping constraint creation for SQLite")
return
with engine.begin() as conn:
for ddl in CONSTRAINT_DDLS:
logger.debug("Ensuring constraint via:\n%s", ddl)
conn.execute(text(ddl))
def seed_roles(engine: Engine, is_sqlite: bool) -> None:
with engine.begin() as conn:
for r in DEFAULT_ROLES:
seed = RoleSeed(**r)
conn.execute(
text(
"INSERT INTO roles (id, name, display_name, description) VALUES (:id, :name, :display_name, :description) "
"ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, display_name = EXCLUDED.display_name, description = EXCLUDED.description"
),
dict(id=seed.id, name=seed.name,
display_name=seed.display_name, description=seed.description),
)
def seed_admin_user(engine: Engine, is_sqlite: bool) -> None:
with engine.begin() as conn:
# Use environment-configured admin settings when present so initializer
# aligns with the application's bootstrap configuration.
admin_email = os.getenv(
"CALMINER_SEED_ADMIN_EMAIL", DEFAULT_ADMIN["email"])
admin_username = os.getenv(
"CALMINER_SEED_ADMIN_USERNAME", DEFAULT_ADMIN["username"])
admin_password = os.getenv(
"CALMINER_SEED_ADMIN_PASSWORD", DEFAULT_ADMIN["password"])
u = UserSeed(
id=DEFAULT_ADMIN.get("id", 1),
email=admin_email,
username=admin_username,
password=admin_password,
is_active=DEFAULT_ADMIN.get("is_active", True),
is_superuser=DEFAULT_ADMIN.get("is_superuser", True),
)
password_hash = password_context.hash(u.password)
# Upsert by username to avoid conflicting with different admin email configs
conn.execute(
text(
"INSERT INTO users (email, username, password_hash, is_active, is_superuser) "
"VALUES (:email, :username, :password_hash, :is_active, :is_superuser) "
"ON CONFLICT (username) DO UPDATE SET email = EXCLUDED.email, password_hash = EXCLUDED.password_hash, is_active = EXCLUDED.is_active, is_superuser = EXCLUDED.is_superuser"
),
dict(email=u.email, username=u.username, password_hash=password_hash,
is_active=u.is_active, is_superuser=u.is_superuser),
)
# ensure admin has admin role
# Resolve user_id for role assignment: select by username
row = conn.execute(text("SELECT id FROM users WHERE username = :username"), dict(
username=u.username)).fetchone()
if row is not None:
user_id = row.id
else:
user_id = None
if user_id is not None:
conn.execute(
text(
"INSERT INTO user_roles (user_id, role_id, granted_by) VALUES (:user_id, :role_id, :granted_by) "
"ON CONFLICT (user_id, role_id) DO NOTHING"
),
dict(user_id=user_id, role_id=1, granted_by=user_id),
)
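# The seeded admin credentials can be overridden per environment, e.g.
# (illustrative values, not defaults):
#
#   CALMINER_SEED_ADMIN_EMAIL=ops@example.com \
#   CALMINER_SEED_ADMIN_USERNAME=ops \
#   CALMINER_SEED_ADMIN_PASSWORD='use-a-strong-secret' \
#   python -m scripts.init_db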
def ensure_default_pricing(engine: Engine, is_sqlite: bool) -> None:
with engine.begin() as conn:
p = PricingSeed(**DEFAULT_PRICING)
# Try insert on slug conflict
conn.execute(
text(
"INSERT INTO pricing_settings (slug, name, description, default_currency, default_payable_pct, moisture_threshold_pct, moisture_penalty_per_pct) "
"VALUES (:slug, :name, :description, :default_currency, :default_payable_pct, :moisture_threshold_pct, :moisture_penalty_per_pct) "
"ON CONFLICT (slug) DO UPDATE SET name = EXCLUDED.name"
),
dict(
slug=p.slug,
name=p.name,
description=p.description,
default_currency=p.default_currency,
default_payable_pct=p.default_payable_pct,
moisture_threshold_pct=p.moisture_threshold_pct,
moisture_penalty_per_pct=p.moisture_penalty_per_pct,
),
)
def _project_id_by_name(conn, project_name: str) -> Optional[int]:
row = conn.execute(
text("SELECT id FROM projects WHERE name = :name"),
{"name": project_name},
).fetchone()
return row.id if row else None
def ensure_default_projects(engine: Engine, is_sqlite: bool) -> None:
with engine.begin() as conn:
for project in DEFAULT_PROJECTS:
conn.execute(
text(
"""
INSERT INTO projects (name, location, operation_type, description)
VALUES (:name, :location, :operation_type, :description)
ON CONFLICT (name) DO UPDATE SET
location = EXCLUDED.location,
operation_type = EXCLUDED.operation_type,
description = EXCLUDED.description
"""
),
project.model_dump(),
)
def ensure_default_scenarios(engine: Engine, is_sqlite: bool) -> None:
with engine.begin() as conn:
for scenario in DEFAULT_SCENARIOS:
project_id = _project_id_by_name(conn, scenario.project_name)
if project_id is None:
logger.warning(
"Skipping scenario seed '%s' because project '%s' does not exist",
scenario.name,
scenario.project_name,
)
continue
payload = scenario.model_dump(exclude={"project_name"})
payload.update({"project_id": project_id})
if is_sqlite:
sql = """
INSERT INTO scenarios (
project_id, name, description, status, discount_rate,
currency, primary_resource
)
VALUES (
:project_id, :name, :description, :status,
:discount_rate, :currency, :primary_resource
)
ON CONFLICT (project_id, name) DO UPDATE SET
description = EXCLUDED.description,
status = EXCLUDED.status,
discount_rate = EXCLUDED.discount_rate,
currency = EXCLUDED.currency,
primary_resource = EXCLUDED.primary_resource
"""
else:
sql = """
INSERT INTO scenarios (
project_id, name, description, status, discount_rate,
currency, primary_resource
)
VALUES (
:project_id, :name, :description, CAST(:status AS scenariostatus),
:discount_rate, :currency,
CASE WHEN :primary_resource IS NULL
THEN NULL
ELSE CAST(:primary_resource AS resourcetype)
END
)
ON CONFLICT (project_id, name) DO UPDATE SET
description = EXCLUDED.description,
status = EXCLUDED.status,
discount_rate = EXCLUDED.discount_rate,
currency = EXCLUDED.currency,
primary_resource = EXCLUDED.primary_resource
"""
conn.execute(text(sql), payload)
def ensure_default_financial_inputs(engine: Engine, is_sqlite: bool) -> None:
with engine.begin() as conn:
for item in DEFAULT_FINANCIAL_INPUTS:
project_id = _project_id_by_name(conn, item.project_name)
if project_id is None:
logger.warning(
"Skipping financial input '%s'; project '%s' missing",
item.name,
item.project_name,
)
continue
scenario_row = conn.execute(
text(
"SELECT id FROM scenarios WHERE project_id = :project_id AND name = :name"
),
{"project_id": project_id, "name": item.scenario_name},
).fetchone()
if scenario_row is None:
logger.warning(
"Skipping financial input '%s'; scenario '%s' missing for project '%s'",
item.name,
item.scenario_name,
item.project_name,
)
continue
payload = item.model_dump(
exclude={"project_name", "scenario_name"},
)
if is_sqlite:
# Convert Decimal to float for SQLite
payload["amount"] = float(payload["amount"])
payload.update({"scenario_id": scenario_row.id})
if is_sqlite:
sql = """
INSERT INTO financial_inputs (
scenario_id, name, category, cost_bucket, amount, currency, notes
)
VALUES (
:scenario_id, :name, :category, :cost_bucket,
:amount, :currency, :notes
)
ON CONFLICT (scenario_id, name) DO UPDATE SET
category = EXCLUDED.category,
cost_bucket = EXCLUDED.cost_bucket,
amount = EXCLUDED.amount,
currency = EXCLUDED.currency,
notes = EXCLUDED.notes
"""
else:
sql = """
INSERT INTO financial_inputs (
scenario_id, name, category, cost_bucket, amount, currency, notes
)
VALUES (
:scenario_id, :name, CAST(:category AS financialcategory),
CASE WHEN :cost_bucket IS NULL THEN NULL
ELSE CAST(:cost_bucket AS costbucket)
END,
:amount,
:currency,
:notes
)
ON CONFLICT (scenario_id, name) DO UPDATE SET
category = EXCLUDED.category,
cost_bucket = EXCLUDED.cost_bucket,
amount = EXCLUDED.amount,
currency = EXCLUDED.currency,
notes = EXCLUDED.notes
"""
conn.execute(text(sql), payload)
def init_db(database_url: Optional[str] = None) -> None:
"""Run the idempotent initialization sequence.
Steps:
- Ensure enum types exist.
- Ensure required tables exist.
- Seed roles and admin user.
- Ensure default pricing settings record exists.
- Seed sample projects, scenarios, and financial inputs.
"""
database_url = database_url or _get_database_url()
is_sqlite = _is_sqlite(database_url)
engine = _create_engine(database_url)
logger.info("Starting DB initialization using engine=%s", engine)
ensure_enums(engine, is_sqlite)
normalize_enum_values(engine, is_sqlite)
ensure_tables(engine, is_sqlite)
ensure_constraints(engine, is_sqlite)
seed_roles(engine, is_sqlite)
seed_admin_user(engine, is_sqlite)
ensure_default_pricing(engine, is_sqlite)
ensure_default_projects(engine, is_sqlite)
ensure_default_scenarios(engine, is_sqlite)
ensure_default_financial_inputs(engine, is_sqlite)
logger.info("DB initialization complete")
if __name__ == "__main__":
# Allow running manually: python -m scripts.init_db
logging.basicConfig(level=logging.INFO)
init_db()
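# Sketch of wiring the initializer into application startup (assumes a FastAPI
# `app` object in main.py; adapt to the actual entrypoint):
#
#   from scripts.init_db import init_db
#
#   @app.on_event("startup")
#   def bootstrap_database() -> None:
#       init_db()  # idempotent, so repeated startups are harmless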

91
scripts/reset_db.py Normal file
View File

@@ -0,0 +1,91 @@
"""Utility to reset development Postgres schema artifacts.
This script drops managed tables and enum types created by `scripts.init_db`.
It is intended for local development only; it refuses to run if CALMINER_ENV
indicates production or staging. The operation is idempotent: missing objects
are ignored. Use with caution.
"""
from __future__ import annotations
import logging
import os
from dataclasses import dataclass
from typing import Iterable
from sqlalchemy import text
from sqlalchemy.engine import Engine
from config.database import DATABASE_URL
from scripts.init_db import ENUM_DEFINITIONS, _create_engine
logger = logging.getLogger(__name__)
@dataclass(slots=True)
class ResetOptions:
drop_tables: bool = True
drop_enums: bool = True
MANAGED_TABLES: tuple[str, ...] = (
"simulation_parameters",
"financial_inputs",
"scenarios",
"projects",
"pricing_impurity_settings",
"pricing_metal_settings",
"pricing_settings",
"user_roles",
"users",
"roles",
)
FORBIDDEN_ENVIRONMENTS: set[str] = {"production", "staging", "prod", "stage"}
def _ensure_safe_environment() -> None:
env = os.getenv("CALMINER_ENV", "development").lower()
if env in FORBIDDEN_ENVIRONMENTS:
raise RuntimeError(
f"Refusing to reset database in environment '{env}'. "
"Set CALMINER_ENV to 'development' to proceed."
)
def _drop_tables(engine: Engine, tables: Iterable[str]) -> None:
if not tables:
return
with engine.begin() as conn:
for table in tables:
logger.info("Dropping table if exists: %s", table)
conn.execute(text(f"DROP TABLE IF EXISTS {table} CASCADE"))
def _drop_enums(engine: Engine, enum_names: Iterable[str]) -> None:
if not enum_names:
return
with engine.begin() as conn:
for enum_name in enum_names:
logger.info("Dropping enum type if exists: %s", enum_name)
conn.execute(text(f"DROP TYPE IF EXISTS {enum_name} CASCADE"))
def reset_database(*, options: ResetOptions | None = None, database_url: str | None = None) -> None:
"""Drop managed tables and enums for a clean slate."""
_ensure_safe_environment()
opts = options or ResetOptions()
engine = _create_engine(database_url or DATABASE_URL)
if opts.drop_tables:
_drop_tables(engine, MANAGED_TABLES)
if opts.drop_enums:
_drop_enums(engine, ENUM_DEFINITIONS.keys())
logger.info("Database reset complete")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
reset_database()
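# Local development workflow sketch (blocked outside development by the
# CALMINER_ENV guard above):
#
#   python -m scripts.reset_db && python -m scripts.init_db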

View File

@@ -1,42 +0,0 @@
"""Utility for applying Alembic migrations before application startup."""
from __future__ import annotations
import logging
from pathlib import Path
from alembic import command
from alembic.config import Config
from dotenv import load_dotenv
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _load_env() -> None:
"""Ensure environment variables from .env are available."""
load_dotenv()
def _alembic_config(project_root: Path) -> Config:
config_path = project_root / "alembic.ini"
if not config_path.exists():
raise FileNotFoundError(f"Missing alembic.ini at {config_path}")
config = Config(str(config_path))
config.set_main_option("script_location", str(project_root / "alembic"))
return config
def run_migrations(target_revision: str = "head") -> None:
"""Apply Alembic migrations up to the given revision."""
project_root = Path(__file__).resolve().parent.parent
_load_env()
config = _alembic_config(project_root)
logger.info("Applying database migrations up to %s", target_revision)
command.upgrade(config, target_revision)
logger.info("Database migrations applied successfully")
if __name__ == "__main__":
run_migrations()

86
scripts/verify_db.py Normal file
View File

@@ -0,0 +1,86 @@
"""Verify DB initialization results: enums, roles, admin user, pricing_settings."""
from __future__ import annotations
import logging
from sqlalchemy import create_engine, text
from config.database import DATABASE_URL
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
ENUMS = [
'miningoperationtype',
'scenariostatus',
'financialcategory',
'costbucket',
'distributiontype',
'stochasticvariable',
'resourcetype',
]
SQL_CHECK_ENUM = "SELECT typname FROM pg_type WHERE typname = ANY(:names)"
SQL_ROLES = "SELECT id, name, display_name FROM roles ORDER BY id"
SQL_ADMIN = "SELECT id, email, username, is_active, is_superuser FROM users WHERE id = 1"
SQL_USER_ROLES = "SELECT user_id, role_id, granted_by FROM user_roles WHERE user_id = 1"
SQL_PRICING = "SELECT id, slug, name, default_currency FROM pricing_settings WHERE slug = 'default'"
def run():
engine = create_engine(DATABASE_URL, future=True)
with engine.connect() as conn:
print('Using DATABASE_URL:', DATABASE_URL)
# enums
res = conn.execute(text(SQL_CHECK_ENUM), dict(names=ENUMS)).fetchall()
found = [r[0] for r in res]
print('\nEnums found:')
for name in ENUMS:
print(f' {name}:', 'YES' if name in found else 'NO')
# roles
try:
roles = conn.execute(text(SQL_ROLES)).fetchall()
print('\nRoles:')
if roles:
for r in roles:
print(f' id={r.id} name={r.name} display_name={r.display_name}')
else:
print(' (no roles found)')
except Exception as e:
print('\nRoles query failed:', e)
# admin user
try:
admin = conn.execute(text(SQL_ADMIN)).fetchone()
print('\nAdmin user:')
if admin:
print(f' id={admin.id} email={admin.email} username={admin.username} is_active={admin.is_active} is_superuser={admin.is_superuser}')
else:
print(' (admin user not found)')
except Exception as e:
print('\nAdmin query failed:', e)
# user_roles
try:
ur = conn.execute(text(SQL_USER_ROLES)).fetchall()
print('\nUser roles for user_id=1:')
if ur:
for row in ur:
print(f' user_id={row.user_id} role_id={row.role_id} granted_by={row.granted_by}')
else:
print(' (no user_roles rows for user_id=1)')
except Exception as e:
print('\nUser_roles query failed:', e)
# pricing settings
try:
p = conn.execute(text(SQL_PRICING)).fetchone()
print('\nPricing settings (slug=default):')
if p:
print(f' id={p.id} slug={p.slug} name={p.name} default_currency={p.default_currency}')
else:
print(' (default pricing settings not found)')
except Exception as e:
print('\nPricing query failed:', e)
if __name__ == '__main__':
run()
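# A healthy run prints one line per enum plus the seeded rows, e.g.:
#
#   scenariostatus: YES
#   id=1 name=admin display_name=Administrator
#
# Missing objects show up as 'NO' or a '(... not found)' message instead.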

View File

@@ -162,12 +162,21 @@ def bootstrap_pricing_settings(
uow.set_project_pricing_settings(project, default_settings)
assigned += 1
logger.info(
"Pricing bootstrap result: slug=%s created=%s updated_fields=%s impurity_upserts=%s projects_assigned=%s",
seed_result.settings.slug,
seed_result.created,
seed_result.updated_fields,
seed_result.impurity_upserts,
assigned,
)
return PricingBootstrapResult(seed=seed_result, projects_assigned=assigned)
# Capture logging-safe primitives while the UnitOfWork (and session)
# are still active to avoid DetachedInstanceError when accessing ORM
# instances outside the session scope.
seed_slug = seed_result.settings.slug if seed_result and seed_result.settings else None
seed_created = getattr(seed_result, "created", None)
seed_updated_fields = getattr(seed_result, "updated_fields", None)
seed_impurity_upserts = getattr(seed_result, "impurity_upserts", None)
logger.info(
"Pricing bootstrap result: slug=%s created=%s updated_fields=%s impurity_upserts=%s projects_assigned=%s",
seed_slug,
seed_created,
seed_updated_fields,
seed_impurity_upserts,
assigned,
)
return PricingBootstrapResult(seed=seed_result, projects_assigned=assigned)

View File

@@ -8,6 +8,9 @@ import math
from typing import Mapping, Sequence
from urllib.parse import urlencode
import plotly.graph_objects as go
import plotly.io as pio
from fastapi import Request
from models import FinancialCategory, Project, Scenario
@@ -515,6 +518,7 @@ class ReportingService:
"label": "Download JSON",
}
],
"chart_data": self._generate_npv_comparison_chart(reports),
}
def build_scenario_comparison_context(
@@ -611,8 +615,64 @@ class ReportingService:
"label": "Download JSON",
}
],
"chart_data": self._generate_distribution_histogram(report.monte_carlo) if report.monte_carlo else "{}",
}
def _generate_npv_comparison_chart(self, reports: Sequence[ScenarioReport]) -> str:
"""Generate Plotly chart JSON for NPV comparison across scenarios."""
scenario_names = []
npv_values = []
for report in reports:
scenario_names.append(report.scenario.name)
npv_values.append(report.deterministic.npv or 0)
fig = go.Figure(data=[
go.Bar(
x=scenario_names,
y=npv_values,
name='NPV',
marker_color='lightblue'
)
])
fig.update_layout(
title="NPV Comparison Across Scenarios",
xaxis_title="Scenario",
yaxis_title="NPV",
showlegend=False
)
return pio.to_json(fig) or "{}"
def _generate_distribution_histogram(self, monte_carlo: ScenarioMonteCarloResult) -> str:
"""Generate Plotly histogram for Monte Carlo distribution."""
if not monte_carlo.available or not monte_carlo.result or not monte_carlo.result.samples:
return "{}"
# Get NPV samples
npv_samples = monte_carlo.result.samples.get(SimulationMetric.NPV, [])
if len(npv_samples) == 0:
return "{}"
fig = go.Figure(data=[
go.Histogram(
x=npv_samples,
nbinsx=50,
name='NPV Distribution',
marker_color='lightgreen'
)
])
fig.update_layout(
title="Monte Carlo NPV Distribution",
xaxis_title="NPV",
yaxis_title="Frequency",
showlegend=False
)
return pio.to_json(fig) or "{}"
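# Both helpers return Plotly figure JSON ({"data": [...], "layout": {...}})
# that the templates receive as `chart_data`; a client-side script is assumed
# to render it, e.g. (illustrative JavaScript, not part of this module):
#
#   const fig = JSON.parse(chartData);
#   Plotly.newPlot("npv-chart", fig.data, fig.layout);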
def _build_cash_flows(scenario: Scenario) -> tuple[list[CashFlow], ScenarioFinancialTotals]:
cash_flows: list[CashFlow] = []

View File

@@ -27,6 +27,11 @@ from services.export_query import ProjectExportFilters, ScenarioExportFilters
from services.pricing import PricingMetadata
def _enum_value(e):
"""Return the underlying value for Enum members, otherwise return as-is."""
return getattr(e, "value", e)
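# Examples (assuming ScenarioStatus.ACTIVE carries the value "active"):
#   _enum_value(ScenarioStatus.ACTIVE)  -> "active"
#   _enum_value("active")               -> "active"  (raw values pass through)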
class ProjectRepository:
"""Persistence operations for Project entities."""
@@ -202,7 +207,9 @@ class ScenarioRepository:
return self.session.execute(stmt).scalar_one()
def count_by_status(self, status: ScenarioStatus) -> int:
stmt = select(func.count(Scenario.id)).where(Scenario.status == status)
status_val = _enum_value(status)
stmt = select(func.count(Scenario.id)).where(
Scenario.status == status_val)
return self.session.execute(stmt).scalar_one()
def recent(self, limit: int = 5, *, with_project: bool = False) -> Sequence[Scenario]:
@@ -219,9 +226,10 @@ class ScenarioRepository:
limit: int | None = None,
with_project: bool = False,
) -> Sequence[Scenario]:
status_val = _enum_value(status)
stmt = (
select(Scenario)
.where(Scenario.status == status)
.where(Scenario.status == status_val)
.order_by(Scenario.updated_at.desc())
)
if with_project:
@@ -311,7 +319,11 @@ class ScenarioRepository:
stmt = stmt.where(Scenario.name.ilike(name_pattern))
if filters.statuses:
stmt = stmt.where(Scenario.status.in_(filters.statuses))
# Accept Enum members or raw values in filters.statuses
status_values = [
_enum_value(s) for s in (filters.statuses or [])
]
stmt = stmt.where(Scenario.status.in_(status_values))
if filters.start_date_from:
stmt = stmt.where(Scenario.start_date >=

View File

@@ -1,3 +1,95 @@
:root {
--bg: #0b0f14;
--bg-2: #0f141b;
--card: #151b23;
--text: #e6edf3;
--muted: #a9b4c0;
--brand: #f1b21a;
--brand-2: #f6c648;
--brand-3: #f9d475;
--accent: #2ba58f;
--danger: #d14b4b;
--shadow: 0 10px 30px rgba(0, 0, 0, 0.35);
--radius: 14px;
--radius-sm: 10px;
--container: 1180px;
--muted: var(--muted);
--color-text-subtle: rgba(169, 180, 192, 0.6);
--color-text-invert: #ffffff;
--color-text-dark: #0f172a;
--color-text-strong: #111827;
--color-border: rgba(255, 255, 255, 0.08);
--color-border-strong: rgba(255, 255, 255, 0.12);
--color-highlight: rgba(241, 178, 26, 0.08);
--color-panel-shadow: rgba(0, 0, 0, 0.25);
--color-panel-shadow-deep: rgba(0, 0, 0, 0.35);
--color-surface-alt: rgba(21, 27, 35, 0.7);
--space-2xs: 0.25rem;
--space-xs: 0.5rem;
--space-sm: 0.75rem;
--space-md: 1rem;
--space-lg: 1.5rem;
--space-xl: 2rem;
--space-2xl: 3rem;
--font-size-xs: 0.75rem;
--font-size-sm: 0.875rem;
--font-size-base: 1rem;
--font-size-lg: 1.25rem;
--font-size-xl: 1.5rem;
--font-size-2xl: 2rem;
--panel-radius: var(--radius);
--table-radius: var(--radius-sm);
}
* {
box-sizing: border-box;
}
html,
body {
height: 100%;
}
body {
margin: 0;
font-family: ui-sans-serif, system-ui, -apple-system, "Segoe UI", "Roboto",
Helvetica, Arial, "Apple Color Emoji", "Segoe UI Emoji";
color: var(--text);
background: linear-gradient(180deg, var(--bg) 0%, var(--bg-2) 100%);
line-height: 1.45;
}
h1,
h2,
h3,
h4,
h5,
h6 {
margin: 0 0 0.5rem 0;
font-weight: 700;
line-height: 1.2;
}
h1 {
font-size: var(--font-size-2xl);
}
h2 {
font-size: var(--font-size-xl);
}
h3 {
font-size: var(--font-size-lg);
}
p {
margin: 0 0 1rem 0;
}
a {
color: var(--brand);
}
.report-overview {
margin-bottom: 2.5rem;
}
@@ -25,6 +117,16 @@
margin-top: 3rem;
}
.chart-container {
width: 100%;
height: 400px;
background: rgba(15, 20, 27, 0.8);
border-radius: var(--radius-sm);
border: 1px solid rgba(255, 255, 255, 0.05);
box-shadow: inset 0 1px 0 rgba(255, 255, 255, 0.06);
margin-bottom: 1rem;
}
.section-header {
margin-bottom: 1.25rem;
}
@@ -199,97 +301,6 @@
background: rgba(241, 178, 26, 0.14);
border-color: var(--brand);
}
:root {
--bg: #0b0f14;
--bg-2: #0f141b;
--card: #151b23;
--text: #e6edf3;
--muted: #a9b4c0;
--brand: #f1b21a;
--brand-2: #f6c648;
--brand-3: #f9d475;
--accent: #2ba58f;
--danger: #d14b4b;
--shadow: 0 10px 30px rgba(0, 0, 0, 0.35);
--radius: 14px;
--radius-sm: 10px;
--container: 1180px;
--muted: var(--muted);
--color-text-subtle: rgba(169, 180, 192, 0.6);
--color-text-invert: #ffffff;
--color-text-dark: #0f172a;
--color-text-strong: #111827;
--color-border: rgba(255, 255, 255, 0.08);
--color-border-strong: rgba(255, 255, 255, 0.12);
--color-highlight: rgba(241, 178, 26, 0.08);
--color-panel-shadow: rgba(0, 0, 0, 0.25);
--color-panel-shadow-deep: rgba(0, 0, 0, 0.35);
--color-surface-alt: rgba(21, 27, 35, 0.7);
--space-2xs: 0.25rem;
--space-xs: 0.5rem;
--space-sm: 0.75rem;
--space-md: 1rem;
--space-lg: 1.5rem;
--space-xl: 2rem;
--space-2xl: 3rem;
--font-size-xs: 0.75rem;
--font-size-sm: 0.875rem;
--font-size-base: 1rem;
--font-size-lg: 1.25rem;
--font-size-xl: 1.5rem;
--font-size-2xl: 2rem;
--panel-radius: var(--radius);
--table-radius: var(--radius-sm);
}
* {
box-sizing: border-box;
}
html,
body {
height: 100%;
}
body {
margin: 0;
font-family: ui-sans-serif, system-ui, -apple-system, "Segoe UI", "Roboto",
Helvetica, Arial, "Apple Color Emoji", "Segoe UI Emoji";
color: var(--text);
background: linear-gradient(180deg, var(--bg) 0%, var(--bg-2) 100%);
line-height: 1.45;
}
h1,
h2,
h3,
h4,
h5,
h6 {
margin: 0 0 0.5rem 0;
font-weight: 700;
line-height: 1.2;
}
h1 {
font-size: var(--font-size-2xl);
}
h2 {
font-size: var(--font-size-xl);
}
h3 {
font-size: var(--font-size-lg);
}
p {
margin: 0 0 1rem 0;
}
a {
color: var(--brand);
}
.app-layout {
display: flex;
@@ -321,20 +332,29 @@ a {
display: flex;
align-items: center;
gap: 1rem;
padding: 0.5rem 1rem;
border-radius: 0.75rem;
}
a.sidebar-brand {
text-decoration: none;
}
a.sidebar-brand:hover,
a.sidebar-brand:focus {
color: var(--color-text-invert);
background-color: rgba(148, 197, 255, 0.18);
}
.sidebar-nav-controls {
display: flex;
justify-content: center;
gap: 0.5rem;
margin: 1rem 0;
gap: 10px;
margin: 0;
}
.nav-chevron {
width: 40px;
height: 40px;
width: 80px;
height: 80px;
border: none;
border-radius: 50%;
background: rgba(255, 255, 255, 0.1);
color: rgba(255, 255, 255, 0.88);
font-size: 1.2rem;
@@ -886,8 +906,9 @@ a {
border: none;
cursor: pointer;
font-weight: 600;
background-color: var(--color-border);
background-color: var(--brand);
color: var(--color-text-dark);
text-decoration: none;
transition: transform 0.15s ease, box-shadow 0.15s ease;
}
@@ -899,7 +920,7 @@ a {
.btn.primary {
background-color: var(--brand-2);
color: var(--color-text-invert);
color: var(--color-text-dark);
}
.btn.primary:hover,
@@ -907,6 +928,14 @@ a {
background-color: var(--brand-3);
}
.btn.btn-link {
background: var(--brand);
color: var(--color-text-dark);
text-decoration: none;
border: 1px solid var(--brand);
margin-bottom: 0.5rem;
}
.result-output {
background-color: var(--color-text-dark);
color: var(--color-surface-alt);

BIN
static/img/logo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.0 MiB

BIN
static/img/logo_128x128.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.8 MiB

After

Width:  |  Height:  |  Size: 831 KiB

View File

@@ -7,12 +7,12 @@ document.addEventListener("DOMContentLoaded", function () {
// Define the navigation order (main pages)
const navPages = [
"/",
"/projects/ui",
"/imports/ui",
"/ui/simulations",
"/ui/reporting",
"/ui/settings",
window.NAVIGATION_URLS.dashboard,
window.NAVIGATION_URLS.projects,
window.NAVIGATION_URLS.imports,
window.NAVIGATION_URLS.simulations,
window.NAVIGATION_URLS.reporting,
window.NAVIGATION_URLS.settings,
];
const currentPath = window.location.pathname;

View File

@@ -5,7 +5,7 @@
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>{% block title %}CalMiner{% endblock %}</title>
<link rel="stylesheet" href="/static/css/main.css" />
<link rel="stylesheet" href="/static/css/imports.css" />
<link rel="stylesheet" href="/static/css/imports.css" />
{% block head_extra %}{% endblock %}
</head>
<body>
@@ -21,11 +21,27 @@
</div>
</div>
{% block scripts %}{% endblock %}
<script src="/static/js/projects.js" defer></script>
<script src="/static/js/exports.js" defer></script>
<script src="/static/js/imports.js" defer></script>
<script src="/static/js/notifications.js" defer></script>
<script src="/static/js/navigation.js" defer></script>
<script>
window.NAVIGATION_URLS = {
dashboard:
'{{ request.url_for("dashboard.home") if request else "/" }}',
projects:
'{{ request.url_for("projects.project_list_page") if request else "/projects/ui" }}',
imports:
'{{ request.url_for("imports.ui") if request else "/imports/ui" }}',
simulations:
'{{ request.url_for("ui.simulations") if request else "/ui/simulations" }}',
reporting:
'{{ request.url_for("ui.reporting") if request else "/ui/reporting" }}',
settings:
'{{ request.url_for("ui.settings") if request else "/ui/settings" }}',
};
</script>
<script src="/static/js/projects.js" defer></script>
<script src="/static/js/exports.js" defer></script>
<script src="/static/js/imports.js" defer></script>
<script src="/static/js/notifications.js" defer></script>
<script src="/static/js/navigation.js" defer></script>
<script src="/static/js/theme.js"></script>
</body>
</html>

31
templates/currencies.html Normal file
View File

@@ -0,0 +1,31 @@
{% extends "base.html" %}
{% block title %}{{ title }} | CalMiner{% endblock %}
{% block content %}
<div class="page-header">
<div>
<h1>{{ title }}</h1>
<p class="page-subtitle">Manage currency settings and exchange rates for financial calculations.</p>
</div>
</div>
<div class="settings-grid">
<div class="settings-card">
<h2>Currency Configuration</h2>
<p>Define available currencies and their properties.</p>
<p class="settings-card-note">Currency management coming soon</p>
</div>
<div class="settings-card">
<h2>Exchange Rates</h2>
<p>Configure and update currency exchange rates.</p>
<p class="settings-card-note">Exchange rate management coming soon</p>
</div>
<div class="settings-card">
<h2>Default Settings</h2>
<p>Set default currencies for new projects and scenarios.</p>
<p class="settings-card-note">Default currency settings coming soon</p>
</div>
</div>
{% endblock %}

View File

@@ -1,7 +1,7 @@
<footer class="site-footer">
<div class="container footer-inner">
<div class="footer-logo">
<img src="/static/img/logo_big.png" alt="CalMiner Logo" class="footer-logo-img" />
<img src="/static/img/logo_128x128.png" alt="CalMiner Logo" class="footer-logo-img" />
</div>
<p>
&copy; {{ current_year }} CalMiner by

View File

@@ -1,6 +1,6 @@
<div class="sidebar-inner">
<a class="sidebar-brand" href="{{ request.url_for('dashboard.home') }}">
<img src="/static/img/logo_big.png" alt="CalMiner Logo" class="brand-logo" />
<img src="/static/img/logo.png" alt="CalMiner Logo" class="brand-logo" />
<div class="brand-text">
<span class="brand-title">CalMiner</span>
<span class="brand-subtitle">Mining Planner</span>

View File

@@ -1,98 +1,67 @@
{% set dashboard_href = request.url_for('dashboard.home') if request else '/' %}
{% set projects_href = request.url_for('projects.project_list_page') if request else '/projects/ui' %}
{% set project_create_href = request.url_for('projects.create_project_form') if request else '/projects/create' %}
{% set auth_session = request.state.auth_session if request else None %}
{% set is_authenticated = auth_session and auth_session.is_authenticated %}
{% if is_authenticated %}
{% set logout_href = request.url_for('auth.logout') if request else '/logout' %}
{% set account_links = [
{"href": logout_href, "label": "Logout", "match_prefix": "/logout"}
] %}
{% else %}
{% set login_href = request.url_for('auth.login_form') if request else '/login' %}
{% set register_href = request.url_for('auth.register_form') if request else '/register' %}
{% set forgot_href = request.url_for('auth.password_reset_request_form') if request else '/forgot-password' %}
{% set account_links = [
{"href": login_href, "label": "Login", "match_prefix": "/login"},
{"href": register_href, "label": "Register", "match_prefix": "/register"},
{"href": forgot_href, "label": "Forgot Password", "match_prefix": "/forgot-password"}
] %}
{% endif %}
{% set nav_groups = [
{
"label": "Workspace",
"links": [
{"href": dashboard_href, "label": "Dashboard", "match_prefix": "/"},
{"href": projects_href, "label": "Projects", "match_prefix": "/projects"},
{"href": project_create_href, "label": "New Project", "match_prefix": "/projects/create"},
{"href": "/imports/ui", "label": "Imports", "match_prefix": "/imports"}
]
},
{
"label": "Insights",
"links": [
{"href": "/ui/simulations", "label": "Simulations"},
{"href": "/ui/reporting", "label": "Reporting"}
]
},
{
"label": "Configuration",
"links": [
{
"href": "/ui/settings",
"label": "Settings",
"children": [
{"href": "/theme-settings", "label": "Themes"},
{"href": "/ui/currencies", "label": "Currency Management"}
]
}
]
},
{
"label": "Account",
"links": account_links
}
] %}
{% set projects_href = request.url_for('projects.project_list_page') if request
else '/projects/ui' %} {% set project_create_href =
request.url_for('projects.create_project_form') if request else
'/projects/create' %} {% set auth_session = request.state.auth_session if
request else None %} {% set is_authenticated = auth_session and
auth_session.is_authenticated %} {% if is_authenticated %} {% set logout_href =
request.url_for('auth.logout') if request else '/logout' %} {% set account_links
= [ {"href": logout_href, "label": "Logout", "match_prefix": "/logout"} ] %} {%
else %} {% set login_href = request.url_for('auth.login_form') if request else
'/login' %} {% set register_href = request.url_for('auth.register_form') if
request else '/register' %} {% set forgot_href =
request.url_for('auth.password_reset_request_form') if request else
'/forgot-password' %} {% set account_links = [ {"href": login_href, "label":
"Login", "match_prefix": "/login"}, {"href": register_href, "label": "Register",
"match_prefix": "/register"}, {"href": forgot_href, "label": "Forgot Password",
"match_prefix": "/forgot-password"} ] %} {% endif %} {% set nav_groups = [ {
"label": "Workspace", "links": [ {"href": dashboard_href, "label": "Dashboard",
"match_prefix": "/"}, {"href": projects_href, "label": "Projects",
"match_prefix": "/projects"}, {"href": project_create_href, "label": "New
Project", "match_prefix": "/projects/create"}, {"href": "/imports/ui", "label":
"Imports", "match_prefix": "/imports"} ] }, { "label": "Insights", "links": [
{"href": "/ui/simulations", "label": "Simulations"}, {"href": "/ui/reporting",
"label": "Reporting"} ] }, { "label": "Configuration", "links": [ { "href":
"/ui/settings", "label": "Settings", "children": [ {"href": "/theme-settings",
"label": "Themes"}, {"href": "/ui/currencies", "label": "Currency Management"} ]
} ] }, { "label": "Account", "links": account_links } ] %}
<nav class="sidebar-nav" aria-label="Primary navigation">
{% set current_path = request.url.path if request else '' %}
{% for group in nav_groups %}
{% if group.links %}
<div class="sidebar-section">
<div class="sidebar-section-label">{{ group.label }}</div>
<div class="sidebar-section-links">
{% for link in group.links %}
{% set href = link.href %}
{% set match_prefix = link.get('match_prefix', href) %}
{% if match_prefix == '/' %}
{% set is_active = current_path == '/' %}
{% else %}
{% set is_active = current_path.startswith(match_prefix) %}
{% endif %}
<div class="sidebar-link-block">
<a href="{{ href }}" class="sidebar-link{% if is_active %} is-active{% endif %}">
{{ link.label }}
</a>
{% if link.children %}
<div class="sidebar-sublinks">
{% for child in link.children %}
{% set child_prefix = child.get('match_prefix', child.href) %}
{% if child_prefix == '/' %}
{% set child_active = current_path == '/' %}
{% else %}
{% set child_active = current_path.startswith(child_prefix) %}
{% endif %}
<a href="{{ child.href }}" class="sidebar-sublink{% if child_active %} is-active{% endif %}">
{{ child.label }}
</a>
{% endfor %}
</div>
{% endif %}
</div>
{% set current_path = request.url.path if request else '' %} {% for group in
nav_groups %} {% if group.links %}
<div class="sidebar-section">
<div class="sidebar-section-label">{{ group.label }}</div>
<div class="sidebar-section-links">
{% for link in group.links %} {% set href = link.href | string %} {% set
match_prefix = link.get('match_prefix', href) | string %} {% if
match_prefix == '/' %} {% set is_active = current_path == '/' %} {% else
%} {% set is_active = current_path.startswith(match_prefix) %} {% endif %}
<div class="sidebar-link-block">
<a
href="{{ href }}"
class="sidebar-link{% if is_active %} is-active{% endif %}"
>
{{ link.label }}
</a>
{% if link.children %}
<div class="sidebar-sublinks">
{% for child in link.children %} {% set child_prefix =
child.get('match_prefix', child.href) | string %} {% if child_prefix
== '/' %} {% set child_active = current_path == '/' %} {% else %} {%
set child_active = current_path.startswith(child_prefix) %} {% endif
%}
<a
href="{{ child.href | string }}"
class="sidebar-sublink{% if child_active %} is-active{% endif %}"
>
{{ child.label }}
</a>
{% endfor %}
</div>
{% endif %}
</div>
{% endif %}
{% endfor %}
{% endfor %}
</div>
</div>
{% endif %} {% endfor %}
</nav>

23
templates/reporting.html Normal file
View File

@@ -0,0 +1,23 @@
{% extends "base.html" %} {% block title %}{{ title }} | CalMiner{% endblock %}
{% block content %} {% include "partials/reports_header.html" %}
<section class="report-overview">
<div class="report-grid">
<article class="report-card">
<h2>Reporting Dashboard</h2>
<p class="muted">Generate and view comprehensive financial reports.</p>
<p class="muted">
Access project summaries, scenario comparisons, and distribution
analysis.
</p>
<div class="page-actions">
<a
href="{{ request.url_for('projects.project_list_page') }}"
class="button"
>View Reports</a
>
</div>
</article>
</div>
</section>
{% endblock %}

View File

@@ -1,205 +1,248 @@
{% extends "base.html" %}
{% block title %}Project Summary | CalMiner{% endblock %}
{% extends "base.html" %} {% block title %}Project Summary | CalMiner{% endblock
%} {% block content %} {% include "partials/reports_header.html" %} {% include
"partials/reports/options_card.html" %} {% include
"partials/reports/filters_card.html" %}
{% block content %}
{% include "partials/reports_header.html" %}
<section class="report-overview">
<div class="report-grid">
<article class="report-card">
<h2>Project Details</h2>
<dl class="definition-list">
<div>
<dt>Name</dt>
<dd>{{ project.name }}</dd>
</div>
<div>
<dt>Location</dt>
<dd>{{ project.location or "—" }}</dd>
</div>
<div>
<dt>Operation Type</dt>
<dd>{{ project.operation_type | replace("_", " ") | title }}</dd>
</div>
<div>
<dt>Scenarios</dt>
<dd>{{ scenario_count }}</dd>
</div>
<div>
<dt>Created</dt>
<dd>{{ project.created_at | format_datetime }}</dd>
</div>
<div>
<dt>Updated</dt>
<dd>{{ project.updated_at | format_datetime }}</dd>
</div>
</dl>
</article>
{% include "partials/reports/options_card.html" %}
{% include "partials/reports/filters_card.html" %}
<article class="report-card">
<h2>Financial Summary</h2>
<ul class="metric-list">
<li>
<span>Total Inflows</span>
<strong
>{{ aggregates.financials.total_inflows |
currency_display(project.currency) }}</strong
>
</li>
<li>
<span>Total Outflows</span>
<strong
>{{ aggregates.financials.total_outflows |
currency_display(project.currency) }}</strong
>
</li>
<li>
<span>Net Cash Flow</span>
<strong
>{{ aggregates.financials.total_net |
currency_display(project.currency) }}</strong
>
</li>
</ul>
</article>
<section class="report-overview">
<div class="report-grid">
<article class="report-card">
<h2>Project Details</h2>
<dl class="definition-list">
<div>
<dt>Name</dt>
<dd>{{ project.name }}</dd>
</div>
<div>
<dt>Location</dt>
<dd>{{ project.location or "—" }}</dd>
</div>
<div>
<dt>Operation Type</dt>
<dd>{{ project.operation_type | replace("_", " ") | title }}</dd>
</div>
<div>
<dt>Scenarios</dt>
<dd>{{ scenario_count }}</dd>
</div>
<div>
<dt>Created</dt>
<dd>{{ project.created_at | format_datetime }}</dd>
</div>
<div>
<dt>Updated</dt>
<dd>{{ project.updated_at | format_datetime }}</dd>
</div>
</dl>
</article>
<article class="report-card">
<h2>Deterministic Metrics</h2>
{% if aggregates.deterministic_metrics %}
<table class="metrics-table">
<thead>
<tr>
<th scope="col">Metric</th>
<th scope="col">Average</th>
<th scope="col">Best</th>
<th scope="col">Worst</th>
</tr>
</thead>
<tbody>
{% for key, metric in aggregates.deterministic_metrics.items() %}
<tr>
<th scope="row">{{ key | replace("_", " ") | title }}</th>
<td>{{ metric.average | format_metric(key, project.currency) }}</td>
<td>{{ metric.maximum | format_metric(key, project.currency) }}</td>
<td>{{ metric.minimum | format_metric(key, project.currency) }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="muted">
Deterministic metrics are unavailable for the current filters.
</p>
{% endif %}
</article>
</div>
</section>
<article class="report-card">
<h2>Financial Summary</h2>
<ul class="metric-list">
<section class="report-section">
<header class="section-header">
<h2>NPV Comparison</h2>
<p class="section-subtitle">
Visual comparison of Net Present Value across scenarios.
</p>
</header>
<div id="npv-chart" class="chart-container"></div>
</section>
<section class="report-section">
<header class="section-header">
<h2>Scenario Breakdown</h2>
<p class="section-subtitle">
Deterministic metrics and Monte Carlo summaries for each scenario.
</p>
</header>
{% if scenarios %} {% for item in scenarios %}
<article class="scenario-card">
<div class="scenario-card-header">
<div>
<h3>{{ item.scenario.name }}</h3>
<p class="muted">
{{ item.scenario.status | title }} · {{ item.scenario.primary_resource
or "No primary resource" }}
</p>
</div>
<div class="scenario-meta">
<span class="meta-label">Currency</span>
<span class="meta-value"
>{{ item.scenario.currency or project.currency or "—" }}</span
>
</div>
{% include "partials/reports/scenario_actions.html" %}
</div>
<div class="scenario-grid">
<section class="scenario-panel">
<h4>Financial Totals</h4>
<ul class="metric-list compact">
<li>
<span>Total Inflows</span>
<strong>{{ aggregates.financials.total_inflows | currency_display(project.currency) }}</strong>
<span>Inflows</span>
<strong
>{{ item.financials.inflows |
currency_display(item.scenario.currency or project.currency)
}}</strong
>
</li>
<li>
<span>Total Outflows</span>
<strong>{{ aggregates.financials.total_outflows | currency_display(project.currency) }}</strong>
<span>Outflows</span>
<strong
>{{ item.financials.outflows |
currency_display(item.scenario.currency or project.currency)
}}</strong
>
</li>
<li>
<span>Net Cash Flow</span>
<strong>{{ aggregates.financials.total_net | currency_display(project.currency) }}</strong>
<span>Net</span>
<strong
>{{ item.financials.net | currency_display(item.scenario.currency
or project.currency) }}</strong
>
</li>
</ul>
<h5>By Category</h5>
{% if item.financials.by_category %}
<ul class="metric-list compact">
{% for label, value in item.financials.by_category.items() %}
<li>
<span>{{ label | replace("_", " ") | title }}</span>
<strong
>{{ value | currency_display(item.scenario.currency or
project.currency) }}</strong
>
</li>
{% endfor %}
</ul>
{% else %}
<p class="muted">Deterministic metrics are unavailable for the current filters.</p>
<p class="muted">No financial inputs recorded.</p>
{% endif %}
</section>
<section class="scenario-panel">
<h4>Deterministic Metrics</h4>
<table class="metrics-table">
<tbody>
<tr>
<th scope="row">Discount Rate</th>
<td>{{ item.metrics.discount_rate | percentage_display }}</td>
</tr>
<tr>
<th scope="row">NPV</th>
<td>
{{ item.metrics.npv | currency_display(item.scenario.currency or
project.currency) }}
</td>
</tr>
<tr>
<th scope="row">IRR</th>
<td>{{ item.metrics.irr | percentage_display }}</td>
</tr>
<tr>
<th scope="row">Payback Period</th>
<td>{{ item.metrics.payback_period | period_display }}</td>
</tr>
</tbody>
</table>
{% if item.metrics.notes %}
<ul class="note-list">
{% for note in item.metrics.notes %}
<li>{{ note }}</li>
{% endfor %}
</ul>
{% endif %}
</section>
<section class="scenario-panel">
<h4>Monte Carlo Summary</h4>
{% if item.monte_carlo and item.monte_carlo.available %}
<p class="muted">
Iterations: {{ item.monte_carlo.iterations }} {% if percentiles %} ·
Percentiles: {% for percentile in percentiles %} {{ '%g' % percentile
}}{% if not loop.last %}, {% endif %} {% endfor %} {% endif %}
</p>
{% include "partials/reports/monte_carlo_table.html" %} {% else %}
<p class="muted">
Monte Carlo metrics are unavailable for this scenario.
</p>
{% if item.monte_carlo and item.monte_carlo.notes %}
<ul class="note-list">
{% for note in item.monte_carlo.notes %}
<li>{{ note }}</li>
{% endfor %}
</ul>
{% endif %} {% endif %}
</section>
</div>
</article>
{% endfor %} {% else %}
<p class="muted">No scenarios match the current filters.</p>
{% endif %}
</section>
{% endblock %} {% block scripts %}
<script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
<script>
const chartData = {{ chart_data | safe }};
if (chartData && chartData.data) {
Plotly.newPlot('npv-chart', chartData.data, chartData.layout);
}
</script>
{% endblock %}
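The template above expects `chart_data` to be a JSON string with `data` and `layout` keys that `Plotly.newPlot` can consume directly. A minimal sketch of how the server side might assemble that payload — the function name, its placement in ReportingService, and the input shape are assumptions; `Figure.to_json()` is Plotly's own serializer:

```python
import plotly.graph_objects as go


def build_npv_chart_data(scenarios: list[dict]) -> str:
    """Return a JSON string with 'data' and 'layout' keys for Plotly.newPlot."""
    # Assumed input shape: [{"name": "Base case", "npv": 1_250_000.0}, ...]
    names = [s["name"] for s in scenarios]
    npvs = [s["npv"] for s in scenarios]
    fig = go.Figure(data=[go.Bar(x=names, y=npvs, name="NPV")])
    fig.update_layout(title="NPV by Scenario", yaxis_title="NPV")
    # Figure.to_json() emits {"data": [...], "layout": {...}}, which the
    # template embeds via `{{ chart_data | safe }}`.
    return fig.to_json()
```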


@@ -1,149 +1,177 @@
{% extends "base.html" %}
{% block title %}Scenario Distribution | CalMiner{% endblock %}
{% extends "base.html" %} {% block title %}Scenario Distribution | CalMiner{%
endblock %} {% block content %} {% include "partials/reports_header.html" %}
{% block content %}
{% include "partials/reports_header.html" %}
<section class="report-overview">
<div class="report-grid">
<article class="report-card">
<h2>Scenario Details</h2>
<dl class="definition-list">
<div>
<dt>Name</dt>
<dd>{{ scenario.name }}</dd>
</div>
<div>
<dt>Project ID</dt>
<dd>{{ scenario.project_id }}</dd>
</div>
<div>
<dt>Status</dt>
<dd>{{ scenario.status | title }}</dd>
</div>
<div>
<dt>Currency</dt>
<dd>{{ scenario.currency or "—" }}</dd>
</div>
<div>
<dt>Discount Rate</dt>
<dd>{{ metrics.discount_rate | percentage_display }}</dd>
</div>
<div>
<dt>Updated</dt>
<dd>{{ scenario.updated_at | format_datetime }}</dd>
</div>
</dl>
</article>
<section class="report-overview">
<div class="report-grid">
<article class="report-card">
<h2>Scenario Details</h2>
<dl class="definition-list">
<div>
<dt>Name</dt>
<dd>{{ scenario.name }}</dd>
</div>
<div>
<dt>Project ID</dt>
<dd>{{ scenario.project_id }}</dd>
</div>
<div>
<dt>Status</dt>
<dd>{{ scenario.status | title }}</dd>
</div>
<div>
<dt>Currency</dt>
<dd>{{ scenario.currency or "—" }}</dd>
</div>
<div>
<dt>Discount Rate</dt>
<dd>{{ metrics.discount_rate | percentage_display }}</dd>
</div>
<div>
<dt>Updated</dt>
<dd>{{ scenario.updated_at | format_datetime }}</dd>
</div>
</dl>
</article>
<article class="report-card">
<h2>Financial Totals</h2>
<ul class="metric-list">
<li>
<span>Inflows</span>
<strong
>{{ summary.inflows | currency_display(scenario.currency) }}</strong
>
</li>
<li>
<span>Outflows</span>
<strong
>{{ summary.outflows | currency_display(scenario.currency)
}}</strong
>
</li>
<li>
<span>Net Cash Flow</span>
<strong
>{{ summary.net | currency_display(scenario.currency) }}</strong
>
</li>
</ul>
{% if summary.by_category %}
<h3>By Category</h3>
<ul class="metric-list compact">
{% for label, value in summary.by_category.items() %}
<li>
<span>{{ label | replace("_", " ") | title }}</span>
<strong>{{ value | currency_display(scenario.currency) }}</strong>
</li>
{% endfor %}
</ul>
{% endif %}
</article>
</div>
</section>
<section class="report-section">
<header class="section-header">
<h2>Deterministic Metrics</h2>
<p class="section-subtitle">
Key financial indicators calculated from deterministic cash flows.
</p>
</header>
<table class="metrics-table">
<tbody>
<tr>
<th scope="row">NPV</th>
<td>{{ metrics.npv | currency_display(scenario.currency) }}</td>
</tr>
<tr>
<th scope="row">IRR</th>
<td>{{ metrics.irr | percentage_display }}</td>
</tr>
<tr>
<th scope="row">Payback Period</th>
<td>{{ metrics.payback_period | period_display }}</td>
</tr>
</tbody>
</table>
{% if metrics.notes %}
<ul class="note-list">
{% for note in metrics.notes %}
<li>{{ note }}</li>
{% endfor %}
</ul>
{% endif %}
</section>
<section class="report-section">
<header class="section-header">
<h2>Monte Carlo Distribution</h2>
<p class="section-subtitle">
Simulation-driven distributions contextualize stochastic variability.
</p>
</header>
{% if monte_carlo and monte_carlo.available %}
<div id="distribution-chart" class="chart-container"></div>
<div class="simulation-summary">
<p>
Iterations: {{ monte_carlo.iterations }} · Percentiles: {{ percentiles |
join(", ") }}
</p>
<table class="metrics-table">
<thead>
<tr>
<th scope="col">Metric</th>
<th scope="col">Mean</th>
<th scope="col">P5</th>
<th scope="col">Median</th>
<th scope="col">P95</th>
</tr>
</thead>
<tbody>
{% for metric, summary in monte_carlo.metrics.items() %}
<tr>
<th scope="row">{{ metric | replace("_", " ") | title }}</th>
<td>{{ summary.mean | format_metric(metric, scenario.currency) }}</td>
<td>
{{ summary.percentiles['5'] | format_metric(metric,
scenario.currency) }}
</td>
<td>
{{ summary.percentiles['50'] | format_metric(metric,
scenario.currency) }}
</td>
<td>
{{ summary.percentiles['95'] | format_metric(metric,
scenario.currency) }}
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% if monte_carlo.notes %}
<ul class="note-list">
{% for note in monte_carlo.notes %}
<li>{{ note }}</li>
{% endfor %}
</ul>
{% endif %}
    </div>
{% else %}
<p class="muted">Monte Carlo output is unavailable for this scenario.</p>
{% if monte_carlo and monte_carlo.notes %}
<ul class="note-list">
{% for note in monte_carlo.notes %}
<li>{{ note }}</li>
{% endfor %}
</ul>
{% endif %} {% endif %}
</section>
{% endblock %} {% block scripts %}
<script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
<script>
const chartData = {{ chart_data | safe }};
if (chartData && chartData.data) {
Plotly.newPlot('distribution-chart', chartData.data, chartData.layout);
}
</script>
{% endblock %}
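The distribution page wires the same pattern to the `distribution-chart` container. A hedged sketch of the histogram payload, assuming the Monte Carlo runner exposes a list of per-iteration NPV draws (the function and field names are illustrative):

```python
import plotly.graph_objects as go


def build_distribution_chart_data(npv_samples: list[float]) -> str:
    """Histogram payload for the #distribution-chart container."""
    fig = go.Figure(data=[go.Histogram(x=npv_samples, nbinsx=50, name="NPV")])
    fig.update_layout(
        title="Monte Carlo NPV Distribution",
        xaxis_title="NPV",
        yaxis_title="Iterations",
    )
    return fig.to_json()
```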

templates/settings.html Normal file

@@ -0,0 +1,41 @@
{% extends "base.html" %}
{% block title %}{{ title }} | CalMiner{% endblock %}
{% block content %}
<div class="page-header">
<div>
<h1>{{ title }}</h1>
<p class="page-subtitle">Configure application settings and preferences.</p>
</div>
</div>
<div class="settings-grid">
<div class="settings-card">
<h2>Theme Settings</h2>
<p>Customize the appearance and color scheme of the application.</p>
<div class="page-actions">
<a href="{{ request.url_for('ui.theme_settings') }}" class="button">Configure Themes</a>
</div>
</div>
<div class="settings-card">
<h2>Currency Management</h2>
<p>Manage currency settings and exchange rates.</p>
<div class="page-actions">
<a href="{{ request.url_for('ui.currencies') }}" class="button">Manage Currencies</a>
</div>
</div>
<div class="settings-card">
<h2>User Preferences</h2>
<p>Configure personal preferences and defaults.</p>
<p class="settings-card-note">Coming soon</p>
</div>
<div class="settings-card">
<h2>System Configuration</h2>
<p>Advanced system settings and maintenance options.</p>
<p class="settings-card-note">Coming soon</p>
</div>
</div>
{% endblock %}
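The buttons above resolve URLs with `request.url_for('ui.theme_settings')` and `request.url_for('ui.currencies')`, so routes registered under exactly those names must exist. A sketch of what the UI router could look like — paths, template filenames, and handler names are assumptions; only the route names are constrained by the template:

```python
from fastapi import APIRouter, Request
from fastapi.templating import Jinja2Templates

templates = Jinja2Templates(directory="templates")
router = APIRouter()


@router.get("/settings", name="ui.settings")
async def settings(request: Request):
    return templates.TemplateResponse(
        "settings.html", {"request": request, "title": "Settings"}
    )


@router.get("/settings/theme", name="ui.theme_settings")
async def theme_settings(request: Request):
    return templates.TemplateResponse(
        "theme_settings.html", {"request": request, "title": "Theme Settings"}
    )


@router.get("/settings/currencies", name="ui.currencies")
async def currencies(request: Request):
    return templates.TemplateResponse(
        "currencies.html", {"request": request, "title": "Currencies"}
    )
```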


@@ -0,0 +1,16 @@
{% extends "base.html" %}
{% block title %}{{ title }} | CalMiner{% endblock %}
{% block content %}
{% include "partials/reports_header.html" %}
<section class="report-overview">
<div class="report-grid">
<article class="report-card">
<h2>Simulation Dashboard</h2>
<p class="muted">Run and monitor Monte Carlo simulations across scenarios.</p>
<p class="muted">This feature is coming soon.</p>
</article>
</div>
</section>
{% endblock %}


@@ -0,0 +1,31 @@
{% extends "base.html" %}
{% block title %}{{ title }} | CalMiner{% endblock %}
{% block content %}
<div class="page-header">
<div>
<h1>{{ title }}</h1>
<p class="page-subtitle">Customize the visual appearance of the application.</p>
</div>
</div>
<div class="settings-grid">
<div class="settings-card">
<h2>Color Theme</h2>
<p>Select your preferred color scheme.</p>
<p class="settings-card-note">Theme customization coming soon</p>
</div>
<div class="settings-card">
<h2>Layout Options</h2>
<p>Configure sidebar and navigation preferences.</p>
<p class="settings-card-note">Layout options coming soon</p>
</div>
<div class="settings-card">
<h2>Accessibility</h2>
<p>Adjust settings for better accessibility.</p>
<p class="settings-card-note">Accessibility settings coming soon</p>
</div>
</div>
{% endblock %}


@@ -0,0 +1,316 @@
from __future__ import annotations
from dataclasses import dataclass, field
from decimal import Decimal
import re
from types import SimpleNamespace
from typing import Any, Dict, Iterable, Tuple
import pytest
from scripts import init_db
@pytest.fixture(autouse=True)
def clear_seed_admin_env(monkeypatch: pytest.MonkeyPatch) -> None:
"""Remove environment overrides so defaults are deterministic during tests."""
for name in (
"CALMINER_SEED_ADMIN_EMAIL",
"CALMINER_SEED_ADMIN_USERNAME",
"CALMINER_SEED_ADMIN_PASSWORD",
):
monkeypatch.delenv(name, raising=False)
@dataclass
class FakeState:
enums: set[str] = field(default_factory=set)
tables: set[str] = field(default_factory=set)
roles: dict[int, Dict[str, Any]] = field(default_factory=dict)
users: dict[str, Dict[str, Any]] = field(default_factory=dict)
user_roles: set[Tuple[int, int]] = field(default_factory=set)
pricing_settings: dict[str, Dict[str, Any]] = field(default_factory=dict)
projects: dict[str, Dict[str, Any]] = field(default_factory=dict)
    scenarios: dict[Tuple[int, str], Dict[str, Any]] = field(default_factory=dict)
    financial_inputs: dict[Tuple[int, str], Dict[str, Any]] = field(default_factory=dict)
sequences: Dict[str, int] = field(default_factory=lambda: {
"users": 0,
"projects": 0,
"scenarios": 0,
"financial_inputs": 0,
})
class FakeResult:
def __init__(self, rows: Iterable[Any]) -> None:
self._rows = list(rows)
def fetchone(self) -> Any | None:
return self._rows[0] if self._rows else None
class FakeConnection:
def __init__(self, state: FakeState) -> None:
self.state = state
def execute(self, statement: Any, params: dict[str, Any] | None = None) -> FakeResult:
params = params or {}
sql = str(statement).strip()
lower_sql = sql.lower()
if lower_sql.startswith("do $$"):
match = re.search(r"create type\s+(\w+)\s+as enum", lower_sql)
if match:
self.state.enums.add(match.group(1))
return FakeResult([])
if lower_sql.startswith("create table if not exists"):
match = re.search(r"create table if not exists\s+(\w+)", lower_sql)
if match:
self.state.tables.add(match.group(1))
return FakeResult([])
if lower_sql.startswith("insert into roles"):
role_id = params["id"]
record = self.state.roles.get(role_id, {"id": role_id})
record.update(
name=params["name"],
display_name=params["display_name"],
description=params.get("description"),
)
self.state.roles[role_id] = record
return FakeResult([])
if lower_sql.startswith("insert into users"):
username = params["username"]
record = self.state.users.get(username)
if record is None:
self.state.sequences["users"] += 1
record = {
"id": self.state.sequences["users"], "username": username}
record.update(
email=params["email"],
password_hash=params["password_hash"],
is_active=params["is_active"],
is_superuser=params["is_superuser"],
)
self.state.users[username] = record
return FakeResult([])
if lower_sql.startswith("select id from users where username"):
username = params["username"]
record = self.state.users.get(username)
rows = [SimpleNamespace(id=record["id"])] if record else []
return FakeResult(rows)
if lower_sql.startswith("insert into user_roles"):
key = (int(params["user_id"]), int(params["role_id"]))
self.state.user_roles.add(key)
return FakeResult([])
if lower_sql.startswith("insert into pricing_settings"):
slug = params["slug"]
record = self.state.pricing_settings.get(slug, {"slug": slug})
record.update(
name=params["name"],
description=params.get("description"),
default_currency=params.get("default_currency"),
default_payable_pct=float(params["default_payable_pct"]),
moisture_threshold_pct=float(params["moisture_threshold_pct"]),
moisture_penalty_per_pct=float(
params["moisture_penalty_per_pct"]),
)
self.state.pricing_settings[slug] = record
return FakeResult([])
if lower_sql.startswith("insert into projects"):
name = params["name"]
record = self.state.projects.get(name)
if record is None:
self.state.sequences["projects"] += 1
record = {"id": self.state.sequences["projects"], "name": name}
record.update(
location=params.get("location"),
operation_type=params["operation_type"],
description=params.get("description"),
pricing_settings_id=params.get("pricing_settings_id"),
)
self.state.projects[name] = record
return FakeResult([])
if lower_sql.startswith("select id from projects where name"):
project = self.state.projects.get(params["name"])
rows = [SimpleNamespace(id=project["id"])] if project else []
return FakeResult(rows)
if lower_sql.startswith("insert into scenarios"):
key = (int(params["project_id"]), params["name"])
record = self.state.scenarios.get(key)
if record is None:
self.state.sequences["scenarios"] += 1
record = {
"id": self.state.sequences["scenarios"],
"project_id": int(params["project_id"]),
"name": params["name"],
}
record.update(
description=params.get("description"),
status=params.get("status"),
discount_rate=params.get("discount_rate"),
currency=params.get("currency"),
primary_resource=params.get("primary_resource"),
)
self.state.scenarios[key] = record
return FakeResult([])
if lower_sql.startswith("select id from scenarios where project_id"):
key = (int(params["project_id"]), params["name"])
scenario = self.state.scenarios.get(key)
rows = [SimpleNamespace(id=scenario["id"])] if scenario else []
return FakeResult(rows)
if lower_sql.startswith("insert into financial_inputs"):
key = (int(params["scenario_id"]), params["name"])
record = self.state.financial_inputs.get(key)
if record is None:
self.state.sequences["financial_inputs"] += 1
record = {
"id": self.state.sequences["financial_inputs"],
"scenario_id": int(params["scenario_id"]),
"name": params["name"],
}
amount = params["amount"]
if not isinstance(amount, Decimal):
amount = Decimal(str(amount))
record.update(
category=params["category"],
cost_bucket=params.get("cost_bucket"),
amount=amount,
currency=params.get("currency"),
notes=params.get("notes"),
)
self.state.financial_inputs[key] = record
return FakeResult([])
if "from pg_enum" in lower_sql and "enumlabel" in lower_sql:
type_name_param = params.get("type_name")
if type_name_param is None:
return FakeResult([])
type_name = str(type_name_param)
values = init_db.ENUM_DEFINITIONS.get(type_name, [])
rows = [SimpleNamespace(enumlabel=value) for value in values]
return FakeResult(rows)
if lower_sql.startswith("alter type") and "rename value" in lower_sql:
return FakeResult([])
raise NotImplementedError(
f"Unhandled SQL during test execution: {sql}")
class FakeTransaction:
def __init__(self, state: FakeState) -> None:
self.state = state
def __enter__(self) -> FakeConnection: # noqa: D401 - simple context helper
return FakeConnection(self.state)
def __exit__(self, exc_type, exc, tb) -> bool:
return False
class FakeEngine:
def __init__(self) -> None:
self.state = FakeState()
self.begin_calls = 0
def begin(self) -> FakeTransaction: # noqa: D401 - simple context helper
self.begin_calls += 1
return FakeTransaction(self.state)
@pytest.fixture()
def fake_engine(monkeypatch: pytest.MonkeyPatch) -> FakeEngine:
engine = FakeEngine()
def _fake_create_engine(database_url: str | None = None) -> FakeEngine: # noqa: ARG001 - signature parity
return engine
monkeypatch.setattr(init_db, "_create_engine", _fake_create_engine)
return engine
def test_init_db_seeds_demo_data_idempotently(fake_engine: FakeEngine) -> None:
init_db.init_db(database_url="postgresql://fake")
state = fake_engine.state
expected_enum_names = set(init_db.ENUM_DEFINITIONS.keys())
assert state.enums == expected_enum_names
expected_role_ids = {role["id"] for role in init_db.DEFAULT_ROLES}
assert set(state.roles.keys()) == expected_role_ids
assert "admin" in state.users
admin_record = state.users["admin"]
assert admin_record["email"] == init_db.DEFAULT_ADMIN["email"]
assert state.user_roles == {(admin_record["id"], 1)}
assert set(state.pricing_settings.keys()) == {
init_db.DEFAULT_PRICING["slug"]}
expected_project_names = {
project.name for project in init_db.DEFAULT_PROJECTS}
assert set(state.projects.keys()) == expected_project_names
assert len(state.scenarios) == len(init_db.DEFAULT_SCENARIOS)
assert len(state.financial_inputs) == len(init_db.DEFAULT_FINANCIAL_INPUTS)
snapshot = {
"projects": {name: data.copy() for name, data in state.projects.items()},
"scenario_keys": set(state.scenarios.keys()),
"financial_keys": set(state.financial_inputs.keys()),
"user_roles": set(state.user_roles),
"admin_id": admin_record["id"],
}
init_db.init_db(database_url="postgresql://fake")
state_after = fake_engine.state
assert set(state_after.roles.keys()) == expected_role_ids
assert len(state_after.users) == 1
assert state_after.users["admin"]["id"] == snapshot["admin_id"]
assert set(state_after.projects.keys()) == set(snapshot["projects"].keys())
assert set(state_after.scenarios.keys()) == snapshot["scenario_keys"]
    assert set(state_after.financial_inputs.keys()) == snapshot["financial_keys"]
assert state_after.user_roles == snapshot["user_roles"]
def test_enum_seed_values_align_with_definitions() -> None:
ddl_blob = " ".join(init_db.TABLE_DDLS).lower()
for enum_name, values in init_db.ENUM_DEFINITIONS.items():
assert enum_name in ddl_blob
if enum_name == "miningoperationtype":
for project in init_db.DEFAULT_PROJECTS:
assert project.operation_type in values
if enum_name == "scenariostatus":
for scenario in init_db.DEFAULT_SCENARIOS:
assert scenario.status in values
if enum_name == "resourcetype":
for scenario in init_db.DEFAULT_SCENARIOS:
if scenario.primary_resource is not None:
assert scenario.primary_resource in values
if enum_name == "financialcategory":
for item in init_db.DEFAULT_FINANCIAL_INPUTS:
assert item.category in values
if enum_name == "costbucket":
for item in init_db.DEFAULT_FINANCIAL_INPUTS:
if item.cost_bucket is not None:
assert item.cost_bucket in values
if enum_name == "distributiontype":
# Simulation parameters reference this type in the schema.
assert "distributiontype" in ddl_blob
if enum_name == "stochasticvariable":
assert "stochasticvariable" in ddl_blob


@@ -14,7 +14,6 @@ from models import Role, User, UserRole
from dependencies import get_auth_session, require_current_user
from services.security import hash_password
from services.session import AuthSession, SessionTokens
from tests.conftest import app
from tests.utils.security import random_password, random_token
COOKIE_SOURCE = "cookie"


@@ -0,0 +1,30 @@
from fastapi.testclient import TestClient
from main import app
def test_login_form_post_does_not_trigger_json_error():
"""POST form-encoded data to /login and assert middleware doesn't return
the JSON "Invalid JSON payload" error which indicates the middleware
attempted to parse non-JSON bodies.
"""
client = TestClient(app)
resp = client.post(
"/login",
data={"username": "no-such-user", "password": "x"},
headers={"Accept": "text/html"},
)
content_type = resp.headers.get("content-type", "")
# If middleware raised the JSON error we'd get an application/json body
# with detail == "Invalid JSON payload". Ensure that's not the case.
if content_type.startswith("application/json"):
body = resp.json()
assert body.get("detail") != "Invalid JSON payload", (
"Middleware attempted to parse non-JSON body as JSON and failed"
)
# At minimum the endpoint should not error with the JSON payload message.
assert True
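The test only pins down the observable behaviour: a form-encoded POST must not be rejected with the JSON error. A sketch of middleware that satisfies it by guarding on the Content-Type header — the class name and placement are assumptions, and a recent Starlette is assumed so that a body read in `dispatch` is replayed to the downstream app:

```python
import json

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse


class JSONValidationMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        content_type = request.headers.get("content-type", "")
        if (
            request.method in {"POST", "PUT", "PATCH"}
            and content_type.startswith("application/json")
        ):
            body = await request.body()
            if body:
                try:
                    json.loads(body)
                except json.JSONDecodeError:
                    return JSONResponse(
                        {"detail": "Invalid JSON payload"}, status_code=400
                    )
        # Form posts (application/x-www-form-urlencoded) never reach the
        # parser above, which is the behaviour the test asserts.
        return await call_next(request)
```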


@@ -0,0 +1,22 @@
from sqlalchemy import Enum as SQLEnum
from models.enums import (
MiningOperationType,
ScenarioStatus,
FinancialCategory,
DistributionType,
)
def test_enum_members_and_sql_names():
# Verify enum members exist
assert MiningOperationType.OTHER.value == "other"
assert ScenarioStatus.DRAFT.value == "draft"
assert FinancialCategory.REVENUE.value == "revenue"
assert DistributionType.NORMAL.value == "normal"
# Verify SQLAlchemy SQLEnum name mapping is consistent
assert SQLEnum(MiningOperationType, name="miningoperationtype").name == "miningoperationtype"
assert SQLEnum(ScenarioStatus, name="scenariostatus").name == "scenariostatus"
assert SQLEnum(FinancialCategory, name="financialcategory").name == "financialcategory"
assert SQLEnum(DistributionType, name="distributiontype").name == "distributiontype"
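The commit history mentions a SQLAlchemy enum helper that normalizes enum values; the assertions above only constrain the type names. A hedged sketch of such a helper, using SQLAlchemy's `values_callable` so the database stores `Enum.value` strings (e.g. "draft") rather than member names (e.g. "DRAFT") — the helper name and defaults are assumptions:

```python
import enum

from sqlalchemy import Enum as SQLEnum


def enum_column_type(enum_cls: type[enum.Enum]) -> SQLEnum:
    """Build a SQLEnum that persists member values under a lowercase type name."""
    return SQLEnum(
        enum_cls,
        name=enum_cls.__name__.lower(),  # "ScenarioStatus" -> "scenariostatus"
        values_callable=lambda e: [member.value for member in e],
    )
```

Under these assumptions `enum_column_type(ScenarioStatus).name` is `"scenariostatus"`, consistent with the explicit `name=` arguments checked in `test_enum_members_and_sql_names`.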