This commit is contained in:
2025-11-09 16:49:27 +01:00
parent 22ddfb671d
commit d807a50f77
96 changed files with 3 additions and 9689 deletions

View File

@@ -1,79 +0,0 @@
from statistics import mean, median, pstdev
from typing import Any, Dict, Iterable, List, Mapping, Union, cast
def _extract_results(simulation_results: Iterable[object]) -> List[float]:
values: List[float] = []
for item in simulation_results:
if not isinstance(item, Mapping):
continue
mapping_item = cast(Mapping[str, Any], item)
value = mapping_item.get("result")
if isinstance(value, (int, float)):
values.append(float(value))
return values
def _percentile(values: List[float], percentile: float) -> float:
if not values:
return 0.0
sorted_values = sorted(values)
if len(sorted_values) == 1:
return sorted_values[0]
index = (percentile / 100) * (len(sorted_values) - 1)
lower = int(index)
upper = min(lower + 1, len(sorted_values) - 1)
weight = index - lower
return sorted_values[lower] * (1 - weight) + sorted_values[upper] * weight
def generate_report(
simulation_results: List[Dict[str, float]],
) -> Dict[str, Union[float, int]]:
"""Aggregate basic statistics for simulation outputs."""
values = _extract_results(simulation_results)
if not values:
return {
"count": 0,
"mean": 0.0,
"median": 0.0,
"min": 0.0,
"max": 0.0,
"std_dev": 0.0,
"variance": 0.0,
"percentile_10": 0.0,
"percentile_90": 0.0,
"percentile_5": 0.0,
"percentile_95": 0.0,
"value_at_risk_95": 0.0,
"expected_shortfall_95": 0.0,
}
summary: Dict[str, Union[float, int]] = {
"count": len(values),
"mean": mean(values),
"median": median(values),
"min": min(values),
"max": max(values),
"percentile_10": _percentile(values, 10),
"percentile_90": _percentile(values, 90),
"percentile_5": _percentile(values, 5),
"percentile_95": _percentile(values, 95),
}
std_dev = pstdev(values) if len(values) > 1 else 0.0
summary["std_dev"] = std_dev
summary["variance"] = std_dev**2
var_95 = summary["percentile_5"]
summary["value_at_risk_95"] = var_95
tail_values = [value for value in values if value <= var_95]
if tail_values:
summary["expected_shortfall_95"] = mean(tail_values)
else:
summary["expected_shortfall_95"] = var_95
return summary

View File

