feat: Add deterministic replay backtesting engine and related documentation
CI / lint-test-build (push) Failing after 14s

This commit is contained in:
2026-06-01 14:57:42 +02:00
parent cc11082ea7
commit 7c86e838fa
6 changed files with 540 additions and 0 deletions
+3
View File
@@ -17,6 +17,7 @@
- Added strict settings validators for auth pairing, Kraken credential pairing, alert severity bounds, and key-scope policy.
- Added synthetic latency profiler scenarios and CLI scripts for baseline generation and regression checks.
- Added latency baseline/threshold artifacts and CI latency guardrail enforcement.
- Added deterministic replay backtesting engine, CLI script, and unit coverage for JSONL event replay.
### Changed
@@ -25,6 +26,7 @@
- WebSocket client now emits system alerts for disconnect/reconnect and heartbeat staleness timeout events.
- Added explicit Kraken API key permission configuration (`KRAKEN_API_KEY_PERMISSIONS`) and docs for least-privilege key usage.
- Optimized dashboard metrics aggregation to use DuckDB SQL aggregates/quantiles instead of Python row scans.
- Added backtesting usage and replay format documentation to README.
### Removed
@@ -54,3 +56,4 @@
- Added lifecycle tests for snapshot persistence, worker draining, recovery restore, and startup reconciliation hook.
- Added unit coverage for security-related settings validation paths.
- Added latency guardrail unit coverage and documented measured metrics aggregation speedup (`1.14x`).
- Added deterministic replay tests that exercise the backtesting path without depending on live services.
+19
View File
@@ -359,6 +359,25 @@ Profile scenarios:
- `execution_spike`
- `reconnect_storm`
## Backtesting
Run a deterministic replay backtest from a JSONL event stream:
```powershell
python scripts/backtest_replay.py --events path\to\replay.jsonl --starting-balances USD=1000.0
```
Replay event format:
```json
{"timestamp":"2026-06-01T12:00:00Z","symbol":"BTC/USD","bids":[[100.0,1.0]],"asks":[[101.0,1.0]]}
```
Notes:
- Events are replayed in timestamp order.
- The replay engine reuses the production detector, pre-trade validation, trade limits, and execution sequencer.
- The simulated execution path applies configurable slippage and execution latency so reports include deterministic trade/miss statistics.
Latency baseline and threshold artifacts:
- `ops/performance/latency_baseline.json`
+84
View File
@@ -0,0 +1,84 @@
from __future__ import annotations
import argparse
import asyncio
from collections.abc import Mapping
from datetime import UTC, datetime
from pathlib import Path
from arbitrade.backtesting.replay import BacktestConfig, BacktestReplayEngine, load_replay_events
from arbitrade.detection.graph import CurrencyGraph, TriangularCycle
def _build_graph() -> tuple[dict[str, list[TriangularCycle]], list[str]]:
graph = CurrencyGraph()
graph.add_pair("USD", "BTC", "BTC/USD")
graph.add_pair("BTC", "ETH", "ETH/BTC")
graph.add_pair("ETH", "USD", "ETH/USD")
cycles = graph.triangular_cycles()
return graph.index_cycles_by_pair(cycles), ["BTC/USD", "ETH/BTC", "ETH/USD"]
def _parse_balances(raw: str) -> Mapping[str, float]:
balances: dict[str, float] = {}
for entry in raw.split(","):
if not entry.strip():
continue
asset, value = entry.split("=", 1)
balances[asset.strip().upper()] = float(value)
return balances
def main() -> int:
parser = argparse.ArgumentParser(
description="Run a deterministic replay backtest.")
parser.add_argument("--events", type=Path, required=True)
parser.add_argument("--starting-balances", type=str, default="USD=1000.0")
parser.add_argument("--trade-capital", type=float, default=100.0)
parser.add_argument("--fee-rate", type=float, default=0.0026)
parser.add_argument("--slippage-bps", type=float, default=4.0)
parser.add_argument("--execution-latency-ms", type=float, default=20.0)
args = parser.parse_args()
cycles_by_pair, available_pairs = _build_graph()
events = load_replay_events(args.events)
config = BacktestConfig(
fee_rate=args.fee_rate,
trade_capital=args.trade_capital,
slippage_bps=args.slippage_bps,
execution_latency_ms=args.execution_latency_ms,
)
engine = BacktestReplayEngine(
cycles_by_pair=cycles_by_pair,
available_pairs=available_pairs,
config=config,
started_at=events[0].occurred_at if events else datetime.now(UTC),
)
report = asyncio.run(
engine.run(events, starting_balances=_parse_balances(
args.starting_balances))
)
print("Backtest report:")
print(f"- processed_events: {report.processed_events}")
print(f"- opportunities_seen: {report.opportunities_seen}")
print(f"- trades_executed: {report.trades_executed}")
print(
f"- win_rate: {report.win_rate if report.win_rate is not None else 'n/a'}")
print(
f"- fill_rate: {report.fill_rate if report.fill_rate is not None else 'n/a'}")
print(f"- realized_pnl_usd: {report.realized_pnl_usd:.4f}")
print(f"- max_drawdown_usd: {report.max_drawdown_usd:.4f}")
print(f"- miss_reasons: {dict(report.miss_reasons)}")
print(
"- execution_latency_ms: "
f"p50={report.execution_latency_p50_ms or 0.0:.4f}, "
f"p95={report.execution_latency_p95_ms or 0.0:.4f}, "
f"p99={report.execution_latency_p99_ms or 0.0:.4f}"
)
return 0
if __name__ == "__main__":
raise SystemExit(main())
+17
View File
@@ -0,0 +1,17 @@
from arbitrade.backtesting.replay import (
BacktestConfig,
BacktestReplayEngine,
BacktestReport,
ReplayBookEvent,
ReplayClock,
load_replay_events,
)
__all__ = [
"ReplayClock",
"ReplayBookEvent",
"BacktestConfig",
"BacktestReport",
"BacktestReplayEngine",
"load_replay_events",
]
+332
View File
@@ -0,0 +1,332 @@
from __future__ import annotations
import asyncio
from collections import Counter
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import orjson
from arbitrade.detection.engine import IncrementalCycleDetector, OpportunityEvent
from arbitrade.detection.graph import TriangularCycle
from arbitrade.exchange.models import BookLevel
from arbitrade.execution.sequencer import TriangularExecutionSequencer
from arbitrade.market_data.order_book import OrderBook
from arbitrade.risk.pre_trade import PreTradeValidator
from arbitrade.risk.trade_limits import TradeLimitsGuard
@dataclass(slots=True)
class ReplayClock:
_current: datetime
@classmethod
def at(cls, started_at: datetime) -> ReplayClock:
return cls(_current=started_at.astimezone(UTC))
@property
def now(self) -> datetime:
return self._current
def advance_to(self, next_time: datetime) -> None:
normalized = next_time.astimezone(UTC)
if normalized < self._current:
raise ValueError("Replay events must be monotonic by timestamp")
self._current = normalized
def advance_ms(self, milliseconds: float) -> None:
if milliseconds < 0.0:
raise ValueError("milliseconds must be >= 0")
self._current = self._current.fromtimestamp(
self._current.timestamp() + (milliseconds / 1000.0),
tz=UTC,
)
@dataclass(frozen=True, slots=True)
class ReplayBookEvent:
occurred_at: datetime
symbol: str
bids: tuple[BookLevel, ...]
asks: tuple[BookLevel, ...]
@dataclass(frozen=True, slots=True)
class BacktestConfig:
fee_rate: float = 0.0026
min_profit_threshold: float = 0.0005
trade_capital: float = 100.0
quote_asset: str = "USD"
slippage_bps: float = 4.0
execution_latency_ms: float = 20.0
max_depth_levels: int = 10
max_concurrent_trades: int = 1
min_order_size_by_pair: Mapping[str, float] | None = None
@dataclass(frozen=True, slots=True)
class BacktestReport:
started_at: datetime
finished_at: datetime
processed_events: int
opportunities_seen: int
trades_executed: int
win_rate: float | None
fill_rate: float | None
realized_pnl_usd: float
max_drawdown_usd: float
miss_reasons: Mapping[str, int]
execution_latency_p50_ms: float | None
execution_latency_p95_ms: float | None
execution_latency_p99_ms: float | None
class _SimulatedRestClient:
def __init__(
self, clock: ReplayClock, *, slippage_bps: float, execution_latency_ms: float
) -> None:
self._clock = clock
self._slippage_bps = slippage_bps
self._execution_latency_ms = execution_latency_ms
self._sequence = 0
self._last_fill_ratio = 1.0
self._last_trade_latency_ms = execution_latency_ms
@property
def last_fill_ratio(self) -> float:
return self._last_fill_ratio
@property
def last_trade_latency_ms(self) -> float:
return self._last_trade_latency_ms
async def place_market_order(self, *, pair: str, side: str, volume: float) -> dict[str, Any]:
self._sequence += 1
self._clock.advance_ms(self._execution_latency_ms)
await asyncio.sleep(0)
normalized_fill = max(0.85, 1.0 - (self._slippage_bps / 10000.0) * 8.0)
self._last_fill_ratio = normalized_fill
self._last_trade_latency_ms = self._execution_latency_ms
return {
"txid": [f"sim-{self._sequence}"],
"status": "closed",
"pair": pair,
"side": side,
"requested_volume": volume,
"filled_volume": volume * normalized_fill,
"simulated_at": self._clock.now.isoformat(),
}
def _percentile(values: Sequence[float], percentile: float) -> float | None:
if not values:
return None
ordered = sorted(values)
if percentile <= 0.0:
return ordered[0]
if percentile >= 100.0:
return ordered[-1]
rank = (len(ordered) - 1) * (percentile / 100.0)
lower = int(rank)
upper = min(lower + 1, len(ordered) - 1)
weight = rank - lower
return ordered[lower] * (1.0 - weight) + ordered[upper] * weight
def _parse_book_levels(raw_levels: Any) -> tuple[BookLevel, ...]:
if not isinstance(raw_levels, list):
raise ValueError("Book levels must be a list")
levels: list[BookLevel] = []
for raw_level in raw_levels:
if (
not isinstance(raw_level, list)
or len(raw_level) != 2
or not isinstance(raw_level[0], int | float)
or not isinstance(raw_level[1], int | float)
):
raise ValueError("Each level must be [price, volume]")
levels.append(BookLevel(price=float(
raw_level[0]), volume=float(raw_level[1])))
return tuple(levels)
def load_replay_events(path: Path) -> list[ReplayBookEvent]:
events: list[ReplayBookEvent] = []
for line in path.read_text(encoding="utf-8").splitlines():
if not line.strip():
continue
parsed = orjson.loads(line)
if not isinstance(parsed, dict):
raise ValueError("Each JSONL row must be an object")
timestamp_raw = parsed.get("timestamp")
symbol_raw = parsed.get("symbol")
if not isinstance(timestamp_raw, str) or not isinstance(symbol_raw, str):
raise ValueError("Each event must include timestamp and symbol")
occurred_at = datetime.fromisoformat(
timestamp_raw.replace("Z", "+00:00")).astimezone(UTC)
events.append(
ReplayBookEvent(
occurred_at=occurred_at,
symbol=symbol_raw,
bids=_parse_book_levels(parsed.get("bids")),
asks=_parse_book_levels(parsed.get("asks")),
)
)
return sorted(events, key=lambda event: event.occurred_at)
class BacktestReplayEngine:
def __init__(
self,
*,
cycles_by_pair: Mapping[str, list[TriangularCycle]],
available_pairs: Sequence[str],
config: BacktestConfig,
started_at: datetime,
) -> None:
self._config = config
self._clock = ReplayClock.at(started_at)
self._books: dict[str, OrderBook] = {}
self._detector = IncrementalCycleDetector(
cycles_by_pair,
fee_rate=config.fee_rate,
max_depth_levels=config.max_depth_levels,
min_profit_threshold=config.min_profit_threshold,
min_order_size_by_pair=config.min_order_size_by_pair,
)
self._pre_trade = PreTradeValidator()
self._trade_limits = TradeLimitsGuard(
max_concurrent_trades=config.max_concurrent_trades)
self._simulated_rest = _SimulatedRestClient(
self._clock,
slippage_bps=config.slippage_bps,
execution_latency_ms=config.execution_latency_ms,
)
self._sequencer = TriangularExecutionSequencer(
self._simulated_rest,
available_pairs=available_pairs,
)
@staticmethod
def _exposure_for_event(event: OpportunityEvent) -> dict[str, float]:
currencies = [part for part in event.cycle.split("->") if part]
if len(currencies) < 2:
return {}
origin = currencies[0]
return {
currency: event.allocated_capital for currency in currencies[1:] if currency != origin
}
async def run(
self,
events: Sequence[ReplayBookEvent],
*,
starting_balances: Mapping[str, float],
) -> BacktestReport:
miss_reasons: Counter[str] = Counter()
processed_events = 0
opportunities_seen = 0
trades_executed = 0
realized_pnl = 0.0
equity = float(starting_balances.get(
self._config.quote_asset.upper(), 0.0))
peak_equity = equity
max_drawdown = 0.0
fill_samples: list[float] = []
realized_samples: list[float] = []
execution_latencies: list[float] = []
for event in events:
self._clock.advance_to(event.occurred_at)
processed_events += 1
book = self._books.setdefault(event.symbol.upper(), OrderBook())
book.apply_bids(event.bids)
book.apply_asks(event.asks)
opportunities = self._detector.opportunities_for_updated_pair(
event.symbol,
self._books,
base_capital=self._config.trade_capital,
)
opportunities_seen += len(opportunities)
for opportunity in opportunities:
required_by_asset = {
self._config.quote_asset.upper(): opportunity.allocated_capital
}
if not self._pre_trade.validate(
balances_by_asset=starting_balances,
required_by_asset=required_by_asset,
):
miss_reasons["insufficient_balance"] += 1
continue
exposure = self._exposure_for_event(opportunity)
if not self._trade_limits.is_trade_allowed(exposure):
miss_reasons["trade_limit"] += 1
continue
self._trade_limits.open_trade(exposure)
result = await self._sequencer.execute(opportunity)
self._trade_limits.close_trade(exposure)
execution_latencies.append(
self._simulated_rest.last_trade_latency_ms)
fill_samples.append(self._simulated_rest.last_fill_ratio)
if not result.success:
miss_reasons["execution_failed"] += 1
continue
slippage_cost = (
opportunity.allocated_capital
* (self._config.slippage_bps / 10000.0)
* max(result.completed_legs, 1)
)
realized_trade_pnl = opportunity.est_profit - slippage_cost
realized_samples.append(realized_trade_pnl)
realized_pnl += realized_trade_pnl
equity += realized_trade_pnl
peak_equity = max(peak_equity, equity)
max_drawdown = max(max_drawdown, peak_equity - equity)
trades_executed += 1
wins = sum(1 for pnl in realized_samples if pnl > 0.0)
win_rate = (wins / len(realized_samples)) if realized_samples else None
fill_rate = (sum(fill_samples) / len(fill_samples)
) if fill_samples else None
return BacktestReport(
started_at=events[0].occurred_at if events else self._clock.now,
finished_at=events[-1].occurred_at if events else self._clock.now,
processed_events=processed_events,
opportunities_seen=opportunities_seen,
trades_executed=trades_executed,
win_rate=win_rate,
fill_rate=fill_rate,
realized_pnl_usd=realized_pnl,
max_drawdown_usd=max_drawdown,
miss_reasons=dict(miss_reasons),
execution_latency_p50_ms=_percentile(execution_latencies, 50.0),
execution_latency_p95_ms=_percentile(execution_latencies, 95.0),
execution_latency_p99_ms=_percentile(execution_latencies, 99.0),
)
+85
View File
@@ -0,0 +1,85 @@
from __future__ import annotations
import asyncio
from datetime import UTC, datetime, timedelta
from pathlib import Path
from arbitrade.backtesting.replay import (
BacktestConfig,
BacktestReplayEngine,
ReplayBookEvent,
load_replay_events,
)
from arbitrade.detection.graph import CurrencyGraph
from arbitrade.exchange.models import BookLevel
def _build_cycles() -> tuple[dict[str, list], list[str]]:
graph = CurrencyGraph()
graph.add_pair("USD", "BTC", "BTC/USD")
graph.add_pair("BTC", "ETH", "ETH/BTC")
graph.add_pair("ETH", "USD", "ETH/USD")
cycles = graph.triangular_cycles()
return graph.index_cycles_by_pair(cycles), ["BTC/USD", "ETH/BTC", "ETH/USD"]
def test_load_replay_events_orders_jsonl_by_timestamp(tmp_path: Path) -> None:
path = tmp_path / "replay.jsonl"
path.write_text(
"\n".join(
[
'{"timestamp":"2026-06-01T12:00:02Z","symbol":"ETH/USD","bids":[[100.0,1.0]],"asks":[[101.0,1.0]]}',
'{"timestamp":"2026-06-01T12:00:01Z","symbol":"BTC/USD","bids":[[10.0,1.0]],"asks":[[11.0,1.0]]}',
]
),
encoding="utf-8",
)
events = load_replay_events(path)
assert [event.symbol for event in events] == ["BTC/USD", "ETH/USD"]
def test_backtest_replay_engine_runs_deterministically() -> None:
cycles_by_pair, available_pairs = _build_cycles()
started_at = datetime(2026, 6, 1, 12, 0, tzinfo=UTC)
replay_events = [
ReplayBookEvent(
occurred_at=started_at,
symbol="BTC/USD",
bids=(BookLevel(price=99.5, volume=10.0),),
asks=(BookLevel(price=100.0, volume=10.0),),
),
ReplayBookEvent(
occurred_at=started_at + timedelta(seconds=1),
symbol="ETH/BTC",
bids=(BookLevel(price=0.051, volume=10.0),),
asks=(BookLevel(price=0.050, volume=10.0),),
),
ReplayBookEvent(
occurred_at=started_at + timedelta(seconds=2),
symbol="ETH/USD",
bids=(BookLevel(price=110.0, volume=10.0),),
asks=(BookLevel(price=110.5, volume=10.0),),
),
]
engine = BacktestReplayEngine(
cycles_by_pair=cycles_by_pair,
available_pairs=available_pairs,
config=BacktestConfig(trade_capital=100.0,
slippage_bps=5.0, execution_latency_ms=10.0),
started_at=started_at,
)
report = asyncio.run(engine.run(
replay_events, starting_balances={"USD": 1000.0}))
assert report.processed_events == 3
assert report.opportunities_seen >= 0
assert report.trades_executed >= 0
assert isinstance(report.realized_pnl_usd, float)
assert report.max_drawdown_usd >= 0.0
assert report.execution_latency_p50_ms is None
assert report.execution_latency_p95_ms is None
assert report.execution_latency_p99_ms is None