c17f41aaf8
- Introduced new tables for audit events and runtime state snapshots in the database schema. - Created data classes for AuditRecord and RuntimeStateRecord to represent the new entities. - Implemented AuditRepository and RuntimeStateRepository for inserting and retrieving records. - Enhanced the dashboard to include an audit trail section, displaying recent audit events. - Added tests for the new audit repository and runtime lifecycle functionalities. - Updated settings validation to ensure proper configuration for alerting features. - Integrated alert notifications across various components, including execution sequencer and loss limits.
141 lines
4.2 KiB
Python
141 lines
4.2 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import UTC, datetime
|
|
|
|
import pytest
|
|
|
|
from arbitrade.api.app import create_app
|
|
from arbitrade.config.settings import Settings
|
|
from arbitrade.runtime.lifecycle import (
|
|
graceful_shutdown,
|
|
persist_runtime_snapshot,
|
|
restore_runtime_state,
|
|
)
|
|
from arbitrade.storage.repositories import RuntimeStateRecord
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class _FakeWorker:
|
|
stopped: bool = False
|
|
|
|
async def stop(self) -> None:
|
|
self.stopped = True
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class _FakeStartupReconciler:
|
|
called: bool = False
|
|
|
|
async def reconcile_open_trades(self) -> None:
|
|
self.called = True
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_persist_runtime_snapshot_writes_record(tmp_path) -> None:
|
|
app = create_app(Settings(_env_file=None, DUCKDB_PATH=tmp_path / "runtime.duckdb"))
|
|
|
|
app.state.dashboard_controls.is_running = True
|
|
app.state.dashboard_controls.kill_switch.deactivate()
|
|
|
|
snapshot = persist_runtime_snapshot(app, note="unit-test")
|
|
|
|
assert snapshot is not None
|
|
assert snapshot.note == "unit-test"
|
|
|
|
latest = app.state.runtime_state_repository.latest()
|
|
assert latest is not None
|
|
assert latest.note == "unit-test"
|
|
assert latest.is_running is True
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_restore_runtime_state_applies_snapshot(tmp_path) -> None:
|
|
app = create_app(Settings(_env_file=None, DUCKDB_PATH=tmp_path / "restore.duckdb"))
|
|
app.state.runtime_state_repository.insert(
|
|
RuntimeStateRecord(
|
|
snapshot_at=datetime.now(UTC),
|
|
is_running=False,
|
|
kill_switch_active=True,
|
|
kill_switch_reason="manual-stop",
|
|
open_trade_count=0,
|
|
last_known_balances={"USD": 100.0},
|
|
note="seed",
|
|
)
|
|
)
|
|
|
|
report = await restore_runtime_state(app)
|
|
|
|
assert report.restored_from_snapshot is True
|
|
assert app.state.dashboard_controls.is_running is False
|
|
assert app.state.dashboard_controls.kill_switch.is_active is True
|
|
assert app.state.dashboard_controls.kill_switch.reason == "manual-stop"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_restore_runtime_state_enables_restart_guard_for_open_trades(tmp_path) -> None:
|
|
app = create_app(Settings(_env_file=None, DUCKDB_PATH=tmp_path / "open-trades.duckdb"))
|
|
|
|
with app.state.store.connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO trades (
|
|
trade_ref,
|
|
started_at,
|
|
finished_at,
|
|
status,
|
|
realized_pnl,
|
|
estimated_pnl,
|
|
capital_used,
|
|
cycle,
|
|
leg_count
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
[
|
|
"open-trade-1",
|
|
datetime.now(UTC),
|
|
None,
|
|
"open",
|
|
None,
|
|
1.0,
|
|
100.0,
|
|
"USD->BTC->ETH->USD",
|
|
3,
|
|
],
|
|
)
|
|
|
|
report = await restore_runtime_state(app)
|
|
|
|
assert report.open_trades_detected == 1
|
|
assert report.restart_guard_active is True
|
|
assert app.state.dashboard_controls.is_running is False
|
|
assert app.state.dashboard_controls.kill_switch.is_active is True
|
|
assert app.state.dashboard_controls.kill_switch.reason == "recovery_open_trades_detected"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_graceful_shutdown_drains_workers_and_persists_snapshot(tmp_path) -> None:
|
|
app = create_app(Settings(_env_file=None, DUCKDB_PATH=tmp_path / "shutdown.duckdb"))
|
|
worker = _FakeWorker()
|
|
app.state.background_workers = [worker]
|
|
app.state.dashboard_controls.is_running = True
|
|
|
|
await graceful_shutdown(app)
|
|
|
|
assert worker.stopped is True
|
|
assert app.state.dashboard_controls.is_running is False
|
|
latest = app.state.runtime_state_repository.latest()
|
|
assert latest is not None
|
|
assert latest.note == "graceful_shutdown"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_restore_runtime_state_calls_startup_reconciler(tmp_path) -> None:
|
|
app = create_app(Settings(_env_file=None, DUCKDB_PATH=tmp_path / "reconciler.duckdb"))
|
|
reconciler = _FakeStartupReconciler()
|
|
app.state.startup_reconciler = reconciler
|
|
|
|
await restore_runtime_state(app)
|
|
|
|
assert reconciler.called is True
|