c17f41aaf8
- Introduced new tables for audit events and runtime state snapshots in the database schema.
- Created data classes for AuditRecord and RuntimeStateRecord to represent the new entities.
- Implemented AuditRepository and RuntimeStateRepository for inserting and retrieving records.
- Enhanced the dashboard to include an audit trail section, displaying recent audit events.
- Added tests for the new audit repository and runtime lifecycle functionalities.
- Updated settings validation to ensure proper configuration for alerting features.
- Integrated alert notifications across various components, including execution sequencer and loss limits.
129 lines
3.8 KiB
Python
129 lines
3.8 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from datetime import UTC, datetime
|
|
from typing import Any
|
|
|
|
import pytest
|
|
|
|
from arbitrade.detection.engine import OpportunityEvent
|
|
from arbitrade.execution.sequencer import TriangularExecutionSequencer
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class _FakeAlertNotifier:
|
|
events: list[dict[str, str]] = field(default_factory=list)
|
|
|
|
async def notify(
|
|
self,
|
|
*,
|
|
category: str,
|
|
severity: str,
|
|
title: str,
|
|
message: str,
|
|
details: dict[str, str] | None = None,
|
|
) -> bool:
|
|
self.events.append(
|
|
{
|
|
"category": category,
|
|
"severity": severity,
|
|
"title": title,
|
|
"message": message,
|
|
**(details or {}),
|
|
}
|
|
)
|
|
return True
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class _FakeRestClient:
|
|
fail_at_call: int | None = None
|
|
calls: list[dict[str, Any]] = field(default_factory=list)
|
|
|
|
async def place_market_order(self, *, pair: str, side: str, volume: float) -> dict[str, Any]:
|
|
call_number = len(self.calls) + 1
|
|
if self.fail_at_call is not None and call_number == self.fail_at_call:
|
|
raise RuntimeError("simulated failure")
|
|
|
|
payload = {"pair": pair, "side": side, "volume": volume}
|
|
self.calls.append(payload)
|
|
return {"txid": [f"tx-{call_number}"]}
|
|
|
|
|
|
def _sample_event(cycle: str = "USD->BTC->ETH->USD") -> OpportunityEvent:
    """Build an ``OpportunityEvent`` with fixed sample figures for the given cycle.

    Only ``cycle`` varies; ``detected_at`` is the current UTC time and all
    other fields are constants.
    """
    detected_at = datetime.now(UTC)
    return OpportunityEvent(
        detected_at=detected_at,
        cycle=cycle,
        updated_pair="BTC/USD",
        gross_rate=1.02,
        net_rate=1.01,
        gross_pct=2.0,
        net_pct=1.0,
        est_profit=1.0,
        allocated_capital=10.0,
    )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_triangular_sequencer_executes_legs_in_order() -> None:
    """A successful run completes three legs in order and emits one trade alert."""
    rest = _FakeRestClient()
    alerts = _FakeAlertNotifier()
    sequencer = TriangularExecutionSequencer(
        rest,
        available_pairs=["BTC/USD", "ETH/BTC", "ETH/USD"],
        alert_notifier=alerts,
    )

    outcome = await sequencer.execute(_sample_event())

    assert outcome.success
    assert outcome.completed_legs == 3

    # Orders must hit the fake client in cycle order with the expected sides.
    placed_pairs = [order["pair"] for order in rest.calls]
    assert placed_pairs == ["BTC/USD", "ETH/BTC", "ETH/USD"]
    placed_sides = [order["side"] for order in rest.calls]
    assert placed_sides == ["buy", "buy", "sell"]

    # Exactly one alert, describing the completed trade.
    assert len(alerts.events) == 1
    alert = alerts.events[0]
    assert alert["category"] == "trade"
    assert alert["title"] == "Trade execution completed"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_triangular_sequencer_stops_on_failed_leg() -> None:
    """When the second order raises, execution stops after one leg and an error alert is sent."""
    rest = _FakeRestClient(fail_at_call=2)
    alerts = _FakeAlertNotifier()
    sequencer = TriangularExecutionSequencer(
        rest,
        available_pairs=["BTC/USD", "ETH/BTC", "ETH/USD"],
        alert_notifier=alerts,
    )

    outcome = await sequencer.execute(_sample_event())

    assert not outcome.success
    assert outcome.completed_legs == 1
    assert outcome.failure_reason is not None

    # Only the first order was recorded by the fake client.
    assert len(rest.calls) == 1

    # Exactly one alert, describing the failure.
    assert len(alerts.events) == 1
    alert = alerts.events[0]
    assert alert["category"] == "error"
    assert alert["title"] == "Trade execution failed"
|
|
|
|
|
|
def test_triangular_sequencer_rejects_non_closed_cycle() -> None:
    """``_build_legs`` raises ``ValueError`` for the cycle ``USD->BTC->ETH``."""
    sequencer = TriangularExecutionSequencer(
        _FakeRestClient(),
        available_pairs=["BTC/USD", "ETH/BTC", "ETH/USD"],
    )

    expected_error = pytest.raises(ValueError, match="closed triangular path")
    with expected_error:
        sequencer._build_legs(_sample_event(cycle="USD->BTC->ETH"))
|
|
|
|
|
|
def test_triangular_sequencer_rejects_missing_pair() -> None:
    """``_build_legs`` raises ``ValueError`` when ``ETH/USD`` is not among the available pairs."""
    sequencer = TriangularExecutionSequencer(
        _FakeRestClient(),
        available_pairs=["BTC/USD", "ETH/BTC"],
    )

    expected_error = pytest.raises(ValueError, match="No tradable pair")
    with expected_error:
        sequencer._build_legs(_sample_event())
|