Files
arbitrade/src/arbitrade/storage/db.py
T

308 lines
10 KiB
Python

from __future__ import annotations
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
import duckdb
import structlog
from arbitrade.config.settings import Settings
# Module-level structlog logger, named after this module.
_LOG = structlog.get_logger(__name__)
# Full current schema for every table this store manages; executed by
# DuckDBStore.migrate() on every run. Each statement is
# CREATE TABLE IF NOT EXISTS, so re-running it is safe, but it only creates
# tables that are *missing*: a change to a table that may already exist in a
# deployed database (new column, new default) must also be added as a
# numbered migration in DuckDBStore.migrate(), otherwise older databases
# keep the old table shape.
# NOTE(review): tables such as opportunities/trades/orders (id UUID DEFAULT
# uuid()) and the *_snapshots tables declare no PRIMARY KEY or UNIQUE
# constraint, so the database itself does not reject duplicate rows/ids.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMP DEFAULT current_timestamp
);
CREATE TABLE IF NOT EXISTS config_sections (
id INTEGER PRIMARY KEY,
name VARCHAR UNIQUE NOT NULL,
description TEXT,
updated_at TIMESTAMP DEFAULT current_timestamp
);
CREATE TABLE IF NOT EXISTS config_settings (
key VARCHAR PRIMARY KEY,
section VARCHAR NOT NULL,
value_json TEXT NOT NULL,
value_type VARCHAR NOT NULL,
is_secret BOOLEAN DEFAULT FALSE,
is_runtime_reloadable BOOLEAN DEFAULT FALSE,
updated_at TIMESTAMP DEFAULT current_timestamp,
updated_by VARCHAR
);
CREATE TABLE IF NOT EXISTS config_pairings (
id INTEGER PRIMARY KEY,
base_asset VARCHAR NOT NULL,
quote_asset VARCHAR NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
source VARCHAR NOT NULL,
created_at TIMESTAMP DEFAULT current_timestamp,
updated_at TIMESTAMP DEFAULT current_timestamp,
UNIQUE(base_asset, quote_asset)
);
CREATE TABLE IF NOT EXISTS config_backtesting_defaults (
id INTEGER PRIMARY KEY,
starting_balances JSON,
trade_capital DOUBLE,
min_profit_threshold DOUBLE,
slippage_bps INTEGER,
execution_latency_ms INTEGER,
fee_source VARCHAR DEFAULT 'api'
);
CREATE TABLE IF NOT EXISTS opportunities (
id UUID DEFAULT uuid(),
detected_at TIMESTAMP NOT NULL,
cycle VARCHAR NOT NULL,
gross_pct DOUBLE,
net_pct DOUBLE,
est_profit DOUBLE,
executed BOOLEAN DEFAULT FALSE
);
CREATE TABLE IF NOT EXISTS trades (
id UUID DEFAULT uuid(),
trade_ref VARCHAR NOT NULL,
started_at TIMESTAMP NOT NULL,
finished_at TIMESTAMP,
status VARCHAR NOT NULL,
realized_pnl DOUBLE,
estimated_pnl DOUBLE,
capital_used DOUBLE,
cycle VARCHAR,
leg_count INTEGER
);
CREATE TABLE IF NOT EXISTS orders (
id UUID DEFAULT uuid(),
trade_ref VARCHAR NOT NULL,
order_ref VARCHAR NOT NULL,
leg_index INTEGER NOT NULL,
pair VARCHAR NOT NULL,
side VARCHAR NOT NULL,
volume DOUBLE NOT NULL,
user_ref INTEGER,
status VARCHAR,
filled_volume DOUBLE,
avg_price DOUBLE,
raw_response JSON,
recorded_at TIMESTAMP NOT NULL
);
CREATE TABLE IF NOT EXISTS pnl_events (
id UUID DEFAULT uuid(),
trade_ref VARCHAR NOT NULL,
recorded_at TIMESTAMP NOT NULL,
kind VARCHAR NOT NULL,
pnl_usd DOUBLE NOT NULL,
source VARCHAR NOT NULL
);
CREATE TABLE IF NOT EXISTS portfolio_snapshots (
snapshot_at TIMESTAMP NOT NULL,
balances JSON,
total_value_usd DOUBLE
);
CREATE TABLE IF NOT EXISTS market_snapshots (
snapshot_at TIMESTAMP NOT NULL,
symbol VARCHAR NOT NULL,
source VARCHAR NOT NULL,
payload JSON NOT NULL,
latency_ms DOUBLE
);
CREATE TABLE IF NOT EXISTS audit_events (
id UUID DEFAULT uuid(),
occurred_at TIMESTAMP NOT NULL,
actor VARCHAR NOT NULL,
event_type VARCHAR NOT NULL,
decision VARCHAR NOT NULL,
payload JSON,
correlation_id VARCHAR
);
CREATE TABLE IF NOT EXISTS runtime_state_snapshots (
snapshot_at TIMESTAMP NOT NULL,
is_running BOOLEAN NOT NULL,
kill_switch_active BOOLEAN NOT NULL,
kill_switch_reason VARCHAR,
open_trade_count INTEGER NOT NULL,
last_known_balances JSON,
note VARCHAR
);
CREATE TABLE IF NOT EXISTS kraken_account_snapshots (
snapshot_at TIMESTAMP NOT NULL,
fee_tier VARCHAR,
maker_fee DOUBLE,
taker_fee DOUBLE,
thirty_day_volume DOUBLE,
trade_balance_raw JSON,
fee_schedule_raw JSON
);
CREATE TABLE IF NOT EXISTS backtest_jobs (
id UUID DEFAULT uuid(),
status VARCHAR NOT NULL DEFAULT 'pending',
events_path VARCHAR NOT NULL,
config JSON,
report JSON,
error VARCHAR,
created_at TIMESTAMP DEFAULT current_timestamp,
started_at TIMESTAMP,
finished_at TIMESTAMP
);
"""
class DuckDBStore:
    """DuckDB-backed storage for arbitrade.

    Holds the database path taken from ``Settings.duckdb_path``, hands out
    short-lived connections via :meth:`connect`, and brings the schema up to
    date via :meth:`migrate`.
    """

    # Ordered ``(version, statements)`` pairs applied by :meth:`migrate` to
    # databases whose highest recorded version is lower. ``SCHEMA_SQL`` runs
    # first and already creates every table in its current shape, so these
    # statements only patch tables created by older releases. Every statement
    # is idempotent (IF NOT EXISTS), so re-running a partially applied
    # migration is safe.
    _MIGRATIONS: tuple[tuple[int, tuple[str, ...]], ...] = (
        # v1: columns added to ``trades`` after it first shipped. DuckDB does
        # not support ADD COLUMN with constraints, so on databases upgraded
        # through this path these columns are nullable.
        (
            1,
            (
                "ALTER TABLE trades ADD COLUMN IF NOT EXISTS trade_ref VARCHAR",
                "ALTER TABLE trades ADD COLUMN IF NOT EXISTS estimated_pnl DOUBLE",
                "ALTER TABLE trades ADD COLUMN IF NOT EXISTS capital_used DOUBLE",
                "ALTER TABLE trades ADD COLUMN IF NOT EXISTS cycle VARCHAR",
                "ALTER TABLE trades ADD COLUMN IF NOT EXISTS leg_count INTEGER",
            ),
        ),
        # v2: config_backtesting_defaults table (created by SCHEMA_SQL).
        (2, ()),
        # v3: kraken_account_snapshots table (created by SCHEMA_SQL).
        (3, ()),
        # v4: fee_source column on config_backtesting_defaults.
        (
            4,
            (
                "ALTER TABLE config_backtesting_defaults "
                "ADD COLUMN IF NOT EXISTS fee_source VARCHAR DEFAULT 'api'",
            ),
        ),
        # v5: backtest_jobs table (created by SCHEMA_SQL).
        (5, ()),
    )

    # Latest schema version; derived from _MIGRATIONS so the two cannot drift.
    SCHEMA_VERSION = _MIGRATIONS[-1][0]

    def __init__(self, settings: Settings) -> None:
        """Remember the database path and make sure its parent directory exists.

        Args:
            settings: Application settings; only ``duckdb_path`` is read.
        """
        self._db_path = Path(settings.duckdb_path)
        self._db_path.parent.mkdir(parents=True, exist_ok=True)
        # Set once the file could not be opened, so the fallback warning is
        # logged only the first time.
        self._use_memory_fallback = False

    @contextmanager
    def connect(self) -> Iterator[duckdb.DuckDBPyConnection]:
        """Yield a DuckDB connection that is closed when the ``with`` block exits.

        If the database file cannot be opened (``duckdb.IOException``), a
        ``:memory:`` connection is yielded instead. The warning for this is
        logged only the first time it happens for this store.

        NOTE(review): each fallback call opens a new ``:memory:`` database,
        so tables created by :meth:`migrate` and rows written through one
        fallback connection are not visible to later ``connect()`` calls.
        """
        try:
            conn = duckdb.connect(str(self._db_path))
        except duckdb.IOException:
            if not self._use_memory_fallback:
                _LOG.warning(
                    "duckdb_path_unavailable_falling_back_to_memory",
                    path=str(self._db_path),
                )
                self._use_memory_fallback = True
            conn = duckdb.connect(":memory:")
        try:
            yield conn
        finally:
            conn.close()

    def _get_table_columns(
        self, conn: duckdb.DuckDBPyConnection, table_name: str
    ) -> set[str]:
        """Return the column names of *table_name*, or an empty set on DB error."""
        # PRAGMA cannot take bind parameters; pass the name as an escaped
        # string literal instead of splicing a raw identifier into the SQL.
        quoted = table_name.replace("'", "''")
        try:
            rows = conn.execute(f"PRAGMA table_info('{quoted}')").fetchall()
        except duckdb.Error:
            return set()
        # PRAGMA table_info rows are (cid, name, type, ...); index 1 is the name.
        return {str(row[1]) for row in rows}

    def _table_exists(
        self, conn: duckdb.DuckDBPyConnection, table_name: str
    ) -> bool:
        """Return True if a table named *table_name* exists; False on DB error."""
        try:
            row = conn.execute(
                "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = ?",
                [table_name],
            ).fetchone()
        except duckdb.Error:
            return False
        return bool(row and row[0] > 0)

    def _ensure_column(
        self, conn: duckdb.DuckDBPyConnection, table_name: str, column_def: str
    ) -> None:
        """Add a column to a table if it doesn't already exist.

        Args:
            conn: Open DuckDB connection.
            table_name: Table to alter (trusted, code-supplied identifier).
            column_def: Column definition such as ``"note VARCHAR"``; its
                first whitespace-separated token is taken as the column name.
        """
        existing = self._get_table_columns(conn, table_name)
        col_name = column_def.split()[0]
        if col_name not in existing:
            conn.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_def}")

    def _current_schema_version(self, conn: duckdb.DuckDBPyConnection) -> int:
        """Return the highest version recorded in schema_migrations (0 if none)."""
        try:
            row = conn.execute(
                "SELECT max(version) FROM schema_migrations"
            ).fetchone()
        except duckdb.Error:
            # Treat an unreadable table as "nothing applied". Every migration
            # is idempotent, so re-running them is safe.
            _LOG.warning("schema_version_unreadable", exc_info=True)
            return 0
        # max() over an empty table yields a single NULL row.
        if not row or row[0] is None:
            return 0
        return int(row[0])

    def migrate(self) -> None:
        """Create missing tables and apply pending schema migrations.

        ``SCHEMA_SQL`` is executed first, so a fresh database gets every table
        in its current shape. Numbered migrations newer than the recorded
        version then patch databases created by older releases. Each applied
        version is inserted into ``schema_migrations`` exactly once, so its
        ``applied_at`` reflects when it was actually applied. Earlier code
        rewrote the latest row's ``applied_at`` on every run.
        """
        with self.connect() as conn:
            # Idempotent CREATE TABLE IF NOT EXISTS for all tables,
            # including schema_migrations itself.
            conn.execute(SCHEMA_SQL)
            current_version = self._current_schema_version(conn)
            for version, statements in self._MIGRATIONS:
                if version <= current_version:
                    continue
                for statement in statements:
                    conn.execute(statement)
                conn.execute(
                    "INSERT OR IGNORE INTO schema_migrations (version) VALUES (?)",
                    [version],
                )
                _LOG.info("migration_applied", version=version)