From 3f4b9a40123692c0b9ffb3dfa8976dbe94c61830 Mon Sep 17 00:00:00 2001 From: zwitschi Date: Thu, 4 Jun 2026 22:28:57 +0200 Subject: [PATCH] refactor: streamline WebSocket connection logging and improve data handling in repositories --- src/arbitrade/exchange/kraken_ws.py | 18 +++---- src/arbitrade/storage/repositories.py | 69 +++++++++++++++------------ 2 files changed, 49 insertions(+), 38 deletions(-) diff --git a/src/arbitrade/exchange/kraken_ws.py b/src/arbitrade/exchange/kraken_ws.py index a250106..866315b 100644 --- a/src/arbitrade/exchange/kraken_ws.py +++ b/src/arbitrade/exchange/kraken_ws.py @@ -78,18 +78,16 @@ class KrakenWsClient: delay = 1.0 while not self._stop.is_set(): try: - async with websockets.connect( - self._settings.kraken_ws_url, max_size=2_000_000 - ) as ws: - _LOG.info("kraken_ws_connected", - url=self._settings.kraken_ws_url) + url = self._settings.kraken_ws_url + async with websockets.connect(url, max_size=2_000_000) as ws: + _LOG.info("kraken_ws_connected", url=url) if self._has_connected_once and self._was_disconnected: await self._notify( category="system", severity="info", title="WebSocket reconnected", message="Kraken WebSocket connection restored.", - details={"url": self._settings.kraken_ws_url}, + details={"url": url}, ) self._has_connected_once = True self._was_disconnected = False @@ -98,8 +96,12 @@ class KrakenWsClient: async for raw in self._recv_loop(ws): yield raw except Exception as exc: - _LOG.warning("kraken_ws_disconnected", - error=str(exc), reconnect_in=delay) + log = ( + "kraken_ws_disconnected_first_time" + if not self._has_connected_once + else "kraken_ws_disconnected" + ) + _LOG.warning(log, error=str(exc), reconnect_in=delay) self._was_disconnected = True await self._notify( category="system", diff --git a/src/arbitrade/storage/repositories.py b/src/arbitrade/storage/repositories.py index b35c5b1..ca7e15a 100644 --- a/src/arbitrade/storage/repositories.py +++ b/src/arbitrade/storage/repositories.py @@ -421,6 +421,9 @@ class ConfigSectionRepository: return ConfigSection(id=row[0], name=row[1], description=row[2], updated_at=row[3]) return None + def _build_section_from_row(self, row: tuple) -> ConfigSection: + return ConfigSection(id=row[0], name=row[1], description=row[2], updated_at=row[3]) + def list_sections(self) -> list[ConfigSection]: """List all configuration sections.""" with self._store.connect() as conn: @@ -429,11 +432,13 @@ class ConfigSectionRepository: FROM config_sections ORDER BY name """) - return [ - ConfigSection(id=row[0], name=row[1], - description=row[2], updated_at=row[3]) - for row in cursor.fetchall() - ] + c = cursor.fetchall() + r = [] + for row in c: + s = self._build_section_from_row(row) + r.append(s) + + return r class ConfigSettingRepository: @@ -758,6 +763,11 @@ class ConfigBacktestingDefaultsRepository: def create_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults: """Create new backtesting defaults.""" with self._store.connect() as conn: + balances_json = ( + orjson.dumps(defaults.starting_balances).decode("utf-8") + if defaults.starting_balances + else None + ) cursor = conn.execute( """ INSERT INTO config_backtesting_defaults (starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms) @@ -765,12 +775,7 @@ class ConfigBacktestingDefaultsRepository: RETURNING id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms """, ( - ( - orjson.dumps( - defaults.starting_balances).decode("utf-8") - if defaults.starting_balances - else None - ), + balances_json, defaults.trade_capital, defaults.min_profit_threshold, defaults.slippage_bps, @@ -811,6 +816,11 @@ class ConfigBacktestingDefaultsRepository: def update_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults: """Update the backtesting defaults.""" with self._store.connect() as conn: + starting_balances_json = ( + orjson.dumps(defaults.starting_balances).decode("utf-8") + if defaults.starting_balances + else None + ) cursor = conn.execute( """ UPDATE config_backtesting_defaults @@ -821,12 +831,7 @@ class ConfigBacktestingDefaultsRepository: RETURNING id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms """, ( - ( - orjson.dumps( - defaults.starting_balances).decode("utf-8") - if defaults.starting_balances - else None - ), + starting_balances_json, defaults.trade_capital, defaults.min_profit_threshold, defaults.slippage_bps, @@ -862,6 +867,16 @@ class KrakenAccountSnapshotRepository: def insert_snapshot(self, snapshot: KrakenAccountSnapshot) -> None: with self._store.connect() as conn: + trade_balance_json = ( + orjson.dumps(snapshot.trade_balance_raw).decode("utf-8") + if snapshot.trade_balance_raw + else None + ) + fee_schedule_json = ( + orjson.dumps(snapshot.fee_schedule_raw).decode("utf-8") + if snapshot.fee_schedule_raw + else None + ) conn.execute( """ INSERT INTO kraken_account_snapshots @@ -875,17 +890,8 @@ class KrakenAccountSnapshotRepository: snapshot.maker_fee, snapshot.taker_fee, snapshot.thirty_day_volume, - ( - orjson.dumps( - snapshot.trade_balance_raw).decode("utf-8") - if snapshot.trade_balance_raw - else None - ), - ( - orjson.dumps(snapshot.fee_schedule_raw).decode("utf-8") - if snapshot.fee_schedule_raw - else None - ), + trade_balance_json, + fee_schedule_json, ), ) @@ -932,14 +938,17 @@ class BacktestJobRepository: self, events_path: str, config: dict[str, Any] | None = None ) -> BacktestJobRecord: with self._store.connect() as conn: + if config is not None: + job_config_json = orjson.dumps(config).decode("utf-8") + else: + raise ValueError("Config is required.") row = conn.execute( """ INSERT INTO backtest_jobs (events_path, config) VALUES (?, ?) RETURNING id, status, events_path, config, created_at """, - (events_path, orjson.dumps(config).decode( - "utf-8") if config else None), + (events_path, job_config_json), ).fetchone() if row is None: raise ValueError("Failed to create backtest job")