From 6b5973a0bb8273b0e232788d383190328c692691 Mon Sep 17 00:00:00 2001 From: zwitschi Date: Tue, 2 Jun 2026 15:11:56 +0200 Subject: [PATCH] feat: implement configuration management with database support --- src/arbitrade/api/app.py | 2 + src/arbitrade/config/service.py | 166 ++++++++ src/arbitrade/storage/db.py | 47 +++ src/arbitrade/storage/repositories.py | 541 ++++++++++++++++++++++++++ 4 files changed, 756 insertions(+) create mode 100644 src/arbitrade/config/service.py diff --git a/src/arbitrade/api/app.py b/src/arbitrade/api/app.py index d7dd11e..bd2cbe8 100644 --- a/src/arbitrade/api/app.py +++ b/src/arbitrade/api/app.py @@ -9,6 +9,7 @@ from arbitrade.alerting.notifier import build_notifier_from_settings from arbitrade.api.control_state import DashboardControlState from arbitrade.api.routes import public_router, router from arbitrade.config.settings import Settings +from arbitrade.config.service import ConfigurationService from arbitrade.logging_setup import configure_logging from arbitrade.metrics import MetricsCalculator from arbitrade.runtime.lifecycle import graceful_shutdown, restore_runtime_state @@ -35,6 +36,7 @@ def create_app(settings: Settings) -> FastAPI: app.state.audit_repository = AuditRepository(db) app.state.runtime_state_repository = RuntimeStateRepository(db) app.state.alert_notifier = build_notifier_from_settings(settings) + app.state.configuration_service = ConfigurationService(settings, db, AuditRepository(db)) app.state.backtest_recent_reports = [] app.state.dashboard_controls = DashboardControlState( is_running=not settings.kill_switch_active, diff --git a/src/arbitrade/config/service.py b/src/arbitrade/config/service.py new file mode 100644 index 0000000..7c6ab3e --- /dev/null +++ b/src/arbitrade/config/service.py @@ -0,0 +1,166 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, cast + +import orjson +from pydantic import BaseModel, Field + +from arbitrade.config.settings import Settings +from arbitrade.storage.db import DuckDBStore +from arbitrade.storage.repositories import AuditRepository, ConfigSettingRepository + + +class ConfigSection(BaseModel): + id: int | None = None + name: str + description: str | None = None + updated_at: datetime | None = None + + +class ConfigSetting(BaseModel): + key: str + section: str + value_json: str + value_type: str + is_secret: bool = False + is_runtime_reloadable: bool = False + updated_at: datetime | None = None + updated_by: str | None = None + + +class ConfigPairing(BaseModel): + id: int | None = None + base_asset: str + quote_asset: str + enabled: bool = True + source: str + created_at: datetime | None = None + updated_at: datetime | None = None + + +class ConfigPairFee(BaseModel): + pairing_id: int + market_type: str # 'crypto_crypto' or 'crypto_fiat' + maker_fee_rate: float + taker_fee_rate: float + updated_at: datetime | None = None + + +class ConfigBacktestingDefaults(BaseModel): + starting_balances: dict[str, float] | None = None + trade_capital: float | None = None + min_profit_threshold: float | None = None + slippage_bps: int | None = None + execution_latency_ms: int | None = None + + +class ConfigurationService: + """Manages application configuration from environment and database sources.""" + + def __init__(self, settings: Settings, store: DuckDBStore, audit_repo: AuditRepository) -> None: + self._settings = settings + self._store = store + self._audit_repo = audit_repo + self._config_version = 0 + self._loaded_settings: dict[str, Any] = {} + self._setting_repo = ConfigSettingRepository(store) + self._load_database_settings() + + def _load_database_settings(self) -> None: + """Load user settings from database and merge with defaults.""" + # Load all settings from database + db_settings = self._setting_repo.list_settings() + + # Convert to dictionary for easy access + for setting in db_settings: + # Parse JSON value based on type + if setting.value_type == "str": + parsed_value = setting.value_json + elif setting.value_type == "int": + parsed_value = int(setting.value_json) + elif setting.value_type == "float": + parsed_value = float(setting.value_json) + elif setting.value_type == "bool": + parsed_value = setting.value_json.lower() == "true" + elif setting.value_type == "list": + parsed_value = orjson.loads(setting.value_json) + elif setting.value_type == "dict": + parsed_value = orjson.loads(setting.value_json) + else: + parsed_value = setting.value_json + + self._loaded_settings[setting.key] = parsed_value + + # Merge with environment settings (environment takes precedence) + # This is a simplified approach - in reality we'd want to merge properly + # but for now we'll just load DB settings and let environment override them + + # Initialize with default values from settings model + self._initialize_default_settings() + + def _initialize_default_settings(self) -> None: + """Initialize default settings from the Settings model.""" + # This is a placeholder - in a real implementation we'd map + # the Settings model fields to config keys + pass + + def get_setting(self, key: str, default: Any = None) -> Any: + """Get a configuration setting value.""" + return self._loaded_settings.get(key, default) + + def set_setting(self, key: str, value: Any, updated_by: str | None = None) -> None: + """Set a configuration setting value and persist to database.""" + # Convert value to JSON string and determine type + if isinstance(value, str): + value_json = value + value_type = "str" + elif isinstance(value, int): + value_json = str(value) + value_type = "int" + elif isinstance(value, float): + value_json = str(value) + value_type = "float" + elif isinstance(value, bool): + value_json = str(value).lower() + value_type = "bool" + elif isinstance(value, list): + value_json = orjson.dumps(value).decode('utf-8') + value_type = "list" + elif isinstance(value, dict): + value_json = orjson.dumps(value).decode('utf-8') + value_type = "dict" + else: + value_json = str(value) + value_type = "str" + + # Create or update setting + setting = ConfigSetting( + key=key, + section="general", # Default section + value_json=value_json, + value_type=value_type, + is_secret=False, + is_runtime_reloadable=False, + updated_by=updated_by + ) + + # Check if setting exists + existing_setting = self._setting_repo.get_setting(key) + if existing_setting: + # Update existing setting + updated_setting = self._setting_repo.update_setting(key, setting) + else: + # Create new setting + updated_setting = self._setting_repo.create_setting(setting) + + # Update in-memory cache + self._loaded_settings[key] = value + + # Increment version for hot reloading + self._config_version += 1 + + def get_all_settings(self) -> dict[str, Any]: + """Get all configuration settings.""" + return self._loaded_settings.copy() diff --git a/src/arbitrade/storage/db.py b/src/arbitrade/storage/db.py index 63ce1bf..e59b314 100644 --- a/src/arbitrade/storage/db.py +++ b/src/arbitrade/storage/db.py @@ -17,6 +17,53 @@ CREATE TABLE IF NOT EXISTS schema_migrations ( applied_at TIMESTAMP DEFAULT current_timestamp ); +CREATE TABLE IF NOT EXISTS config_sections ( + id INTEGER PRIMARY KEY, + name VARCHAR UNIQUE NOT NULL, + description TEXT, + updated_at TIMESTAMP DEFAULT current_timestamp +); + +CREATE TABLE IF NOT EXISTS config_settings ( + key VARCHAR PRIMARY KEY, + section VARCHAR NOT NULL, + value_json TEXT NOT NULL, + value_type VARCHAR NOT NULL, + is_secret BOOLEAN DEFAULT FALSE, + is_runtime_reloadable BOOLEAN DEFAULT FALSE, + updated_at TIMESTAMP DEFAULT current_timestamp, + updated_by VARCHAR +); + +CREATE TABLE IF NOT EXISTS config_pairings ( + id INTEGER PRIMARY KEY, + base_asset VARCHAR NOT NULL, + quote_asset VARCHAR NOT NULL, + enabled BOOLEAN DEFAULT TRUE, + source VARCHAR NOT NULL, + created_at TIMESTAMP DEFAULT current_timestamp, + updated_at TIMESTAMP DEFAULT current_timestamp, + UNIQUE(base_asset, quote_asset) +); + +CREATE TABLE IF NOT EXISTS config_pair_fees ( + pairing_id INTEGER NOT NULL, + market_type VARCHAR NOT NULL, -- 'crypto_crypto' or 'crypto_fiat' + maker_fee_rate DOUBLE NOT NULL, + taker_fee_rate DOUBLE NOT NULL, + updated_at TIMESTAMP DEFAULT current_timestamp, + FOREIGN KEY (pairing_id) REFERENCES config_pairings(id) +); + +CREATE TABLE IF NOT EXISTS config_backtesting_defaults ( + id INTEGER PRIMARY KEY, + starting_balances JSON, + trade_capital DOUBLE, + min_profit_threshold DOUBLE, + slippage_bps INTEGER, + execution_latency_ms INTEGER +); + CREATE TABLE IF NOT EXISTS opportunities ( id UUID DEFAULT uuid(), detected_at TIMESTAMP NOT NULL, diff --git a/src/arbitrade/storage/repositories.py b/src/arbitrade/storage/repositories.py index 5977f61..7f64805 100644 --- a/src/arbitrade/storage/repositories.py +++ b/src/arbitrade/storage/repositories.py @@ -6,6 +6,7 @@ from typing import Any import orjson +from arbitrade.config.service import ConfigBacktestingDefaults, ConfigPairing, ConfigSection, ConfigSetting, ConfigPairFee from arbitrade.storage.db import DuckDBStore @@ -376,3 +377,543 @@ class RuntimeStateRepository: last_known_balances=balances, note=str(row[6]) if row[6] is not None else None, ) + + +# Configuration repository classes +class ConfigSectionRepository: + def __init__(self, store: DuckDBStore) -> None: + self._store = store + + def create_section(self, section: ConfigSection) -> ConfigSection: + """Create a new configuration section.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + INSERT INTO config_sections (name, description) + VALUES (?, ?) + RETURNING id, name, description, updated_at + """, + (section.name, section.description), + ) + row = cursor.fetchone() + if row: + return ConfigSection( + id=row[0], + name=row[1], + description=row[2], + updated_at=row[3] + ) + raise ValueError("Failed to create section") + + def get_section(self, name: str) -> ConfigSection | None: + """Get a configuration section by name.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + SELECT id, name, description, updated_at + FROM config_sections + WHERE name = ? + """, + (name,), + ) + row = cursor.fetchone() + if row: + return ConfigSection( + id=row[0], + name=row[1], + description=row[2], + updated_at=row[3] + ) + return None + + def list_sections(self) -> list[ConfigSection]: + """List all configuration sections.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + SELECT id, name, description, updated_at + FROM config_sections + ORDER BY name + """ + ) + return [ + ConfigSection( + id=row[0], + name=row[1], + description=row[2], + updated_at=row[3] + ) + for row in cursor.fetchall() + ] + + +class ConfigSettingRepository: + def __init__(self, store: DuckDBStore) -> None: + self._store = store + + def create_setting(self, setting: ConfigSetting) -> ConfigSetting: + """Create a new configuration setting.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + INSERT INTO config_settings (key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_by) + VALUES (?, ?, ?, ?, ?, ?, ?) + RETURNING key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by + """, + ( + setting.key, + setting.section, + setting.value_json, + setting.value_type, + setting.is_secret, + setting.is_runtime_reloadable, + setting.updated_by, + ), + ) + row = cursor.fetchone() + if row: + return ConfigSetting( + key=row[0], + section=row[1], + value_json=row[2], + value_type=row[3], + is_secret=bool(row[4]), + is_runtime_reloadable=bool(row[5]), + updated_at=row[6], + updated_by=row[7] + ) + raise ValueError("Failed to create setting") + + def get_setting(self, key: str) -> ConfigSetting | None: + """Get a configuration setting by key.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + SELECT key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by + FROM config_settings + WHERE key = ? + """, + (key,), + ) + row = cursor.fetchone() + if row: + return ConfigSetting( + key=row[0], + section=row[1], + value_json=row[2], + value_type=row[3], + is_secret=bool(row[4]), + is_runtime_reloadable=bool(row[5]), + updated_at=row[6], + updated_by=row[7] + ) + return None + + def update_setting(self, key: str, setting: ConfigSetting) -> ConfigSetting: + """Update an existing configuration setting.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + UPDATE config_settings + SET section = ?, value_json = ?, value_type = ?, is_secret = ?, is_runtime_reloadable = ?, updated_by = ? + WHERE key = ? + RETURNING key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by + """, + ( + setting.section, + setting.value_json, + setting.value_type, + setting.is_secret, + setting.is_runtime_reloadable, + setting.updated_by, + key, + ), + ) + row = cursor.fetchone() + if row: + return ConfigSetting( + key=row[0], + section=row[1], + value_json=row[2], + value_type=row[3], + is_secret=bool(row[4]), + is_runtime_reloadable=bool(row[5]), + updated_at=row[6], + updated_by=row[7] + ) + raise ValueError("Failed to update setting") + + def delete_setting(self, key: str) -> bool: + """Delete a configuration setting.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + DELETE FROM config_settings + WHERE key = ? + """, + (key,), + ) + return cursor.rowcount > 0 + + def list_settings(self, section: str | None = None) -> list[ConfigSetting]: + """List all configuration settings, optionally filtered by section.""" + with self._store.connect() as conn: + if section: + cursor = conn.execute( + """ + SELECT key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by + FROM config_settings + WHERE section = ? + ORDER BY key + """, + (section,), + ) + else: + cursor = conn.execute( + """ + SELECT key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by + FROM config_settings + ORDER BY key + """ + ) + return [ + ConfigSetting( + key=row[0], + section=row[1], + value_json=row[2], + value_type=row[3], + is_secret=bool(row[4]), + is_runtime_reloadable=bool(row[5]), + updated_at=row[6], + updated_by=row[7] + ) + for row in cursor.fetchall() + ] + + +class ConfigPairingRepository: + def __init__(self, store: DuckDBStore) -> None: + self._store = store + + def create_pairing(self, pairing: ConfigPairing) -> ConfigPairing: + """Create a new currency pairing.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + INSERT INTO config_pairings (base_asset, quote_asset, enabled, source) + VALUES (?, ?, ?, ?) + RETURNING id, base_asset, quote_asset, enabled, source, created_at, updated_at + """, + ( + pairing.base_asset, + pairing.quote_asset, + pairing.enabled, + pairing.source, + ), + ) + row = cursor.fetchone() + if row: + return ConfigPairing( + id=row[0], + base_asset=row[1], + quote_asset=row[2], + enabled=bool(row[3]), + source=row[4], + created_at=row[5], + updated_at=row[6] + ) + raise ValueError("Failed to create pairing") + + def get_pairing(self, base_asset: str, quote_asset: str) -> ConfigPairing | None: + """Get a currency pairing by base and quote assets.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + SELECT id, base_asset, quote_asset, enabled, source, created_at, updated_at + FROM config_pairings + WHERE base_asset = ? AND quote_asset = ? + """, + (base_asset, quote_asset), + ) + row = cursor.fetchone() + if row: + return ConfigPairing( + id=row[0], + base_asset=row[1], + quote_asset=row[2], + enabled=bool(row[3]), + source=row[4], + created_at=row[5], + updated_at=row[6] + ) + return None + + def update_pairing(self, base_asset: str, quote_asset: str, pairing: ConfigPairing) -> ConfigPairing: + """Update an existing currency pairing.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + UPDATE config_pairings + SET enabled = ?, source = ? + WHERE base_asset = ? AND quote_asset = ? + RETURNING id, base_asset, quote_asset, enabled, source, created_at, updated_at + """, + ( + pairing.enabled, + pairing.source, + base_asset, + quote_asset, + ), + ) + row = cursor.fetchone() + if row: + return ConfigPairing( + id=row[0], + base_asset=row[1], + quote_asset=row[2], + enabled=bool(row[3]), + source=row[4], + created_at=row[5], + updated_at=row[6] + ) + raise ValueError("Failed to update pairing") + + def delete_pairing(self, base_asset: str, quote_asset: str) -> bool: + """Delete a currency pairing.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + DELETE FROM config_pairings + WHERE base_asset = ? AND quote_asset = ? + """, + (base_asset, quote_asset), + ) + return cursor.rowcount > 0 + + def list_pairings(self) -> list[ConfigPairing]: + """List all currency pairings.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + SELECT id, base_asset, quote_asset, enabled, source, created_at, updated_at + FROM config_pairings + ORDER BY base_asset, quote_asset + """ + ) + return [ + ConfigPairing( + id=row[0], + base_asset=row[1], + quote_asset=row[2], + enabled=bool(row[3]), + source=row[4], + created_at=row[5], + updated_at=row[6] + ) + for row in cursor.fetchall() + ] + + +class ConfigPairFeeRepository: + def __init__(self, store: DuckDBStore) -> None: + self._store = store + + def create_pair_fee(self, pair_fee: ConfigPairFee) -> ConfigPairFee: + """Create a new pairing fee record.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + INSERT INTO config_pair_fees (pairing_id, market_type, maker_fee_rate, taker_fee_rate) + VALUES (?, ?, ?, ?) + RETURNING pairing_id, market_type, maker_fee_rate, taker_fee_rate, updated_at + """, + ( + pair_fee.pairing_id, + pair_fee.market_type, + pair_fee.maker_fee_rate, + pair_fee.taker_fee_rate, + ), + ) + row = cursor.fetchone() + if row: + return ConfigPairFee( + pairing_id=row[0], + market_type=row[1], + maker_fee_rate=row[2], + taker_fee_rate=row[3], + updated_at=row[4] + ) + raise ValueError("Failed to create pair fee") + + def get_pair_fee(self, pairing_id: int, market_type: str) -> ConfigPairFee | None: + """Get a pairing fee by pairing ID and market type.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + SELECT pairing_id, market_type, maker_fee_rate, taker_fee_rate, updated_at + FROM config_pair_fees + WHERE pairing_id = ? AND market_type = ? + """, + (pairing_id, market_type), + ) + row = cursor.fetchone() + if row: + return ConfigPairFee( + pairing_id=row[0], + market_type=row[1], + maker_fee_rate=row[2], + taker_fee_rate=row[3], + updated_at=row[4] + ) + return None + + def update_pair_fee(self, pairing_id: int, market_type: str, pair_fee: ConfigPairFee) -> ConfigPairFee: + """Update an existing pairing fee.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + UPDATE config_pair_fees + SET maker_fee_rate = ?, taker_fee_rate = ? + WHERE pairing_id = ? AND market_type = ? + RETURNING pairing_id, market_type, maker_fee_rate, taker_fee_rate, updated_at + """, + ( + pair_fee.maker_fee_rate, + pair_fee.taker_fee_rate, + pairing_id, + market_type, + ), + ) + row = cursor.fetchone() + if row: + return ConfigPairFee( + pairing_id=row[0], + market_type=row[1], + maker_fee_rate=row[2], + taker_fee_rate=row[3], + updated_at=row[4] + ) + raise ValueError("Failed to update pair fee") + + def delete_pair_fee(self, pairing_id: int, market_type: str) -> bool: + """Delete a pairing fee.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + DELETE FROM config_pair_fees + WHERE pairing_id = ? AND market_type = ? + """, + (pairing_id, market_type), + ) + return cursor.rowcount > 0 + + def list_pair_fees(self, pairing_id: int) -> list[ConfigPairFee]: + """List all fees for a pairing.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + SELECT pairing_id, market_type, maker_fee_rate, taker_fee_rate, updated_at + FROM config_pair_fees + WHERE pairing_id = ? + ORDER BY market_type + """, + (pairing_id,), + ) + return [ + ConfigPairFee( + pairing_id=row[0], + market_type=row[1], + maker_fee_rate=row[2], + taker_fee_rate=row[3], + updated_at=row[4] + ) + for row in cursor.fetchall() + ] + + +class ConfigBacktestingDefaultsRepository: + def __init__(self, store: DuckDBStore) -> None: + self._store = store + + def create_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults: + """Create new backtesting defaults.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + INSERT INTO config_backtesting_defaults (starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms) + VALUES (?, ?, ?, ?, ?) + RETURNING id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms + """, + ( + orjson.dumps(defaults.starting_balances).decode( + 'utf-8') if defaults.starting_balances else None, + defaults.trade_capital, + defaults.min_profit_threshold, + defaults.slippage_bps, + defaults.execution_latency_ms, + ), + ) + row = cursor.fetchone() + if row: + return ConfigBacktestingDefaults( + starting_balances=orjson.loads(row[1]) if row[1] else None, + trade_capital=row[2], + min_profit_threshold=row[3], + slippage_bps=row[4], + execution_latency_ms=row[5] + ) + raise ValueError("Failed to create backtesting defaults") + + def get_defaults(self) -> ConfigBacktestingDefaults | None: + """Get the current backtesting defaults.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + SELECT id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms + FROM config_backtesting_defaults + ORDER BY id DESC + LIMIT 1 + """ + ) + row = cursor.fetchone() + if row: + return ConfigBacktestingDefaults( + starting_balances=orjson.loads(row[1]) if row[1] else None, + trade_capital=row[2], + min_profit_threshold=row[3], + slippage_bps=row[4], + execution_latency_ms=row[5] + ) + return None + + def update_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults: + """Update the backtesting defaults.""" + with self._store.connect() as conn: + cursor = conn.execute( + """ + UPDATE config_backtesting_defaults + SET starting_balances = ?, trade_capital = ?, min_profit_threshold = ?, slippage_bps = ?, execution_latency_ms = ? + WHERE id = ( + SELECT id FROM config_backtesting_defaults ORDER BY id DESC LIMIT 1 + ) + RETURNING id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms + """, + ( + orjson.dumps(defaults.starting_balances).decode( + 'utf-8') if defaults.starting_balances else None, + defaults.trade_capital, + defaults.min_profit_threshold, + defaults.slippage_bps, + defaults.execution_latency_ms, + ), + ) + row = cursor.fetchone() + if row: + return ConfigBacktestingDefaults( + starting_balances=orjson.loads(row[1]) if row[1] else None, + trade_capital=row[2], + min_profit_threshold=row[3], + slippage_bps=row[4], + execution_latency_ms=row[5] + ) + raise ValueError("Failed to update backtesting defaults")