feat: implement configuration management with database support

This commit is contained in:
2026-06-02 15:11:56 +02:00
parent 1b21f2443a
commit 6b5973a0bb
4 changed files with 756 additions and 0 deletions
+2
View File
@@ -9,6 +9,7 @@ from arbitrade.alerting.notifier import build_notifier_from_settings
from arbitrade.api.control_state import DashboardControlState
from arbitrade.api.routes import public_router, router
from arbitrade.config.settings import Settings
from arbitrade.config.service import ConfigurationService
from arbitrade.logging_setup import configure_logging
from arbitrade.metrics import MetricsCalculator
from arbitrade.runtime.lifecycle import graceful_shutdown, restore_runtime_state
@@ -35,6 +36,7 @@ def create_app(settings: Settings) -> FastAPI:
app.state.audit_repository = AuditRepository(db)
app.state.runtime_state_repository = RuntimeStateRepository(db)
app.state.alert_notifier = build_notifier_from_settings(settings)
app.state.configuration_service = ConfigurationService(settings, db, AuditRepository(db))
app.state.backtest_recent_reports = []
app.state.dashboard_controls = DashboardControlState(
is_running=not settings.kill_switch_active,
+166
View File
@@ -0,0 +1,166 @@
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, cast
import orjson
from pydantic import BaseModel, Field
from arbitrade.config.settings import Settings
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.repositories import AuditRepository, ConfigSettingRepository
class ConfigSection(BaseModel):
id: int | None = None
name: str
description: str | None = None
updated_at: datetime | None = None
class ConfigSetting(BaseModel):
key: str
section: str
value_json: str
value_type: str
is_secret: bool = False
is_runtime_reloadable: bool = False
updated_at: datetime | None = None
updated_by: str | None = None
class ConfigPairing(BaseModel):
id: int | None = None
base_asset: str
quote_asset: str
enabled: bool = True
source: str
created_at: datetime | None = None
updated_at: datetime | None = None
class ConfigPairFee(BaseModel):
pairing_id: int
market_type: str # 'crypto_crypto' or 'crypto_fiat'
maker_fee_rate: float
taker_fee_rate: float
updated_at: datetime | None = None
class ConfigBacktestingDefaults(BaseModel):
starting_balances: dict[str, float] | None = None
trade_capital: float | None = None
min_profit_threshold: float | None = None
slippage_bps: int | None = None
execution_latency_ms: int | None = None
class ConfigurationService:
"""Manages application configuration from environment and database sources."""
def __init__(self, settings: Settings, store: DuckDBStore, audit_repo: AuditRepository) -> None:
self._settings = settings
self._store = store
self._audit_repo = audit_repo
self._config_version = 0
self._loaded_settings: dict[str, Any] = {}
self._setting_repo = ConfigSettingRepository(store)
self._load_database_settings()
def _load_database_settings(self) -> None:
"""Load user settings from database and merge with defaults."""
# Load all settings from database
db_settings = self._setting_repo.list_settings()
# Convert to dictionary for easy access
for setting in db_settings:
# Parse JSON value based on type
if setting.value_type == "str":
parsed_value = setting.value_json
elif setting.value_type == "int":
parsed_value = int(setting.value_json)
elif setting.value_type == "float":
parsed_value = float(setting.value_json)
elif setting.value_type == "bool":
parsed_value = setting.value_json.lower() == "true"
elif setting.value_type == "list":
parsed_value = orjson.loads(setting.value_json)
elif setting.value_type == "dict":
parsed_value = orjson.loads(setting.value_json)
else:
parsed_value = setting.value_json
self._loaded_settings[setting.key] = parsed_value
# Merge with environment settings (environment takes precedence)
# This is a simplified approach - in reality we'd want to merge properly
# but for now we'll just load DB settings and let environment override them
# Initialize with default values from settings model
self._initialize_default_settings()
def _initialize_default_settings(self) -> None:
"""Initialize default settings from the Settings model."""
# This is a placeholder - in a real implementation we'd map
# the Settings model fields to config keys
pass
def get_setting(self, key: str, default: Any = None) -> Any:
"""Get a configuration setting value."""
return self._loaded_settings.get(key, default)
def set_setting(self, key: str, value: Any, updated_by: str | None = None) -> None:
"""Set a configuration setting value and persist to database."""
# Convert value to JSON string and determine type
if isinstance(value, str):
value_json = value
value_type = "str"
elif isinstance(value, int):
value_json = str(value)
value_type = "int"
elif isinstance(value, float):
value_json = str(value)
value_type = "float"
elif isinstance(value, bool):
value_json = str(value).lower()
value_type = "bool"
elif isinstance(value, list):
value_json = orjson.dumps(value).decode('utf-8')
value_type = "list"
elif isinstance(value, dict):
value_json = orjson.dumps(value).decode('utf-8')
value_type = "dict"
else:
value_json = str(value)
value_type = "str"
# Create or update setting
setting = ConfigSetting(
key=key,
section="general", # Default section
value_json=value_json,
value_type=value_type,
is_secret=False,
is_runtime_reloadable=False,
updated_by=updated_by
)
# Check if setting exists
existing_setting = self._setting_repo.get_setting(key)
if existing_setting:
# Update existing setting
updated_setting = self._setting_repo.update_setting(key, setting)
else:
# Create new setting
updated_setting = self._setting_repo.create_setting(setting)
# Update in-memory cache
self._loaded_settings[key] = value
# Increment version for hot reloading
self._config_version += 1
def get_all_settings(self) -> dict[str, Any]:
"""Get all configuration settings."""
return self._loaded_settings.copy()
+47
View File
@@ -17,6 +17,53 @@ CREATE TABLE IF NOT EXISTS schema_migrations (
applied_at TIMESTAMP DEFAULT current_timestamp
);
CREATE TABLE IF NOT EXISTS config_sections (
id INTEGER PRIMARY KEY,
name VARCHAR UNIQUE NOT NULL,
description TEXT,
updated_at TIMESTAMP DEFAULT current_timestamp
);
CREATE TABLE IF NOT EXISTS config_settings (
key VARCHAR PRIMARY KEY,
section VARCHAR NOT NULL,
value_json TEXT NOT NULL,
value_type VARCHAR NOT NULL,
is_secret BOOLEAN DEFAULT FALSE,
is_runtime_reloadable BOOLEAN DEFAULT FALSE,
updated_at TIMESTAMP DEFAULT current_timestamp,
updated_by VARCHAR
);
CREATE TABLE IF NOT EXISTS config_pairings (
id INTEGER PRIMARY KEY,
base_asset VARCHAR NOT NULL,
quote_asset VARCHAR NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
source VARCHAR NOT NULL,
created_at TIMESTAMP DEFAULT current_timestamp,
updated_at TIMESTAMP DEFAULT current_timestamp,
UNIQUE(base_asset, quote_asset)
);
CREATE TABLE IF NOT EXISTS config_pair_fees (
pairing_id INTEGER NOT NULL,
market_type VARCHAR NOT NULL, -- 'crypto_crypto' or 'crypto_fiat'
maker_fee_rate DOUBLE NOT NULL,
taker_fee_rate DOUBLE NOT NULL,
updated_at TIMESTAMP DEFAULT current_timestamp,
FOREIGN KEY (pairing_id) REFERENCES config_pairings(id)
);
CREATE TABLE IF NOT EXISTS config_backtesting_defaults (
id INTEGER PRIMARY KEY,
starting_balances JSON,
trade_capital DOUBLE,
min_profit_threshold DOUBLE,
slippage_bps INTEGER,
execution_latency_ms INTEGER
);
CREATE TABLE IF NOT EXISTS opportunities (
id UUID DEFAULT uuid(),
detected_at TIMESTAMP NOT NULL,
+541
View File
@@ -6,6 +6,7 @@ from typing import Any
import orjson
from arbitrade.config.service import ConfigBacktestingDefaults, ConfigPairing, ConfigSection, ConfigSetting, ConfigPairFee
from arbitrade.storage.db import DuckDBStore
@@ -376,3 +377,543 @@ class RuntimeStateRepository:
last_known_balances=balances,
note=str(row[6]) if row[6] is not None else None,
)
# Configuration repository classes
class ConfigSectionRepository:
def __init__(self, store: DuckDBStore) -> None:
self._store = store
def create_section(self, section: ConfigSection) -> ConfigSection:
"""Create a new configuration section."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
INSERT INTO config_sections (name, description)
VALUES (?, ?)
RETURNING id, name, description, updated_at
""",
(section.name, section.description),
)
row = cursor.fetchone()
if row:
return ConfigSection(
id=row[0],
name=row[1],
description=row[2],
updated_at=row[3]
)
raise ValueError("Failed to create section")
def get_section(self, name: str) -> ConfigSection | None:
"""Get a configuration section by name."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
SELECT id, name, description, updated_at
FROM config_sections
WHERE name = ?
""",
(name,),
)
row = cursor.fetchone()
if row:
return ConfigSection(
id=row[0],
name=row[1],
description=row[2],
updated_at=row[3]
)
return None
def list_sections(self) -> list[ConfigSection]:
"""List all configuration sections."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
SELECT id, name, description, updated_at
FROM config_sections
ORDER BY name
"""
)
return [
ConfigSection(
id=row[0],
name=row[1],
description=row[2],
updated_at=row[3]
)
for row in cursor.fetchall()
]
class ConfigSettingRepository:
def __init__(self, store: DuckDBStore) -> None:
self._store = store
def create_setting(self, setting: ConfigSetting) -> ConfigSetting:
"""Create a new configuration setting."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
INSERT INTO config_settings (key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_by)
VALUES (?, ?, ?, ?, ?, ?, ?)
RETURNING key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by
""",
(
setting.key,
setting.section,
setting.value_json,
setting.value_type,
setting.is_secret,
setting.is_runtime_reloadable,
setting.updated_by,
),
)
row = cursor.fetchone()
if row:
return ConfigSetting(
key=row[0],
section=row[1],
value_json=row[2],
value_type=row[3],
is_secret=bool(row[4]),
is_runtime_reloadable=bool(row[5]),
updated_at=row[6],
updated_by=row[7]
)
raise ValueError("Failed to create setting")
def get_setting(self, key: str) -> ConfigSetting | None:
"""Get a configuration setting by key."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
SELECT key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by
FROM config_settings
WHERE key = ?
""",
(key,),
)
row = cursor.fetchone()
if row:
return ConfigSetting(
key=row[0],
section=row[1],
value_json=row[2],
value_type=row[3],
is_secret=bool(row[4]),
is_runtime_reloadable=bool(row[5]),
updated_at=row[6],
updated_by=row[7]
)
return None
def update_setting(self, key: str, setting: ConfigSetting) -> ConfigSetting:
"""Update an existing configuration setting."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
UPDATE config_settings
SET section = ?, value_json = ?, value_type = ?, is_secret = ?, is_runtime_reloadable = ?, updated_by = ?
WHERE key = ?
RETURNING key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by
""",
(
setting.section,
setting.value_json,
setting.value_type,
setting.is_secret,
setting.is_runtime_reloadable,
setting.updated_by,
key,
),
)
row = cursor.fetchone()
if row:
return ConfigSetting(
key=row[0],
section=row[1],
value_json=row[2],
value_type=row[3],
is_secret=bool(row[4]),
is_runtime_reloadable=bool(row[5]),
updated_at=row[6],
updated_by=row[7]
)
raise ValueError("Failed to update setting")
def delete_setting(self, key: str) -> bool:
"""Delete a configuration setting."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
DELETE FROM config_settings
WHERE key = ?
""",
(key,),
)
return cursor.rowcount > 0
def list_settings(self, section: str | None = None) -> list[ConfigSetting]:
"""List all configuration settings, optionally filtered by section."""
with self._store.connect() as conn:
if section:
cursor = conn.execute(
"""
SELECT key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by
FROM config_settings
WHERE section = ?
ORDER BY key
""",
(section,),
)
else:
cursor = conn.execute(
"""
SELECT key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by
FROM config_settings
ORDER BY key
"""
)
return [
ConfigSetting(
key=row[0],
section=row[1],
value_json=row[2],
value_type=row[3],
is_secret=bool(row[4]),
is_runtime_reloadable=bool(row[5]),
updated_at=row[6],
updated_by=row[7]
)
for row in cursor.fetchall()
]
class ConfigPairingRepository:
def __init__(self, store: DuckDBStore) -> None:
self._store = store
def create_pairing(self, pairing: ConfigPairing) -> ConfigPairing:
"""Create a new currency pairing."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
INSERT INTO config_pairings (base_asset, quote_asset, enabled, source)
VALUES (?, ?, ?, ?)
RETURNING id, base_asset, quote_asset, enabled, source, created_at, updated_at
""",
(
pairing.base_asset,
pairing.quote_asset,
pairing.enabled,
pairing.source,
),
)
row = cursor.fetchone()
if row:
return ConfigPairing(
id=row[0],
base_asset=row[1],
quote_asset=row[2],
enabled=bool(row[3]),
source=row[4],
created_at=row[5],
updated_at=row[6]
)
raise ValueError("Failed to create pairing")
def get_pairing(self, base_asset: str, quote_asset: str) -> ConfigPairing | None:
"""Get a currency pairing by base and quote assets."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
SELECT id, base_asset, quote_asset, enabled, source, created_at, updated_at
FROM config_pairings
WHERE base_asset = ? AND quote_asset = ?
""",
(base_asset, quote_asset),
)
row = cursor.fetchone()
if row:
return ConfigPairing(
id=row[0],
base_asset=row[1],
quote_asset=row[2],
enabled=bool(row[3]),
source=row[4],
created_at=row[5],
updated_at=row[6]
)
return None
def update_pairing(self, base_asset: str, quote_asset: str, pairing: ConfigPairing) -> ConfigPairing:
"""Update an existing currency pairing."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
UPDATE config_pairings
SET enabled = ?, source = ?
WHERE base_asset = ? AND quote_asset = ?
RETURNING id, base_asset, quote_asset, enabled, source, created_at, updated_at
""",
(
pairing.enabled,
pairing.source,
base_asset,
quote_asset,
),
)
row = cursor.fetchone()
if row:
return ConfigPairing(
id=row[0],
base_asset=row[1],
quote_asset=row[2],
enabled=bool(row[3]),
source=row[4],
created_at=row[5],
updated_at=row[6]
)
raise ValueError("Failed to update pairing")
def delete_pairing(self, base_asset: str, quote_asset: str) -> bool:
"""Delete a currency pairing."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
DELETE FROM config_pairings
WHERE base_asset = ? AND quote_asset = ?
""",
(base_asset, quote_asset),
)
return cursor.rowcount > 0
def list_pairings(self) -> list[ConfigPairing]:
"""List all currency pairings."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
SELECT id, base_asset, quote_asset, enabled, source, created_at, updated_at
FROM config_pairings
ORDER BY base_asset, quote_asset
"""
)
return [
ConfigPairing(
id=row[0],
base_asset=row[1],
quote_asset=row[2],
enabled=bool(row[3]),
source=row[4],
created_at=row[5],
updated_at=row[6]
)
for row in cursor.fetchall()
]
class ConfigPairFeeRepository:
def __init__(self, store: DuckDBStore) -> None:
self._store = store
def create_pair_fee(self, pair_fee: ConfigPairFee) -> ConfigPairFee:
"""Create a new pairing fee record."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
INSERT INTO config_pair_fees (pairing_id, market_type, maker_fee_rate, taker_fee_rate)
VALUES (?, ?, ?, ?)
RETURNING pairing_id, market_type, maker_fee_rate, taker_fee_rate, updated_at
""",
(
pair_fee.pairing_id,
pair_fee.market_type,
pair_fee.maker_fee_rate,
pair_fee.taker_fee_rate,
),
)
row = cursor.fetchone()
if row:
return ConfigPairFee(
pairing_id=row[0],
market_type=row[1],
maker_fee_rate=row[2],
taker_fee_rate=row[3],
updated_at=row[4]
)
raise ValueError("Failed to create pair fee")
def get_pair_fee(self, pairing_id: int, market_type: str) -> ConfigPairFee | None:
"""Get a pairing fee by pairing ID and market type."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
SELECT pairing_id, market_type, maker_fee_rate, taker_fee_rate, updated_at
FROM config_pair_fees
WHERE pairing_id = ? AND market_type = ?
""",
(pairing_id, market_type),
)
row = cursor.fetchone()
if row:
return ConfigPairFee(
pairing_id=row[0],
market_type=row[1],
maker_fee_rate=row[2],
taker_fee_rate=row[3],
updated_at=row[4]
)
return None
def update_pair_fee(self, pairing_id: int, market_type: str, pair_fee: ConfigPairFee) -> ConfigPairFee:
"""Update an existing pairing fee."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
UPDATE config_pair_fees
SET maker_fee_rate = ?, taker_fee_rate = ?
WHERE pairing_id = ? AND market_type = ?
RETURNING pairing_id, market_type, maker_fee_rate, taker_fee_rate, updated_at
""",
(
pair_fee.maker_fee_rate,
pair_fee.taker_fee_rate,
pairing_id,
market_type,
),
)
row = cursor.fetchone()
if row:
return ConfigPairFee(
pairing_id=row[0],
market_type=row[1],
maker_fee_rate=row[2],
taker_fee_rate=row[3],
updated_at=row[4]
)
raise ValueError("Failed to update pair fee")
def delete_pair_fee(self, pairing_id: int, market_type: str) -> bool:
"""Delete a pairing fee."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
DELETE FROM config_pair_fees
WHERE pairing_id = ? AND market_type = ?
""",
(pairing_id, market_type),
)
return cursor.rowcount > 0
def list_pair_fees(self, pairing_id: int) -> list[ConfigPairFee]:
"""List all fees for a pairing."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
SELECT pairing_id, market_type, maker_fee_rate, taker_fee_rate, updated_at
FROM config_pair_fees
WHERE pairing_id = ?
ORDER BY market_type
""",
(pairing_id,),
)
return [
ConfigPairFee(
pairing_id=row[0],
market_type=row[1],
maker_fee_rate=row[2],
taker_fee_rate=row[3],
updated_at=row[4]
)
for row in cursor.fetchall()
]
class ConfigBacktestingDefaultsRepository:
def __init__(self, store: DuckDBStore) -> None:
self._store = store
def create_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults:
"""Create new backtesting defaults."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
INSERT INTO config_backtesting_defaults (starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms)
VALUES (?, ?, ?, ?, ?)
RETURNING id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms
""",
(
orjson.dumps(defaults.starting_balances).decode(
'utf-8') if defaults.starting_balances else None,
defaults.trade_capital,
defaults.min_profit_threshold,
defaults.slippage_bps,
defaults.execution_latency_ms,
),
)
row = cursor.fetchone()
if row:
return ConfigBacktestingDefaults(
starting_balances=orjson.loads(row[1]) if row[1] else None,
trade_capital=row[2],
min_profit_threshold=row[3],
slippage_bps=row[4],
execution_latency_ms=row[5]
)
raise ValueError("Failed to create backtesting defaults")
def get_defaults(self) -> ConfigBacktestingDefaults | None:
"""Get the current backtesting defaults."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
SELECT id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms
FROM config_backtesting_defaults
ORDER BY id DESC
LIMIT 1
"""
)
row = cursor.fetchone()
if row:
return ConfigBacktestingDefaults(
starting_balances=orjson.loads(row[1]) if row[1] else None,
trade_capital=row[2],
min_profit_threshold=row[3],
slippage_bps=row[4],
execution_latency_ms=row[5]
)
return None
def update_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults:
"""Update the backtesting defaults."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
UPDATE config_backtesting_defaults
SET starting_balances = ?, trade_capital = ?, min_profit_threshold = ?, slippage_bps = ?, execution_latency_ms = ?
WHERE id = (
SELECT id FROM config_backtesting_defaults ORDER BY id DESC LIMIT 1
)
RETURNING id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms
""",
(
orjson.dumps(defaults.starting_balances).decode(
'utf-8') if defaults.starting_balances else None,
defaults.trade_capital,
defaults.min_profit_threshold,
defaults.slippage_bps,
defaults.execution_latency_ms,
),
)
row = cursor.fetchone()
if row:
return ConfigBacktestingDefaults(
starting_balances=orjson.loads(row[1]) if row[1] else None,
trade_capital=row[2],
min_profit_threshold=row[3],
slippage_bps=row[4],
execution_latency_ms=row[5]
)
raise ValueError("Failed to update backtesting defaults")