from __future__ import annotations import asyncio from dataclasses import dataclass, field from typing import Any import orjson import pytest from arbitrade.config.settings import Settings from arbitrade.exchange.kraken_ws import KrakenWsClient @dataclass(slots=True) class _FakeAlertNotifier: events: list[dict[str, str]] = field(default_factory=list) async def notify( self, *, category: str, severity: str, title: str, message: str, details: dict[str, str] | None = None, ) -> bool: self.events.append( { "category": category, "severity": severity, "title": title, "message": message, **(details or {}), } ) return True class _FakeWebSocket: def __init__(self, messages: list[Any]) -> None: self._messages = messages async def recv(self) -> str: if not self._messages: await asyncio.sleep(0) return orjson.dumps({"channel": "heartbeat"}).decode("utf-8") next_item = self._messages.pop(0) if isinstance(next_item, Exception): raise next_item return next_item class _FakeConnectContext: def __init__(self, ws: _FakeWebSocket) -> None: self._ws = ws async def __aenter__(self) -> _FakeWebSocket: return self._ws async def __aexit__(self, exc_type: object, exc: object, tb: object) -> bool: return False def test_parse_book_delta() -> None: client = KrakenWsClient(Settings()) message = { "channel": "book", "symbol": "BTC/USD", "data": [ { "bids": [{"price": "100.0", "qty": "1.2"}], "asks": [{"price": "100.5", "qty": "0.8"}], "checksum": 123, "timestamp": 1717232000000, } ], } delta = client.parse_book_delta(message) assert delta is not None assert delta.symbol == "BTC/USD" assert len(delta.bids) == 1 assert len(delta.asks) == 1 assert delta.checksum == 123 @pytest.mark.asyncio async def test_connect_stream_emits_disconnect_and_reconnect_alerts( monkeypatch: pytest.MonkeyPatch, ) -> None: notifier = _FakeAlertNotifier() settings = Settings(_env_file=None, WS_HEARTBEAT_TIMEOUT_SECONDS=1.0) client = KrakenWsClient(settings, alert_notifier=notifier) first_payload = orjson.dumps( {"channel": "book", "symbol": "BTC/USD", "data": [{"bids": [], "asks": []}]} ).decode("utf-8") second_payload = orjson.dumps( {"channel": "book", "symbol": "ETH/USD", "data": [{"bids": [], "asks": []}]} ).decode("utf-8") sessions = [ _FakeWebSocket([first_payload, RuntimeError("socket dropped")]), _FakeWebSocket([second_payload]), ] def _fake_connect(*_args: object, **_kwargs: object) -> _FakeConnectContext: return _FakeConnectContext(sessions.pop(0)) monkeypatch.setattr("arbitrade.exchange.kraken_ws.websockets.connect", _fake_connect) stream = client.connect_stream() first = await anext(stream) second = await anext(stream) await client.stop() await stream.aclose() assert first.payload["symbol"] == "BTC/USD" assert second.payload["symbol"] == "ETH/USD" titles = [event["title"] for event in notifier.events] assert "WebSocket disconnected" in titles assert "WebSocket reconnected" in titles @pytest.mark.asyncio async def test_recv_loop_emits_staleness_alert_on_timeout() -> None: notifier = _FakeAlertNotifier() settings = Settings(_env_file=None, WS_HEARTBEAT_TIMEOUT_SECONDS=0.001) client = KrakenWsClient(settings, alert_notifier=notifier) class _NeverReturnsWebSocket: async def recv(self) -> str: await asyncio.sleep(1) return "{}" with pytest.raises(TimeoutError): await anext(client._recv_loop(_NeverReturnsWebSocket())) assert len(notifier.events) == 1 assert notifier.events[0]["title"] == "WebSocket staleness abort"