935 lines
32 KiB
Python
935 lines
32 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
import orjson
|
|
|
|
from arbitrade.config.service import ConfigBacktestingDefaults, ConfigPairing, ConfigSection, ConfigSetting, ConfigPairFee
|
|
from arbitrade.storage.db import DuckDBStore
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class MarketSnapshotRecord:
|
|
snapshot_at: datetime
|
|
symbol: str
|
|
source: str
|
|
payload: dict[str, Any]
|
|
latency_ms: float | None
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class OpportunityRecord:
|
|
detected_at: datetime
|
|
cycle: str
|
|
gross_pct: float
|
|
net_pct: float
|
|
est_profit: float
|
|
executed: bool = False
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class TradeRecord:
|
|
trade_ref: str
|
|
started_at: datetime
|
|
finished_at: datetime | None
|
|
status: str
|
|
realized_pnl: float | None
|
|
estimated_pnl: float | None
|
|
capital_used: float | None
|
|
cycle: str | None = None
|
|
leg_count: int | None = None
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class OrderRecord:
|
|
trade_ref: str
|
|
order_ref: str
|
|
leg_index: int
|
|
pair: str
|
|
side: str
|
|
volume: float
|
|
user_ref: int | None
|
|
status: str | None
|
|
filled_volume: float | None
|
|
avg_price: float | None
|
|
raw_response: dict[str, Any]
|
|
recorded_at: datetime
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class PnLRecord:
|
|
trade_ref: str
|
|
recorded_at: datetime
|
|
kind: str
|
|
pnl_usd: float
|
|
source: str
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class AuditRecord:
|
|
occurred_at: datetime
|
|
actor: str
|
|
event_type: str
|
|
decision: str
|
|
payload: dict[str, Any] | None = None
|
|
correlation_id: str | None = None
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class RuntimeStateRecord:
|
|
snapshot_at: datetime
|
|
is_running: bool
|
|
kill_switch_active: bool
|
|
kill_switch_reason: str | None
|
|
open_trade_count: int
|
|
last_known_balances: dict[str, Any] | None = None
|
|
note: str | None = None
|
|
|
|
|
|
class MarketSnapshotRepository:
|
|
def __init__(self, store: DuckDBStore) -> None:
|
|
self._store = store
|
|
|
|
def insert(self, record: MarketSnapshotRecord) -> None:
|
|
with self._store.connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO market_snapshots (snapshot_at, symbol, source, payload, latency_ms)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
""",
|
|
[
|
|
record.snapshot_at,
|
|
record.symbol,
|
|
record.source,
|
|
orjson.dumps(record.payload).decode("utf-8"),
|
|
record.latency_ms,
|
|
],
|
|
)
|
|
|
|
|
|
class OpportunityRepository:
|
|
def __init__(self, store: DuckDBStore) -> None:
|
|
self._store = store
|
|
|
|
def insert(self, record: OpportunityRecord) -> None:
|
|
with self._store.connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO opportunities (
|
|
detected_at,
|
|
cycle,
|
|
gross_pct,
|
|
net_pct,
|
|
est_profit,
|
|
executed
|
|
)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
""",
|
|
[
|
|
record.detected_at,
|
|
record.cycle,
|
|
record.gross_pct,
|
|
record.net_pct,
|
|
record.est_profit,
|
|
record.executed,
|
|
],
|
|
)
|
|
|
|
|
|
class TradeRepository:
|
|
def __init__(self, store: DuckDBStore) -> None:
|
|
self._store = store
|
|
|
|
def insert(self, record: TradeRecord) -> None:
|
|
with self._store.connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO trades (
|
|
trade_ref,
|
|
started_at,
|
|
finished_at,
|
|
status,
|
|
realized_pnl,
|
|
estimated_pnl,
|
|
capital_used,
|
|
cycle,
|
|
leg_count
|
|
)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
[
|
|
record.trade_ref,
|
|
record.started_at,
|
|
record.finished_at,
|
|
record.status,
|
|
record.realized_pnl,
|
|
record.estimated_pnl,
|
|
record.capital_used,
|
|
record.cycle,
|
|
record.leg_count,
|
|
],
|
|
)
|
|
|
|
|
|
class OrderRepository:
|
|
def __init__(self, store: DuckDBStore) -> None:
|
|
self._store = store
|
|
|
|
def insert(self, record: OrderRecord) -> None:
|
|
with self._store.connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO orders (
|
|
trade_ref,
|
|
order_ref,
|
|
leg_index,
|
|
pair,
|
|
side,
|
|
volume,
|
|
user_ref,
|
|
status,
|
|
filled_volume,
|
|
avg_price,
|
|
raw_response,
|
|
recorded_at
|
|
)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
[
|
|
record.trade_ref,
|
|
record.order_ref,
|
|
record.leg_index,
|
|
record.pair,
|
|
record.side,
|
|
record.volume,
|
|
record.user_ref,
|
|
record.status,
|
|
record.filled_volume,
|
|
record.avg_price,
|
|
orjson.dumps(record.raw_response).decode("utf-8"),
|
|
record.recorded_at,
|
|
],
|
|
)
|
|
|
|
|
|
class PnLRepository:
|
|
def __init__(self, store: DuckDBStore) -> None:
|
|
self._store = store
|
|
|
|
def insert(self, record: PnLRecord) -> None:
|
|
with self._store.connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO pnl_events (
|
|
trade_ref,
|
|
recorded_at,
|
|
kind,
|
|
pnl_usd,
|
|
source
|
|
)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
""",
|
|
[
|
|
record.trade_ref,
|
|
record.recorded_at,
|
|
record.kind,
|
|
record.pnl_usd,
|
|
record.source,
|
|
],
|
|
)
|
|
|
|
|
|
class AuditRepository:
|
|
def __init__(self, store: DuckDBStore) -> None:
|
|
self._store = store
|
|
|
|
def insert(self, record: AuditRecord) -> None:
|
|
with self._store.connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO audit_events (
|
|
occurred_at,
|
|
actor,
|
|
event_type,
|
|
decision,
|
|
payload,
|
|
correlation_id
|
|
)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
""",
|
|
[
|
|
record.occurred_at,
|
|
record.actor,
|
|
record.event_type,
|
|
record.decision,
|
|
(
|
|
None
|
|
if record.payload is None
|
|
else orjson.dumps(record.payload).decode("utf-8")
|
|
),
|
|
record.correlation_id,
|
|
],
|
|
)
|
|
|
|
def list_recent(self, *, limit: int = 25) -> list[AuditRecord]:
|
|
with self._store.connect() as conn:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT occurred_at, actor, event_type, decision, payload, correlation_id
|
|
FROM audit_events
|
|
ORDER BY occurred_at DESC
|
|
LIMIT ?
|
|
""",
|
|
[limit],
|
|
).fetchall()
|
|
|
|
records: list[AuditRecord] = []
|
|
for row in rows:
|
|
payload: dict[str, Any] | None = None
|
|
raw_payload = row[4]
|
|
if isinstance(raw_payload, str) and raw_payload:
|
|
decoded = orjson.loads(raw_payload)
|
|
if isinstance(decoded, dict):
|
|
payload = {str(k): decoded[k] for k in decoded}
|
|
|
|
records.append(
|
|
AuditRecord(
|
|
occurred_at=row[0],
|
|
actor=str(row[1]),
|
|
event_type=str(row[2]),
|
|
decision=str(row[3]),
|
|
payload=payload,
|
|
correlation_id=str(row[5]) if row[5] is not None else None,
|
|
)
|
|
)
|
|
|
|
return records
|
|
|
|
|
|
class RuntimeStateRepository:
|
|
def __init__(self, store: DuckDBStore) -> None:
|
|
self._store = store
|
|
|
|
def insert(self, record: RuntimeStateRecord) -> None:
|
|
with self._store.connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO runtime_state_snapshots (
|
|
snapshot_at,
|
|
is_running,
|
|
kill_switch_active,
|
|
kill_switch_reason,
|
|
open_trade_count,
|
|
last_known_balances,
|
|
note
|
|
)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
[
|
|
record.snapshot_at,
|
|
record.is_running,
|
|
record.kill_switch_active,
|
|
record.kill_switch_reason,
|
|
record.open_trade_count,
|
|
(
|
|
None
|
|
if record.last_known_balances is None
|
|
else orjson.dumps(record.last_known_balances).decode("utf-8")
|
|
),
|
|
record.note,
|
|
],
|
|
)
|
|
|
|
def latest(self) -> RuntimeStateRecord | None:
|
|
with self._store.connect() as conn:
|
|
row = conn.execute("""
|
|
SELECT
|
|
snapshot_at,
|
|
is_running,
|
|
kill_switch_active,
|
|
kill_switch_reason,
|
|
open_trade_count,
|
|
last_known_balances,
|
|
note
|
|
FROM runtime_state_snapshots
|
|
ORDER BY snapshot_at DESC
|
|
LIMIT 1
|
|
""").fetchone()
|
|
|
|
if row is None:
|
|
return None
|
|
|
|
balances: dict[str, Any] | None = None
|
|
raw_balances = row[5]
|
|
if isinstance(raw_balances, str) and raw_balances:
|
|
decoded = orjson.loads(raw_balances)
|
|
if isinstance(decoded, dict):
|
|
balances = {str(key): decoded[key] for key in decoded}
|
|
|
|
return RuntimeStateRecord(
|
|
snapshot_at=row[0],
|
|
is_running=bool(row[1]),
|
|
kill_switch_active=bool(row[2]),
|
|
kill_switch_reason=str(row[3]) if row[3] is not None else None,
|
|
open_trade_count=int(row[4]),
|
|
last_known_balances=balances,
|
|
note=str(row[6]) if row[6] is not None else None,
|
|
)
|
|
|
|
|
|
# Configuration repository classes
|
|
class ConfigSectionRepository:
|
|
def __init__(self, store: DuckDBStore) -> None:
|
|
self._store = store
|
|
|
|
def create_section(self, section: ConfigSection) -> ConfigSection:
|
|
"""Create a new configuration section."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
INSERT INTO config_sections (name, description)
|
|
VALUES (?, ?)
|
|
RETURNING id, name, description, updated_at
|
|
""",
|
|
(section.name, section.description),
|
|
)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
return ConfigSection(
|
|
id=row[0],
|
|
name=row[1],
|
|
description=row[2],
|
|
updated_at=row[3]
|
|
)
|
|
raise ValueError("Failed to create section")
|
|
|
|
def get_section(self, name: str) -> ConfigSection | None:
|
|
"""Get a configuration section by name."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT id, name, description, updated_at
|
|
FROM config_sections
|
|
WHERE name = ?
|
|
""",
|
|
(name,),
|
|
)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
return ConfigSection(
|
|
id=row[0],
|
|
name=row[1],
|
|
description=row[2],
|
|
updated_at=row[3]
|
|
)
|
|
return None
|
|
|
|
def list_sections(self) -> list[ConfigSection]:
|
|
"""List all configuration sections."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT id, name, description, updated_at
|
|
FROM config_sections
|
|
ORDER BY name
|
|
"""
|
|
)
|
|
return [
|
|
ConfigSection(
|
|
id=row[0],
|
|
name=row[1],
|
|
description=row[2],
|
|
updated_at=row[3]
|
|
)
|
|
for row in cursor.fetchall()
|
|
]
|
|
|
|
|
|
class ConfigSettingRepository:
|
|
def __init__(self, store: DuckDBStore) -> None:
|
|
self._store = store
|
|
|
|
def create_setting(self, setting: ConfigSetting) -> ConfigSetting:
|
|
"""Create a new configuration setting."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
INSERT INTO config_settings (key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_by)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
RETURNING key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by
|
|
""",
|
|
(
|
|
setting.key,
|
|
setting.section,
|
|
setting.value_json,
|
|
setting.value_type,
|
|
setting.is_secret,
|
|
setting.is_runtime_reloadable,
|
|
setting.updated_by,
|
|
),
|
|
)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
return ConfigSetting(
|
|
key=row[0],
|
|
section=row[1],
|
|
value_json=row[2],
|
|
value_type=row[3],
|
|
is_secret=bool(row[4]),
|
|
is_runtime_reloadable=bool(row[5]),
|
|
updated_at=row[6],
|
|
updated_by=row[7]
|
|
)
|
|
raise ValueError("Failed to create setting")
|
|
|
|
def get_setting(self, key: str) -> ConfigSetting | None:
|
|
"""Get a configuration setting by key."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by
|
|
FROM config_settings
|
|
WHERE key = ?
|
|
""",
|
|
(key,),
|
|
)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
return ConfigSetting(
|
|
key=row[0],
|
|
section=row[1],
|
|
value_json=row[2],
|
|
value_type=row[3],
|
|
is_secret=bool(row[4]),
|
|
is_runtime_reloadable=bool(row[5]),
|
|
updated_at=row[6],
|
|
updated_by=row[7]
|
|
)
|
|
return None
|
|
|
|
def update_setting(self, key: str, setting: ConfigSetting) -> ConfigSetting:
|
|
"""Update an existing configuration setting."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
UPDATE config_settings
|
|
SET section = ?, value_json = ?, value_type = ?, is_secret = ?, is_runtime_reloadable = ?, updated_by = ?
|
|
WHERE key = ?
|
|
RETURNING key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by
|
|
""",
|
|
(
|
|
setting.section,
|
|
setting.value_json,
|
|
setting.value_type,
|
|
setting.is_secret,
|
|
setting.is_runtime_reloadable,
|
|
setting.updated_by,
|
|
key,
|
|
),
|
|
)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
return ConfigSetting(
|
|
key=row[0],
|
|
section=row[1],
|
|
value_json=row[2],
|
|
value_type=row[3],
|
|
is_secret=bool(row[4]),
|
|
is_runtime_reloadable=bool(row[5]),
|
|
updated_at=row[6],
|
|
updated_by=row[7]
|
|
)
|
|
raise ValueError("Failed to update setting")
|
|
|
|
def delete_setting(self, key: str) -> bool:
|
|
"""Delete a configuration setting."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
DELETE FROM config_settings
|
|
WHERE key = ?
|
|
""",
|
|
(key,),
|
|
)
|
|
return cursor.rowcount > 0
|
|
|
|
def list_settings(self, section: str | None = None) -> list[ConfigSetting]:
|
|
"""List all configuration settings, optionally filtered by section."""
|
|
with self._store.connect() as conn:
|
|
if section:
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by
|
|
FROM config_settings
|
|
WHERE section = ?
|
|
ORDER BY key
|
|
""",
|
|
(section,),
|
|
)
|
|
else:
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by
|
|
FROM config_settings
|
|
ORDER BY key
|
|
"""
|
|
)
|
|
return [
|
|
ConfigSetting(
|
|
key=row[0],
|
|
section=row[1],
|
|
value_json=row[2],
|
|
value_type=row[3],
|
|
is_secret=bool(row[4]),
|
|
is_runtime_reloadable=bool(row[5]),
|
|
updated_at=row[6],
|
|
updated_by=row[7]
|
|
)
|
|
for row in cursor.fetchall()
|
|
]
|
|
|
|
def get_latest_updated_at(self) -> datetime | None:
|
|
"""Get the latest updated_at timestamp from config_settings table."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT MAX(updated_at) as latest_updated_at
|
|
FROM config_settings
|
|
"""
|
|
)
|
|
row = cursor.fetchone()
|
|
if row and row[0]:
|
|
# Convert string timestamp to datetime
|
|
return datetime.fromisoformat(row[0].replace('Z', '+00:00'))
|
|
return None
|
|
|
|
|
|
class ConfigPairingRepository:
|
|
def __init__(self, store: DuckDBStore) -> None:
|
|
self._store = store
|
|
|
|
def create_pairing(self, pairing: ConfigPairing) -> ConfigPairing:
|
|
"""Create a new currency pairing."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
INSERT INTO config_pairings (base_asset, quote_asset, enabled, source)
|
|
VALUES (?, ?, ?, ?)
|
|
RETURNING id, base_asset, quote_asset, enabled, source, created_at, updated_at
|
|
""",
|
|
(
|
|
pairing.base_asset,
|
|
pairing.quote_asset,
|
|
pairing.enabled,
|
|
pairing.source,
|
|
),
|
|
)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
return ConfigPairing(
|
|
id=row[0],
|
|
base_asset=row[1],
|
|
quote_asset=row[2],
|
|
enabled=bool(row[3]),
|
|
source=row[4],
|
|
created_at=row[5],
|
|
updated_at=row[6]
|
|
)
|
|
raise ValueError("Failed to create pairing")
|
|
|
|
def get_pairing(self, base_asset: str, quote_asset: str) -> ConfigPairing | None:
|
|
"""Get a currency pairing by base and quote assets."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT id, base_asset, quote_asset, enabled, source, created_at, updated_at
|
|
FROM config_pairings
|
|
WHERE base_asset = ? AND quote_asset = ?
|
|
""",
|
|
(base_asset, quote_asset),
|
|
)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
return ConfigPairing(
|
|
id=row[0],
|
|
base_asset=row[1],
|
|
quote_asset=row[2],
|
|
enabled=bool(row[3]),
|
|
source=row[4],
|
|
created_at=row[5],
|
|
updated_at=row[6]
|
|
)
|
|
return None
|
|
|
|
def update_pairing(self, base_asset: str, quote_asset: str, pairing: ConfigPairing) -> ConfigPairing:
|
|
"""Update an existing currency pairing."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
UPDATE config_pairings
|
|
SET enabled = ?, source = ?
|
|
WHERE base_asset = ? AND quote_asset = ?
|
|
RETURNING id, base_asset, quote_asset, enabled, source, created_at, updated_at
|
|
""",
|
|
(
|
|
pairing.enabled,
|
|
pairing.source,
|
|
base_asset,
|
|
quote_asset,
|
|
),
|
|
)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
return ConfigPairing(
|
|
id=row[0],
|
|
base_asset=row[1],
|
|
quote_asset=row[2],
|
|
enabled=bool(row[3]),
|
|
source=row[4],
|
|
created_at=row[5],
|
|
updated_at=row[6]
|
|
)
|
|
raise ValueError("Failed to update pairing")
|
|
|
|
def delete_pairing(self, base_asset: str, quote_asset: str) -> bool:
|
|
"""Delete a currency pairing."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
DELETE FROM config_pairings
|
|
WHERE base_asset = ? AND quote_asset = ?
|
|
""",
|
|
(base_asset, quote_asset),
|
|
)
|
|
return cursor.rowcount > 0
|
|
|
|
def list_pairings(self) -> list[ConfigPairing]:
|
|
"""List all currency pairings."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT id, base_asset, quote_asset, enabled, source, created_at, updated_at
|
|
FROM config_pairings
|
|
ORDER BY base_asset, quote_asset
|
|
"""
|
|
)
|
|
return [
|
|
ConfigPairing(
|
|
id=row[0],
|
|
base_asset=row[1],
|
|
quote_asset=row[2],
|
|
enabled=bool(row[3]),
|
|
source=row[4],
|
|
created_at=row[5],
|
|
updated_at=row[6]
|
|
)
|
|
for row in cursor.fetchall()
|
|
]
|
|
|
|
|
|
class ConfigPairFeeRepository:
|
|
def __init__(self, store: DuckDBStore) -> None:
|
|
self._store = store
|
|
|
|
def create_pair_fee(self, pair_fee: ConfigPairFee) -> ConfigPairFee:
|
|
"""Create a new pairing fee record."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
INSERT INTO config_pair_fees (pairing_id, market_type, maker_fee_rate, taker_fee_rate)
|
|
VALUES (?, ?, ?, ?)
|
|
RETURNING pairing_id, market_type, maker_fee_rate, taker_fee_rate, updated_at
|
|
""",
|
|
(
|
|
pair_fee.pairing_id,
|
|
pair_fee.market_type,
|
|
pair_fee.maker_fee_rate,
|
|
pair_fee.taker_fee_rate,
|
|
),
|
|
)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
return ConfigPairFee(
|
|
pairing_id=row[0],
|
|
market_type=row[1],
|
|
maker_fee_rate=row[2],
|
|
taker_fee_rate=row[3],
|
|
updated_at=row[4]
|
|
)
|
|
raise ValueError("Failed to create pair fee")
|
|
|
|
def get_pair_fee(self, pairing_id: int, market_type: str) -> ConfigPairFee | None:
|
|
"""Get a pairing fee by pairing ID and market type."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT pairing_id, market_type, maker_fee_rate, taker_fee_rate, updated_at
|
|
FROM config_pair_fees
|
|
WHERE pairing_id = ? AND market_type = ?
|
|
""",
|
|
(pairing_id, market_type),
|
|
)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
return ConfigPairFee(
|
|
pairing_id=row[0],
|
|
market_type=row[1],
|
|
maker_fee_rate=row[2],
|
|
taker_fee_rate=row[3],
|
|
updated_at=row[4]
|
|
)
|
|
return None
|
|
|
|
def update_pair_fee(self, pairing_id: int, market_type: str, pair_fee: ConfigPairFee) -> ConfigPairFee:
|
|
"""Update an existing pairing fee."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
UPDATE config_pair_fees
|
|
SET maker_fee_rate = ?, taker_fee_rate = ?
|
|
WHERE pairing_id = ? AND market_type = ?
|
|
RETURNING pairing_id, market_type, maker_fee_rate, taker_fee_rate, updated_at
|
|
""",
|
|
(
|
|
pair_fee.maker_fee_rate,
|
|
pair_fee.taker_fee_rate,
|
|
pairing_id,
|
|
market_type,
|
|
),
|
|
)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
return ConfigPairFee(
|
|
pairing_id=row[0],
|
|
market_type=row[1],
|
|
maker_fee_rate=row[2],
|
|
taker_fee_rate=row[3],
|
|
updated_at=row[4]
|
|
)
|
|
raise ValueError("Failed to update pair fee")
|
|
|
|
def delete_pair_fee(self, pairing_id: int, market_type: str) -> bool:
|
|
"""Delete a pairing fee."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
DELETE FROM config_pair_fees
|
|
WHERE pairing_id = ? AND market_type = ?
|
|
""",
|
|
(pairing_id, market_type),
|
|
)
|
|
return cursor.rowcount > 0
|
|
|
|
def list_pair_fees(self, pairing_id: int) -> list[ConfigPairFee]:
|
|
"""List all fees for a pairing."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT pairing_id, market_type, maker_fee_rate, taker_fee_rate, updated_at
|
|
FROM config_pair_fees
|
|
WHERE pairing_id = ?
|
|
ORDER BY market_type
|
|
""",
|
|
(pairing_id,),
|
|
)
|
|
return [
|
|
ConfigPairFee(
|
|
pairing_id=row[0],
|
|
market_type=row[1],
|
|
maker_fee_rate=row[2],
|
|
taker_fee_rate=row[3],
|
|
updated_at=row[4]
|
|
)
|
|
for row in cursor.fetchall()
|
|
]
|
|
|
|
|
|
class ConfigBacktestingDefaultsRepository:
|
|
def __init__(self, store: DuckDBStore) -> None:
|
|
self._store = store
|
|
|
|
def create_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults:
|
|
"""Create new backtesting defaults."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
INSERT INTO config_backtesting_defaults (starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
RETURNING id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms
|
|
""",
|
|
(
|
|
orjson.dumps(defaults.starting_balances).decode(
|
|
'utf-8') if defaults.starting_balances else None,
|
|
defaults.trade_capital,
|
|
defaults.min_profit_threshold,
|
|
defaults.slippage_bps,
|
|
defaults.execution_latency_ms,
|
|
),
|
|
)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
return ConfigBacktestingDefaults(
|
|
starting_balances=orjson.loads(row[1]) if row[1] else None,
|
|
trade_capital=row[2],
|
|
min_profit_threshold=row[3],
|
|
slippage_bps=row[4],
|
|
execution_latency_ms=row[5]
|
|
)
|
|
raise ValueError("Failed to create backtesting defaults")
|
|
|
|
def get_defaults(self) -> ConfigBacktestingDefaults | None:
|
|
"""Get the current backtesting defaults."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms
|
|
FROM config_backtesting_defaults
|
|
ORDER BY id DESC
|
|
LIMIT 1
|
|
"""
|
|
)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
return ConfigBacktestingDefaults(
|
|
starting_balances=orjson.loads(row[1]) if row[1] else None,
|
|
trade_capital=row[2],
|
|
min_profit_threshold=row[3],
|
|
slippage_bps=row[4],
|
|
execution_latency_ms=row[5]
|
|
)
|
|
return None
|
|
|
|
def update_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults:
|
|
"""Update the backtesting defaults."""
|
|
with self._store.connect() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
UPDATE config_backtesting_defaults
|
|
SET starting_balances = ?, trade_capital = ?, min_profit_threshold = ?, slippage_bps = ?, execution_latency_ms = ?
|
|
WHERE id = (
|
|
SELECT id FROM config_backtesting_defaults ORDER BY id DESC LIMIT 1
|
|
)
|
|
RETURNING id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms
|
|
""",
|
|
(
|
|
orjson.dumps(defaults.starting_balances).decode(
|
|
'utf-8') if defaults.starting_balances else None,
|
|
defaults.trade_capital,
|
|
defaults.min_profit_threshold,
|
|
defaults.slippage_bps,
|
|
defaults.execution_latency_ms,
|
|
),
|
|
)
|
|
row = cursor.fetchone()
|
|
if row:
|
|
return ConfigBacktestingDefaults(
|
|
starting_balances=orjson.loads(row[1]) if row[1] else None,
|
|
trade_capital=row[2],
|
|
min_profit_threshold=row[3],
|
|
slippage_bps=row[4],
|
|
execution_latency_ms=row[5]
|
|
)
|
|
raise ValueError("Failed to update backtesting defaults")
|