Files
arbitrade/src/arbitrade/exchange/kraken_ws.py
T

213 lines
7.4 KiB
Python

from __future__ import annotations
import asyncio
import time
from collections.abc import AsyncIterator
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any
import orjson
import structlog
import websockets
from arbitrade.alerting.notifier import AlertSeverity, SupportsAlerts
from arbitrade.config.settings import Settings
from arbitrade.exchange.models import BookDelta, BookLevel
_LOG = structlog.get_logger(__name__)
@dataclass(slots=True)
class WsMessage:
received_at: datetime
payload: dict[str, Any]
class KrakenWsClient:
def __init__(self, settings: Settings, *, alert_notifier: SupportsAlerts | None = None) -> None:
self._settings = settings
self._last_message_at: datetime | None = None
self._stop = asyncio.Event()
self._alert_notifier = alert_notifier
self._has_connected_once = False
self._was_disconnected = False
self._subscribed_symbols: list[str] = []
@property
def is_stale(self) -> bool:
if self._last_message_at is None:
return True
return (
datetime.now(UTC) - self._last_message_at
).total_seconds() > self._settings.ws_max_staleness_seconds
async def stop(self) -> None:
self._stop.set()
def set_subscribed_symbols(self, symbols: list[str]) -> None:
"""Set the list of symbols to subscribe to on (re)connect."""
self._subscribed_symbols = list(symbols)
async def _subscribe(self, ws: Any) -> None:
"""Send Kraken WS v2 subscribe message for book channel."""
if not self._subscribed_symbols:
_LOG.warning("kraken_ws_no_symbols_to_subscribe")
return
depth = 10
if hasattr(self._settings, "kraken_ws_book_depth"):
depth = self._settings.kraken_ws_book_depth
msg = orjson.dumps(
{
"method": "subscribe",
"params": {
"channel": "book",
"symbol": self._subscribed_symbols,
"depth": depth,
},
}
)
await ws.send(msg)
_LOG.info(
"kraken_ws_subscribed",
symbol_count=len(self._subscribed_symbols),
symbols=self._subscribed_symbols,
)
async def connect_stream(self) -> AsyncIterator[WsMessage]:
delay = 1.0
while not self._stop.is_set():
try:
url = self._settings.kraken_ws_url
async with websockets.connect(url, max_size=2_000_000) as ws:
_LOG.info("kraken_ws_connected", url=url)
if self._has_connected_once and self._was_disconnected:
await self._notify(
category="system",
severity="info",
title="WebSocket reconnected",
message="Kraken WebSocket connection restored.",
details={"url": url},
)
self._has_connected_once = True
self._was_disconnected = False
delay = 1.0
await self._subscribe(ws)
async for raw in self._recv_loop(ws):
yield raw
except Exception as exc:
log = (
"kraken_ws_disconnected_first_time"
if not self._has_connected_once
else "kraken_ws_disconnected"
)
_LOG.warning(log, error=str(exc), reconnect_in=delay)
self._was_disconnected = True
await self._notify(
category="system",
severity="warning",
title="WebSocket disconnected",
message="Kraken WebSocket disconnected, reconnect scheduled.",
details={
"error": str(exc),
"reconnect_in_seconds": f"{delay}",
},
)
await asyncio.sleep(delay)
delay = min(delay * 2, 30.0)
async def _recv_loop(self, ws: Any) -> AsyncIterator[WsMessage]:
while not self._stop.is_set():
t0 = time.perf_counter()
try:
raw = await asyncio.wait_for(
ws.recv(), timeout=self._settings.ws_heartbeat_timeout_seconds
)
except TimeoutError:
await self._notify(
category="system",
severity="critical",
title="WebSocket staleness abort",
message="No WebSocket heartbeat within configured timeout; reconnecting.",
details={
"heartbeat_timeout_seconds": (
f"{self._settings.ws_heartbeat_timeout_seconds}"
),
},
)
raise
parse_start = time.perf_counter()
payload = orjson.loads(raw)
self._last_message_at = datetime.now(UTC)
_LOG.debug(
"kraken_ws_message",
recv_latency_ms=(parse_start - t0) * 1000,
parse_latency_ms=(time.perf_counter() - parse_start) * 1000,
)
if isinstance(payload, dict):
yield WsMessage(received_at=self._last_message_at, payload=payload)
async def _notify(
self,
*,
category: str,
severity: AlertSeverity,
title: str,
message: str,
details: dict[str, str] | None = None,
) -> None:
if self._alert_notifier is None:
return
await self._alert_notifier.notify(
category=category,
severity=severity,
title=title,
message=message,
details=details,
)
@staticmethod
def parse_book_delta(message: dict[str, Any]) -> BookDelta | None:
# Kraken v2 book update shape can vary by channel; keep parser defensive.
channel = str(message.get("channel", ""))
if "book" not in channel:
return None
symbol = str(message.get("symbol", ""))
data = message.get("data")
if not isinstance(data, list) or not data:
return None
first = data[0]
if not isinstance(first, dict):
return None
bids = [
BookLevel(price=float(level["price"]), volume=float(level["qty"]))
for level in first.get("bids", [])
if isinstance(level, dict) and "price" in level and "qty" in level
]
asks = [
BookLevel(price=float(level["price"]), volume=float(level["qty"]))
for level in first.get("asks", [])
if isinstance(level, dict) and "price" in level and "qty" in level
]
checksum: int | None = None
raw_checksum = first.get("checksum")
if isinstance(raw_checksum, int):
checksum = raw_checksum
source_timestamp_ms: int | None = None
if isinstance(first.get("timestamp"), int):
source_timestamp_ms = first["timestamp"]
return BookDelta(
symbol=symbol,
bids=bids,
asks=asks,
checksum=checksum,
source_timestamp_ms=source_timestamp_ms,
)