Files
zwitschi c17f41aaf8 feat: add audit events and runtime state snapshots to database
- Introduced new tables for audit events and runtime state snapshots in the database schema.
- Created data classes for AuditRecord and RuntimeStateRecord to represent the new entities.
- Implemented AuditRepository and RuntimeStateRepository for inserting and retrieving records.
- Enhanced the dashboard to include an audit trail section, displaying recent audit events.
- Added tests for the new audit repository and runtime lifecycle functionalities.
- Updated settings validation to ensure proper configuration for alerting features.
- Integrated alert notifications across various components, including execution sequencer and loss limits.
2026-06-01 14:18:12 +02:00

142 lines
4.0 KiB
Python

from __future__ import annotations

import asyncio
from collections import deque
from dataclasses import dataclass, field
from typing import Any

import orjson
import pytest

from arbitrade.config.settings import Settings
from arbitrade.exchange.kraken_ws import KrakenWsClient


@dataclass(slots=True)
class _FakeAlertNotifier:
events: list[dict[str, str]] = field(default_factory=list)
async def notify(
self,
*,
category: str,
severity: str,
title: str,
message: str,
details: dict[str, str] | None = None,
) -> bool:
self.events.append(
{
"category": category,
"severity": severity,
"title": title,
"message": message,
**(details or {}),
}
)
return True
class _FakeWebSocket:
    """Scripted websocket double whose ``recv`` replays a fixed sequence.

    Items that are ``Exception`` instances are raised; every other item is
    returned as the received frame. Once the script is exhausted, each
    further ``recv`` returns a heartbeat frame.
    """

    def __init__(self, messages: list[Any]) -> None:
        """Queue ``messages`` for replay.

        The items are copied into a private deque, so the caller's list is
        left untouched and taking the next item from the front is O(1).
        """
        self._messages: deque[Any] = deque(messages)

    async def recv(self) -> str:
        """Return the next scripted frame or raise the next scripted error.

        Raises:
            Exception: Whichever exception instance is next in the script.
        """
        if not self._messages:
            # sleep(0) hands control back to the event loop before replying.
            await asyncio.sleep(0)
            return orjson.dumps({"channel": "heartbeat"}).decode("utf-8")
        next_item = self._messages.popleft()
        if isinstance(next_item, Exception):
            raise next_item
        return next_item
class _FakeConnectContext:
    """Async context manager that hands out a pre-built fake websocket."""

    def __init__(self, ws: _FakeWebSocket) -> None:
        """Keep the socket that will be returned on entry."""
        self._ws = ws

    async def __aenter__(self) -> _FakeWebSocket:
        """Return the wrapped socket unchanged."""
        socket = self._ws
        return socket

    async def __aexit__(self, exc_type: object, exc: object, tb: object) -> bool:
        """Exit without suppressing an exception raised inside the block."""
        suppress = False
        return suppress
def test_parse_book_delta() -> None:
    """A ``book`` channel message parses into a delta with the expected fields."""
    # _env_file=None matches the other tests in this module and keeps the
    # result independent of any .env file on the machine running the suite.
    client = KrakenWsClient(Settings(_env_file=None))
    message = {
        "channel": "book",
        "symbol": "BTC/USD",
        "data": [
            {
                "bids": [{"price": "100.0", "qty": "1.2"}],
                "asks": [{"price": "100.5", "qty": "0.8"}],
                "checksum": 123,
                "timestamp": 1717232000000,
            }
        ],
    }

    delta = client.parse_book_delta(message)

    assert delta is not None
    assert delta.symbol == "BTC/USD"
    assert len(delta.bids) == 1
    assert len(delta.asks) == 1
    assert delta.checksum == 123
@pytest.mark.asyncio
async def test_connect_stream_emits_disconnect_and_reconnect_alerts(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A dropped socket followed by a new session raises both alerts.

    The first fake session delivers one book frame and then raises; the second
    delivers another book frame. Both frames must come out of the stream, and
    the notifier must have recorded a disconnect and a reconnect title.
    """
    notifier = _FakeAlertNotifier()
    settings = Settings(_env_file=None, WS_HEARTBEAT_TIMEOUT_SECONDS=1.0)
    client = KrakenWsClient(settings, alert_notifier=notifier)

    first_payload = orjson.dumps(
        {"channel": "book", "symbol": "BTC/USD", "data": [{"bids": [], "asks": []}]}
    ).decode("utf-8")
    second_payload = orjson.dumps(
        {"channel": "book", "symbol": "ETH/USD", "data": [{"bids": [], "asks": []}]}
    ).decode("utf-8")
    sessions = [
        _FakeWebSocket([first_payload, RuntimeError("socket dropped")]),
        _FakeWebSocket([second_payload]),
    ]

    def _fake_connect(*_args: object, **_kwargs: object) -> _FakeConnectContext:
        # Each connect attempt consumes the next scripted session.
        return _FakeConnectContext(sessions.pop(0))

    monkeypatch.setattr("arbitrade.exchange.kraken_ws.websockets.connect", _fake_connect)

    stream = client.connect_stream()
    try:
        first = await anext(stream)
        second = await anext(stream)
    finally:
        # Always shut the client and generator down, even when reading a
        # frame fails, so a failure here does not leave the stream open.
        await client.stop()
        await stream.aclose()

    assert first.payload["symbol"] == "BTC/USD"
    assert second.payload["symbol"] == "ETH/USD"
    titles = [event["title"] for event in notifier.events]
    assert "WebSocket disconnected" in titles
    assert "WebSocket reconnected" in titles
@pytest.mark.asyncio
async def test_recv_loop_emits_staleness_alert_on_timeout() -> None:
    """A socket silent past the heartbeat timeout yields exactly one staleness alert."""

    class _NeverReturnsWebSocket:
        """Socket double whose ``recv`` sleeps far longer than the configured timeout."""

        async def recv(self) -> str:
            await asyncio.sleep(1)
            return "{}"

    notifier = _FakeAlertNotifier()
    client = KrakenWsClient(
        Settings(_env_file=None, WS_HEARTBEAT_TIMEOUT_SECONDS=0.001),
        alert_notifier=notifier,
    )
    frames = client._recv_loop(_NeverReturnsWebSocket())

    with pytest.raises(TimeoutError):
        await anext(frames)

    # Exactly one event, and it is the staleness abort.
    recorded_titles = [event["title"] for event in notifier.events]
    assert recorded_titles == ["WebSocket staleness abort"]