refactor: streamline WebSocket connection logging and improve data handling in repositories

This commit is contained in:
2026-06-04 22:28:57 +02:00
parent 4c59a0e4cb
commit 3f4b9a4012
2 changed files with 49 additions and 38 deletions
+10 -8
View File
@@ -78,18 +78,16 @@ class KrakenWsClient:
delay = 1.0 delay = 1.0
while not self._stop.is_set(): while not self._stop.is_set():
try: try:
async with websockets.connect( url = self._settings.kraken_ws_url
self._settings.kraken_ws_url, max_size=2_000_000 async with websockets.connect(url, max_size=2_000_000) as ws:
) as ws: _LOG.info("kraken_ws_connected", url=url)
_LOG.info("kraken_ws_connected",
url=self._settings.kraken_ws_url)
if self._has_connected_once and self._was_disconnected: if self._has_connected_once and self._was_disconnected:
await self._notify( await self._notify(
category="system", category="system",
severity="info", severity="info",
title="WebSocket reconnected", title="WebSocket reconnected",
message="Kraken WebSocket connection restored.", message="Kraken WebSocket connection restored.",
details={"url": self._settings.kraken_ws_url}, details={"url": url},
) )
self._has_connected_once = True self._has_connected_once = True
self._was_disconnected = False self._was_disconnected = False
@@ -98,8 +96,12 @@ class KrakenWsClient:
async for raw in self._recv_loop(ws): async for raw in self._recv_loop(ws):
yield raw yield raw
except Exception as exc: except Exception as exc:
_LOG.warning("kraken_ws_disconnected", log = (
error=str(exc), reconnect_in=delay) "kraken_ws_disconnected_first_time"
if not self._has_connected_once
else "kraken_ws_disconnected"
)
_LOG.warning(log, error=str(exc), reconnect_in=delay)
self._was_disconnected = True self._was_disconnected = True
await self._notify( await self._notify(
category="system", category="system",
+39 -30
View File
@@ -421,6 +421,9 @@ class ConfigSectionRepository:
return ConfigSection(id=row[0], name=row[1], description=row[2], updated_at=row[3]) return ConfigSection(id=row[0], name=row[1], description=row[2], updated_at=row[3])
return None return None
def _build_section_from_row(self, row: tuple) -> ConfigSection:
return ConfigSection(id=row[0], name=row[1], description=row[2], updated_at=row[3])
def list_sections(self) -> list[ConfigSection]: def list_sections(self) -> list[ConfigSection]:
"""List all configuration sections.""" """List all configuration sections."""
with self._store.connect() as conn: with self._store.connect() as conn:
@@ -429,11 +432,13 @@ class ConfigSectionRepository:
FROM config_sections FROM config_sections
ORDER BY name ORDER BY name
""") """)
return [ c = cursor.fetchall()
ConfigSection(id=row[0], name=row[1], r = []
description=row[2], updated_at=row[3]) for row in c:
for row in cursor.fetchall() s = self._build_section_from_row(row)
] r.append(s)
return r
class ConfigSettingRepository: class ConfigSettingRepository:
@@ -758,6 +763,11 @@ class ConfigBacktestingDefaultsRepository:
def create_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults: def create_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults:
"""Create new backtesting defaults.""" """Create new backtesting defaults."""
with self._store.connect() as conn: with self._store.connect() as conn:
balances_json = (
orjson.dumps(defaults.starting_balances).decode("utf-8")
if defaults.starting_balances
else None
)
cursor = conn.execute( cursor = conn.execute(
""" """
INSERT INTO config_backtesting_defaults (starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms) INSERT INTO config_backtesting_defaults (starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms)
@@ -765,12 +775,7 @@ class ConfigBacktestingDefaultsRepository:
RETURNING id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms RETURNING id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms
""", """,
( (
( balances_json,
orjson.dumps(
defaults.starting_balances).decode("utf-8")
if defaults.starting_balances
else None
),
defaults.trade_capital, defaults.trade_capital,
defaults.min_profit_threshold, defaults.min_profit_threshold,
defaults.slippage_bps, defaults.slippage_bps,
@@ -811,6 +816,11 @@ class ConfigBacktestingDefaultsRepository:
def update_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults: def update_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults:
"""Update the backtesting defaults.""" """Update the backtesting defaults."""
with self._store.connect() as conn: with self._store.connect() as conn:
starting_balances_json = (
orjson.dumps(defaults.starting_balances).decode("utf-8")
if defaults.starting_balances
else None
)
cursor = conn.execute( cursor = conn.execute(
""" """
UPDATE config_backtesting_defaults UPDATE config_backtesting_defaults
@@ -821,12 +831,7 @@ class ConfigBacktestingDefaultsRepository:
RETURNING id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms RETURNING id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms
""", """,
( (
( starting_balances_json,
orjson.dumps(
defaults.starting_balances).decode("utf-8")
if defaults.starting_balances
else None
),
defaults.trade_capital, defaults.trade_capital,
defaults.min_profit_threshold, defaults.min_profit_threshold,
defaults.slippage_bps, defaults.slippage_bps,
@@ -862,6 +867,16 @@ class KrakenAccountSnapshotRepository:
def insert_snapshot(self, snapshot: KrakenAccountSnapshot) -> None: def insert_snapshot(self, snapshot: KrakenAccountSnapshot) -> None:
with self._store.connect() as conn: with self._store.connect() as conn:
trade_balance_json = (
orjson.dumps(snapshot.trade_balance_raw).decode("utf-8")
if snapshot.trade_balance_raw
else None
)
fee_schedule_json = (
orjson.dumps(snapshot.fee_schedule_raw).decode("utf-8")
if snapshot.fee_schedule_raw
else None
)
conn.execute( conn.execute(
""" """
INSERT INTO kraken_account_snapshots INSERT INTO kraken_account_snapshots
@@ -875,17 +890,8 @@ class KrakenAccountSnapshotRepository:
snapshot.maker_fee, snapshot.maker_fee,
snapshot.taker_fee, snapshot.taker_fee,
snapshot.thirty_day_volume, snapshot.thirty_day_volume,
( trade_balance_json,
orjson.dumps( fee_schedule_json,
snapshot.trade_balance_raw).decode("utf-8")
if snapshot.trade_balance_raw
else None
),
(
orjson.dumps(snapshot.fee_schedule_raw).decode("utf-8")
if snapshot.fee_schedule_raw
else None
),
), ),
) )
@@ -932,14 +938,17 @@ class BacktestJobRepository:
self, events_path: str, config: dict[str, Any] | None = None self, events_path: str, config: dict[str, Any] | None = None
) -> BacktestJobRecord: ) -> BacktestJobRecord:
with self._store.connect() as conn: with self._store.connect() as conn:
if config is not None:
job_config_json = orjson.dumps(config).decode("utf-8")
else:
raise ValueError("Config is required.")
row = conn.execute( row = conn.execute(
""" """
INSERT INTO backtest_jobs (events_path, config) INSERT INTO backtest_jobs (events_path, config)
VALUES (?, ?) VALUES (?, ?)
RETURNING id, status, events_path, config, created_at RETURNING id, status, events_path, config, created_at
""", """,
(events_path, orjson.dumps(config).decode( (events_path, job_config_json),
"utf-8") if config else None),
).fetchone() ).fetchone()
if row is None: if row is None:
raise ValueError("Failed to create backtest job") raise ValueError("Failed to create backtest job")