@@ -1,59 +0,0 @@
from datetime import datetime, timedelta
from typing import Any, Union
from fastapi import HTTPException, status, Depends
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from config.database import get_db
ACCESS_TOKEN_EXPIRE_MINUTES = 30
SECRET_KEY = "your-secret-key" # Change this in production
ALGORITHM = "HS256"
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="users/login")
def create_access_token(
subject: Union[str, Any], expires_delta: Union[timedelta, None] = None
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
from models.user import User
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = db.query(User).filter(User.username == username).first()
if user is None:
raise credentials_exception
return user

View File

@@ -1,230 +0,0 @@
from __future__ import annotations
import os
import re
from typing import Dict, Mapping
from sqlalchemy.orm import Session
from models.application_setting import ApplicationSetting
from models.theme_setting import ThemeSetting # Import ThemeSetting model
CSS_COLOR_CATEGORY = "theme"
CSS_COLOR_VALUE_TYPE = "color"
CSS_ENV_PREFIX = "CALMINER_THEME_"
CSS_COLOR_DEFAULTS: Dict[str, str] = {
"--color-background": "#f4f5f7",
"--color-surface": "#ffffff",
"--color-text-primary": "#2a1f33",
"--color-text-secondary": "#624769",
"--color-text-muted": "#64748b",
"--color-text-subtle": "#94a3b8",
"--color-text-invert": "#ffffff",
"--color-text-dark": "#0f172a",
"--color-text-strong": "#111827",
"--color-primary": "#5f320d",
"--color-primary-strong": "#7e4c13",
"--color-primary-stronger": "#837c15",
"--color-accent": "#bff838",
"--color-border": "#e2e8f0",
"--color-border-strong": "#cbd5e1",
"--color-highlight": "#eef2ff",
"--color-panel-shadow": "rgba(15, 23, 42, 0.08)",
"--color-panel-shadow-deep": "rgba(15, 23, 42, 0.12)",
"--color-surface-alt": "#f8fafc",
"--color-success": "#047857",
"--color-error": "#b91c1c",
}
_COLOR_VALUE_PATTERN = re.compile(
r"^(#([0-9a-fA-F]{3}|[0-9a-fA-F]{6}|[0-9a-fA-F]{8})|rgba?\([^)]+\)|hsla?\([^)]+\))$",
re.IGNORECASE,
)
def ensure_css_color_settings(db: Session) -> Dict[str, ApplicationSetting]:
"""Ensure the CSS color defaults exist in the settings table."""
existing = (
db.query(ApplicationSetting)
.filter(ApplicationSetting.key.in_(CSS_COLOR_DEFAULTS.keys()))
.all()
)
by_key = {setting.key: setting for setting in existing}
created = False
for key, default_value in CSS_COLOR_DEFAULTS.items():
if key in by_key:
continue
setting = ApplicationSetting(
key=key,
value=default_value,
value_type=CSS_COLOR_VALUE_TYPE,
category=CSS_COLOR_CATEGORY,
description=f"CSS variable {key}",
is_editable=True,
)
db.add(setting)
by_key[key] = setting
created = True
if created:
db.commit()
for key, setting in by_key.items():
db.refresh(setting)
return by_key
def get_css_color_settings(db: Session) -> Dict[str, str]:
"""Return CSS color variables, filling missing values with defaults."""
settings = ensure_css_color_settings(db)
values: Dict[str, str] = {
key: settings[key].value if key in settings else default
for key, default in CSS_COLOR_DEFAULTS.items()
}
env_overrides = read_css_color_env_overrides(os.environ)
if env_overrides:
values.update(env_overrides)
return values
def update_css_color_settings(
db: Session, updates: Mapping[str, str]
) -> Dict[str, str]:
"""Persist provided CSS color overrides and return the final values."""
if not updates:
return get_css_color_settings(db)
invalid_keys = sorted(set(updates.keys()) - set(CSS_COLOR_DEFAULTS.keys()))
if invalid_keys:
invalid_list = ", ".join(invalid_keys)
raise ValueError(f"Unsupported CSS variables: {invalid_list}")
normalized: Dict[str, str] = {}
for key, value in updates.items():
normalized[key] = _normalize_color_value(value)
settings = ensure_css_color_settings(db)
changed = False
for key, value in normalized.items():
setting = settings[key]
if setting.value != value:
setting.value = value
changed = True
if setting.value_type != CSS_COLOR_VALUE_TYPE:
setting.value_type = CSS_COLOR_VALUE_TYPE
changed = True
if setting.category != CSS_COLOR_CATEGORY:
setting.category = CSS_COLOR_CATEGORY
changed = True
if not setting.is_editable:
setting.is_editable = True
changed = True
if changed:
db.commit()
for key in normalized.keys():
db.refresh(settings[key])
return get_css_color_settings(db)
def read_css_color_env_overrides(
env: Mapping[str, str] | None = None,
) -> Dict[str, str]:
"""Return validated CSS overrides sourced from environment variables."""
if env is None:
env = os.environ
overrides: Dict[str, str] = {}
for css_key in CSS_COLOR_DEFAULTS.keys():
env_name = css_key_to_env_var(css_key)
raw_value = env.get(env_name)
if raw_value is None:
continue
overrides[css_key] = _normalize_color_value(raw_value)
return overrides
def _normalize_color_value(value: str) -> str:
if not isinstance(value, str):
raise ValueError("Color value must be a string")
trimmed = value.strip()
if not trimmed:
raise ValueError("Color value cannot be empty")
if not _COLOR_VALUE_PATTERN.match(trimmed):
raise ValueError(
"Color value must be a hex code or an rgb/rgba/hsl/hsla expression"
)
_validate_functional_color(trimmed)
return trimmed
def _validate_functional_color(value: str) -> None:
lowered = value.lower()
if lowered.startswith("rgb(") or lowered.startswith("hsl("):
_ensure_component_count(value, expected=3)
elif lowered.startswith("rgba(") or lowered.startswith("hsla("):
_ensure_component_count(value, expected=4)
def _ensure_component_count(value: str, expected: int) -> None:
if not value.endswith(")"):
raise ValueError(
"Color function expressions must end with a closing parenthesis"
)
inner = value[value.index("(") + 1: -1]
parts = [segment.strip() for segment in inner.split(",")]
if len(parts) != expected:
raise ValueError(
"Color function expressions must provide the expected number of components"
)
if any(not component for component in parts):
raise ValueError("Color function components cannot be empty")
def css_key_to_env_var(css_key: str) -> str:
sanitized = css_key.lstrip("-").replace("-", "_").upper()
return f"{CSS_ENV_PREFIX}{sanitized}"
def list_css_env_override_rows(
env: Mapping[str, str] | None = None,
) -> list[Dict[str, str]]:
overrides = read_css_color_env_overrides(env)
rows: list[Dict[str, str]] = []
for css_key, value in overrides.items():
rows.append(
{
"css_key": css_key,
"env_var": css_key_to_env_var(css_key),
"value": value,
}
)
return rows
def save_theme_settings(db: Session, theme_data: dict):
theme = db.query(ThemeSetting).first() or ThemeSetting()
for key, value in theme_data.items():
setattr(theme, key, value)
db.add(theme)
db.commit()
db.refresh(theme)
return theme
def get_theme_settings(db: Session):
theme = db.query(ThemeSetting).first()
if theme:
return {c.name: getattr(theme, c.name) for c in theme.__table__.columns}
return {}

View File

@@ -1,144 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass
from random import Random
from typing import Dict, List, Literal, Optional, Sequence
DEFAULT_STD_DEV_RATIO = 0.1
DEFAULT_UNIFORM_SPAN_RATIO = 0.15
DistributionType = Literal["normal", "uniform", "triangular"]
@dataclass
class SimulationParameter:
name: str
base_value: float
distribution: DistributionType
std_dev: Optional[float] = None
minimum: Optional[float] = None
maximum: Optional[float] = None
mode: Optional[float] = None
def _ensure_positive_span(span: float, fallback: float) -> float:
return span if span and span > 0 else fallback
def _compile_parameters(
parameters: Sequence[Dict[str, float]],
) -> List[SimulationParameter]:
compiled: List[SimulationParameter] = []
for index, item in enumerate(parameters):
if "value" not in item:
raise ValueError(f"Parameter at index {index} must include 'value'")
name = str(item.get("name", f"param_{index}"))
base_value = float(item["value"])
distribution = str(item.get("distribution", "normal")).lower()
if distribution not in {"normal", "uniform", "triangular"}:
raise ValueError(
f"Parameter '{name}' has unsupported distribution '{distribution}'"
)
span_default = abs(base_value) * DEFAULT_UNIFORM_SPAN_RATIO or 1.0
if distribution == "normal":
std_dev = item.get("std_dev")
std_dev_value = (
float(std_dev)
if std_dev is not None
else abs(base_value) * DEFAULT_STD_DEV_RATIO or 1.0
)
compiled.append(
SimulationParameter(
name=name,
base_value=base_value,
distribution="normal",
std_dev=_ensure_positive_span(std_dev_value, 1.0),
)
)
continue
minimum = item.get("min")
maximum = item.get("max")
if minimum is None or maximum is None:
minimum = base_value - span_default
maximum = base_value + span_default
minimum = float(minimum)
maximum = float(maximum)
if minimum >= maximum:
raise ValueError(
f"Parameter '{name}' requires 'min' < 'max' for {distribution} distribution"
)
if distribution == "uniform":
compiled.append(
SimulationParameter(
name=name,
base_value=base_value,
distribution="uniform",
minimum=minimum,
maximum=maximum,
)
)
else: # triangular
mode = item.get("mode")
if mode is None:
mode = base_value
mode_value = float(mode)
if not (minimum <= mode_value <= maximum):
raise ValueError(
f"Parameter '{name}' mode must be within min/max bounds for triangular distribution"
)
compiled.append(
SimulationParameter(
name=name,
base_value=base_value,
distribution="triangular",
minimum=minimum,
maximum=maximum,
mode=mode_value,
)
)
return compiled
def _sample_parameter(rng: Random, param: SimulationParameter) -> float:
if param.distribution == "normal":
assert param.std_dev is not None
return rng.normalvariate(param.base_value, param.std_dev)
if param.distribution == "uniform":
assert param.minimum is not None and param.maximum is not None
return rng.uniform(param.minimum, param.maximum)
# triangular
assert (
param.minimum is not None
and param.maximum is not None
and param.mode is not None
)
return rng.triangular(param.minimum, param.maximum, param.mode)
def run_simulation(
parameters: Sequence[Dict[str, float]],
iterations: int = 1000,
seed: Optional[int] = None,
) -> List[Dict[str, float]]:
"""Run a lightweight Monte Carlo simulation using configurable distributions."""
if iterations <= 0:
return []
compiled_params = _compile_parameters(parameters)
if not compiled_params:
return []
rng = Random(seed)
results: List[Dict[str, float]] = []
for iteration in range(1, iterations + 1):
total = 0.0
for param in compiled_params:
sample = _sample_parameter(rng, param)
total += sample
results.append({"iteration": iteration, "result": total})
return results