c17f41aaf8
- Introduced new tables for audit events and runtime state snapshots in the database schema. - Created data classes for AuditRecord and RuntimeStateRecord to represent the new entities. - Implemented AuditRepository and RuntimeStateRepository for inserting and retrieving records. - Enhanced the dashboard to include an audit trail section, displaying recent audit events. - Added tests for the new audit repository and runtime lifecycle functionalities. - Updated settings validation to ensure proper configuration for alerting features. - Integrated alert notifications across various components, including execution sequencer and loss limits.
142 lines
4.0 KiB
Python
142 lines
4.0 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
import orjson
|
|
import pytest
|
|
|
|
from arbitrade.config.settings import Settings
|
|
from arbitrade.exchange.kraken_ws import KrakenWsClient
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class _FakeAlertNotifier:
|
|
events: list[dict[str, str]] = field(default_factory=list)
|
|
|
|
async def notify(
|
|
self,
|
|
*,
|
|
category: str,
|
|
severity: str,
|
|
title: str,
|
|
message: str,
|
|
details: dict[str, str] | None = None,
|
|
) -> bool:
|
|
self.events.append(
|
|
{
|
|
"category": category,
|
|
"severity": severity,
|
|
"title": title,
|
|
"message": message,
|
|
**(details or {}),
|
|
}
|
|
)
|
|
return True
|
|
|
|
|
|
class _FakeWebSocket:
|
|
def __init__(self, messages: list[Any]) -> None:
|
|
self._messages = messages
|
|
|
|
async def recv(self) -> str:
|
|
if not self._messages:
|
|
await asyncio.sleep(0)
|
|
return orjson.dumps({"channel": "heartbeat"}).decode("utf-8")
|
|
next_item = self._messages.pop(0)
|
|
if isinstance(next_item, Exception):
|
|
raise next_item
|
|
return next_item
|
|
|
|
|
|
class _FakeConnectContext:
|
|
def __init__(self, ws: _FakeWebSocket) -> None:
|
|
self._ws = ws
|
|
|
|
async def __aenter__(self) -> _FakeWebSocket:
|
|
return self._ws
|
|
|
|
async def __aexit__(self, exc_type: object, exc: object, tb: object) -> bool:
|
|
return False
|
|
|
|
|
|
def test_parse_book_delta() -> None:
|
|
client = KrakenWsClient(Settings())
|
|
message = {
|
|
"channel": "book",
|
|
"symbol": "BTC/USD",
|
|
"data": [
|
|
{
|
|
"bids": [{"price": "100.0", "qty": "1.2"}],
|
|
"asks": [{"price": "100.5", "qty": "0.8"}],
|
|
"checksum": 123,
|
|
"timestamp": 1717232000000,
|
|
}
|
|
],
|
|
}
|
|
|
|
delta = client.parse_book_delta(message)
|
|
|
|
assert delta is not None
|
|
assert delta.symbol == "BTC/USD"
|
|
assert len(delta.bids) == 1
|
|
assert len(delta.asks) == 1
|
|
assert delta.checksum == 123
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_connect_stream_emits_disconnect_and_reconnect_alerts(
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
notifier = _FakeAlertNotifier()
|
|
settings = Settings(_env_file=None, WS_HEARTBEAT_TIMEOUT_SECONDS=1.0)
|
|
client = KrakenWsClient(settings, alert_notifier=notifier)
|
|
|
|
first_payload = orjson.dumps(
|
|
{"channel": "book", "symbol": "BTC/USD", "data": [{"bids": [], "asks": []}]}
|
|
).decode("utf-8")
|
|
second_payload = orjson.dumps(
|
|
{"channel": "book", "symbol": "ETH/USD", "data": [{"bids": [], "asks": []}]}
|
|
).decode("utf-8")
|
|
|
|
sessions = [
|
|
_FakeWebSocket([first_payload, RuntimeError("socket dropped")]),
|
|
_FakeWebSocket([second_payload]),
|
|
]
|
|
|
|
def _fake_connect(*_args: object, **_kwargs: object) -> _FakeConnectContext:
|
|
return _FakeConnectContext(sessions.pop(0))
|
|
|
|
monkeypatch.setattr("arbitrade.exchange.kraken_ws.websockets.connect", _fake_connect)
|
|
|
|
stream = client.connect_stream()
|
|
first = await anext(stream)
|
|
second = await anext(stream)
|
|
await client.stop()
|
|
await stream.aclose()
|
|
|
|
assert first.payload["symbol"] == "BTC/USD"
|
|
assert second.payload["symbol"] == "ETH/USD"
|
|
titles = [event["title"] for event in notifier.events]
|
|
assert "WebSocket disconnected" in titles
|
|
assert "WebSocket reconnected" in titles
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_recv_loop_emits_staleness_alert_on_timeout() -> None:
|
|
notifier = _FakeAlertNotifier()
|
|
settings = Settings(_env_file=None, WS_HEARTBEAT_TIMEOUT_SECONDS=0.001)
|
|
client = KrakenWsClient(settings, alert_notifier=notifier)
|
|
|
|
class _NeverReturnsWebSocket:
|
|
async def recv(self) -> str:
|
|
await asyncio.sleep(1)
|
|
return "{}"
|
|
|
|
with pytest.raises(TimeoutError):
|
|
await anext(client._recv_loop(_NeverReturnsWebSocket()))
|
|
|
|
assert len(notifier.events) == 1
|
|
assert notifier.events[0]["title"] == "WebSocket staleness abort"
|