c17f41aaf8
- Introduced new tables for audit events and runtime state snapshots in the database schema. - Created data classes for AuditRecord and RuntimeStateRecord to represent the new entities. - Implemented AuditRepository and RuntimeStateRepository for inserting and retrieving records. - Enhanced the dashboard to include an audit trail section, displaying recent audit events. - Added tests for the new audit repository and runtime lifecycle functionalities. - Updated settings validation to ensure proper configuration for alerting features. - Integrated alert notifications across various components, including execution sequencer and loss limits.
100 lines
2.8 KiB
Python
100 lines
2.8 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from typing import Any
|
|
|
|
import pytest
|
|
|
|
from arbitrade.risk.stop_conditions import StopConditionsGuard
|
|
|
|
|
|
class _FakeAlertNotifier:
    """In-memory test double that records every ``notify`` call.

    Each call is appended to ``events`` as a plain dict so a test can assert
    on what was sent without any real delivery taking place.
    """

    def __init__(self) -> None:
        # Recorded notifications in call order (oldest first).
        self.events: list[dict[str, Any]] = []

    async def notify(
        self,
        *,
        category: str,
        severity: str,
        title: str,
        message: str,
        details: dict[str, str] | None = None,
    ) -> bool:
        """Record one notification and report success.

        Args:
            category: Stored under the ``"category"`` key.
            severity: Stored under the ``"severity"`` key.
            title: Stored under the ``"title"`` key.
            message: Stored under the ``"message"`` key.
            details: Optional extra key/value pairs; ``None`` or an empty
                mapping is recorded as ``{}``.

        Returns:
            Always ``True``.
        """
        self.events.append(
            {
                "category": category,
                "severity": severity,
                "title": title,
                "message": message,
                # Store a snapshot rather than the caller's own dict, so a
                # later mutation by the caller cannot rewrite the recorded
                # event after the fact.
                "details": dict(details) if details else {},
            }
        )
        return True
|
|
|
|
|
|
def test_stop_conditions_guard_halts_on_source_latency_breach() -> None:
    """Observing 75 ms source latency against a 50 ms limit must halt the guard."""
    source_limit_ms = 50.0
    observed_source_ms = 75.0
    stop_guard = StopConditionsGuard(max_source_latency_ms=source_limit_ms)

    stop_guard.observe_latency(
        source_latency_ms=observed_source_ms,
        apply_latency_ms=1.0,
    )

    # The guard must be halted and must name the source-latency limit as the cause.
    assert stop_guard.is_halted
    assert stop_guard.halted_reason == "source_latency_limit_breached"
|
|
|
|
|
|
def test_stop_conditions_guard_halts_on_apply_latency_breach() -> None:
    """Observing 3.5 ms apply latency against a 2 ms limit must halt the guard."""
    apply_limit_ms = 2.0
    observed_apply_ms = 3.5
    stop_guard = StopConditionsGuard(max_apply_latency_ms=apply_limit_ms)

    # No source latency reading is supplied; only the apply latency is observed.
    stop_guard.observe_latency(
        source_latency_ms=None,
        apply_latency_ms=observed_apply_ms,
    )

    assert stop_guard.is_halted
    assert stop_guard.halted_reason == "apply_latency_limit_breached"
|
|
|
|
|
|
def test_stop_conditions_guard_halts_on_consecutive_failures() -> None:
    """With a limit of two, the first failure must not halt and the second must."""
    failure_limit = 2
    stop_guard = StopConditionsGuard(max_consecutive_failures=failure_limit)

    # First failure: still below the limit.
    stop_guard.register_failure()
    assert not stop_guard.is_halted

    # Second failure: the limit is reached.
    stop_guard.register_failure()

    assert stop_guard.is_halted
    assert stop_guard.halted_reason == "consecutive_failures_limit_breached"
|
|
|
|
|
|
def test_stop_conditions_guard_resets_failures_after_success() -> None:
    """After failure, success, failure the counter must read 1 and the guard must not be halted."""
    failure_limit = 3
    stop_guard = StopConditionsGuard(max_consecutive_failures=failure_limit)

    # failure -> success -> failure
    stop_guard.register_failure()
    stop_guard.register_success()
    stop_guard.register_failure()

    assert stop_guard.consecutive_failures == 1
    assert not stop_guard.is_halted
|
|
|
|
|
|
def test_stop_conditions_guard_rejects_invalid_configuration() -> None:
    """Each non-positive limit must raise ``ValueError`` whose message matches the option name."""
    # (constructor keyword, invalid value) pairs, checked in this order.
    invalid_options = (
        ("max_source_latency_ms", 0.0),
        ("max_apply_latency_ms", -1.0),
        ("max_consecutive_failures", 0),
    )

    for option_name, bad_value in invalid_options:
        # The keyword name doubles as the regex the error message must match.
        with pytest.raises(ValueError, match=option_name):
            StopConditionsGuard(**{option_name: bad_value})
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_stop_conditions_guard_emits_alert_on_failure_threshold() -> None:
    """Reaching the failure limit must halt the guard and produce exactly one alert."""
    fake_notifier = _FakeAlertNotifier()
    stop_guard = StopConditionsGuard(
        max_consecutive_failures=1,
        alert_notifier=fake_notifier,
    )

    stop_guard.register_failure()
    # Yield to the event loop once before inspecting the notifier.
    # NOTE(review): presumably the guard schedules the alert as a background
    # task — confirm against StopConditionsGuard.
    await asyncio.sleep(0)

    assert stop_guard.is_halted

    recorded_events = fake_notifier.events
    assert len(recorded_events) == 1

    first_event = recorded_events[0]
    assert first_event["category"] == "threshold"
    assert first_event["title"] == "Consecutive failures limit breached"
|