From 7c86e838fab157ebfa2ef56e2e489947736852c3 Mon Sep 17 00:00:00 2001 From: zwitschi Date: Mon, 1 Jun 2026 14:57:42 +0200 Subject: [PATCH] feat: Add deterministic replay backtesting engine and related documentation --- CHANGELOG.md | 3 + README.md | 19 ++ scripts/backtest_replay.py | 84 +++++++ src/arbitrade/backtesting/__init__.py | 17 ++ src/arbitrade/backtesting/replay.py | 332 ++++++++++++++++++++++++++ tests/unit/test_backtesting_replay.py | 85 +++++++ 6 files changed, 540 insertions(+) create mode 100644 scripts/backtest_replay.py create mode 100644 src/arbitrade/backtesting/__init__.py create mode 100644 src/arbitrade/backtesting/replay.py create mode 100644 tests/unit/test_backtesting_replay.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ccb7d0..81bdfa5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ - Added strict settings validators for auth pairing, Kraken credential pairing, alert severity bounds, and key-scope policy. - Added synthetic latency profiler scenarios and CLI scripts for baseline generation and regression checks. - Added latency baseline/threshold artifacts and CI latency guardrail enforcement. +- Added deterministic replay backtesting engine, CLI script, and unit coverage for JSONL event replay. ### Changed @@ -25,6 +26,7 @@ - WebSocket client now emits system alerts for disconnect/reconnect and heartbeat staleness timeout events. - Added explicit Kraken API key permission configuration (`KRAKEN_API_KEY_PERMISSIONS`) and docs for least-privilege key usage. - Optimized dashboard metrics aggregation to use DuckDB SQL aggregates/quantiles instead of Python row scans. +- Added backtesting usage and replay format documentation to README. ### Removed @@ -54,3 +56,4 @@ - Added lifecycle tests for snapshot persistence, worker draining, recovery restore, and startup reconciliation hook. - Added unit coverage for security-related settings validation paths. - Added latency guardrail unit coverage and documented measured metrics aggregation speedup (`1.14x`). +- Added deterministic replay tests that exercise the backtesting path without depending on live services. diff --git a/README.md b/README.md index 1cb61bb..0fbfaec 100644 --- a/README.md +++ b/README.md @@ -359,6 +359,25 @@ Profile scenarios: - `execution_spike` - `reconnect_storm` +## Backtesting + +Run a deterministic replay backtest from a JSONL event stream: + +```powershell +python scripts/backtest_replay.py --events path\to\replay.jsonl --starting-balances USD=1000.0 +``` + +Replay event format: + +```json +{"timestamp":"2026-06-01T12:00:00Z","symbol":"BTC/USD","bids":[[100.0,1.0]],"asks":[[101.0,1.0]]} +``` + +Notes: + +- Events are replayed in timestamp order. +- The replay engine reuses the production detector, pre-trade validation, trade limits, and execution sequencer. +- The simulated execution path applies configurable slippage and execution latency so reports include deterministic trade/miss statistics. Latency baseline and threshold artifacts: - `ops/performance/latency_baseline.json` diff --git a/scripts/backtest_replay.py b/scripts/backtest_replay.py new file mode 100644 index 0000000..f5ce5a9 --- /dev/null +++ b/scripts/backtest_replay.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +import argparse +import asyncio +from collections.abc import Mapping +from datetime import UTC, datetime +from pathlib import Path + +from arbitrade.backtesting.replay import BacktestConfig, BacktestReplayEngine, load_replay_events +from arbitrade.detection.graph import CurrencyGraph, TriangularCycle + + +def _build_graph() -> tuple[dict[str, list[TriangularCycle]], list[str]]: + graph = CurrencyGraph() + graph.add_pair("USD", "BTC", "BTC/USD") + graph.add_pair("BTC", "ETH", "ETH/BTC") + graph.add_pair("ETH", "USD", "ETH/USD") + cycles = graph.triangular_cycles() + return graph.index_cycles_by_pair(cycles), ["BTC/USD", "ETH/BTC", "ETH/USD"] + + +def _parse_balances(raw: str) -> Mapping[str, float]: + balances: dict[str, float] = {} + for entry in raw.split(","): + if not entry.strip(): + continue + asset, value = entry.split("=", 1) + balances[asset.strip().upper()] = float(value) + return balances + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Run a deterministic replay backtest.") + parser.add_argument("--events", type=Path, required=True) + parser.add_argument("--starting-balances", type=str, default="USD=1000.0") + parser.add_argument("--trade-capital", type=float, default=100.0) + parser.add_argument("--fee-rate", type=float, default=0.0026) + parser.add_argument("--slippage-bps", type=float, default=4.0) + parser.add_argument("--execution-latency-ms", type=float, default=20.0) + args = parser.parse_args() + + cycles_by_pair, available_pairs = _build_graph() + events = load_replay_events(args.events) + config = BacktestConfig( + fee_rate=args.fee_rate, + trade_capital=args.trade_capital, + slippage_bps=args.slippage_bps, + execution_latency_ms=args.execution_latency_ms, + ) + + engine = BacktestReplayEngine( + cycles_by_pair=cycles_by_pair, + available_pairs=available_pairs, + config=config, + started_at=events[0].occurred_at if events else datetime.now(UTC), + ) + report = asyncio.run( + engine.run(events, starting_balances=_parse_balances( + args.starting_balances)) + ) + + print("Backtest report:") + print(f"- processed_events: {report.processed_events}") + print(f"- opportunities_seen: {report.opportunities_seen}") + print(f"- trades_executed: {report.trades_executed}") + print( + f"- win_rate: {report.win_rate if report.win_rate is not None else 'n/a'}") + print( + f"- fill_rate: {report.fill_rate if report.fill_rate is not None else 'n/a'}") + print(f"- realized_pnl_usd: {report.realized_pnl_usd:.4f}") + print(f"- max_drawdown_usd: {report.max_drawdown_usd:.4f}") + print(f"- miss_reasons: {dict(report.miss_reasons)}") + print( + "- execution_latency_ms: " + f"p50={report.execution_latency_p50_ms or 0.0:.4f}, " + f"p95={report.execution_latency_p95_ms or 0.0:.4f}, " + f"p99={report.execution_latency_p99_ms or 0.0:.4f}" + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/arbitrade/backtesting/__init__.py b/src/arbitrade/backtesting/__init__.py new file mode 100644 index 0000000..a657f6c --- /dev/null +++ b/src/arbitrade/backtesting/__init__.py @@ -0,0 +1,17 @@ +from arbitrade.backtesting.replay import ( + BacktestConfig, + BacktestReplayEngine, + BacktestReport, + ReplayBookEvent, + ReplayClock, + load_replay_events, +) + +__all__ = [ + "ReplayClock", + "ReplayBookEvent", + "BacktestConfig", + "BacktestReport", + "BacktestReplayEngine", + "load_replay_events", +] diff --git a/src/arbitrade/backtesting/replay.py b/src/arbitrade/backtesting/replay.py new file mode 100644 index 0000000..e207134 --- /dev/null +++ b/src/arbitrade/backtesting/replay.py @@ -0,0 +1,332 @@ +from __future__ import annotations + +import asyncio +from collections import Counter +from collections.abc import Mapping, Sequence +from dataclasses import dataclass +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +import orjson + +from arbitrade.detection.engine import IncrementalCycleDetector, OpportunityEvent +from arbitrade.detection.graph import TriangularCycle +from arbitrade.exchange.models import BookLevel +from arbitrade.execution.sequencer import TriangularExecutionSequencer +from arbitrade.market_data.order_book import OrderBook +from arbitrade.risk.pre_trade import PreTradeValidator +from arbitrade.risk.trade_limits import TradeLimitsGuard + + +@dataclass(slots=True) +class ReplayClock: + _current: datetime + + @classmethod + def at(cls, started_at: datetime) -> ReplayClock: + return cls(_current=started_at.astimezone(UTC)) + + @property + def now(self) -> datetime: + return self._current + + def advance_to(self, next_time: datetime) -> None: + normalized = next_time.astimezone(UTC) + if normalized < self._current: + raise ValueError("Replay events must be monotonic by timestamp") + self._current = normalized + + def advance_ms(self, milliseconds: float) -> None: + if milliseconds < 0.0: + raise ValueError("milliseconds must be >= 0") + self._current = self._current.fromtimestamp( + self._current.timestamp() + (milliseconds / 1000.0), + tz=UTC, + ) + + +@dataclass(frozen=True, slots=True) +class ReplayBookEvent: + occurred_at: datetime + symbol: str + bids: tuple[BookLevel, ...] + asks: tuple[BookLevel, ...] + + +@dataclass(frozen=True, slots=True) +class BacktestConfig: + fee_rate: float = 0.0026 + min_profit_threshold: float = 0.0005 + trade_capital: float = 100.0 + quote_asset: str = "USD" + slippage_bps: float = 4.0 + execution_latency_ms: float = 20.0 + max_depth_levels: int = 10 + max_concurrent_trades: int = 1 + min_order_size_by_pair: Mapping[str, float] | None = None + + +@dataclass(frozen=True, slots=True) +class BacktestReport: + started_at: datetime + finished_at: datetime + processed_events: int + opportunities_seen: int + trades_executed: int + win_rate: float | None + fill_rate: float | None + realized_pnl_usd: float + max_drawdown_usd: float + miss_reasons: Mapping[str, int] + execution_latency_p50_ms: float | None + execution_latency_p95_ms: float | None + execution_latency_p99_ms: float | None + + +class _SimulatedRestClient: + def __init__( + self, clock: ReplayClock, *, slippage_bps: float, execution_latency_ms: float + ) -> None: + self._clock = clock + self._slippage_bps = slippage_bps + self._execution_latency_ms = execution_latency_ms + self._sequence = 0 + self._last_fill_ratio = 1.0 + self._last_trade_latency_ms = execution_latency_ms + + @property + def last_fill_ratio(self) -> float: + return self._last_fill_ratio + + @property + def last_trade_latency_ms(self) -> float: + return self._last_trade_latency_ms + + async def place_market_order(self, *, pair: str, side: str, volume: float) -> dict[str, Any]: + self._sequence += 1 + self._clock.advance_ms(self._execution_latency_ms) + await asyncio.sleep(0) + + normalized_fill = max(0.85, 1.0 - (self._slippage_bps / 10000.0) * 8.0) + self._last_fill_ratio = normalized_fill + self._last_trade_latency_ms = self._execution_latency_ms + + return { + "txid": [f"sim-{self._sequence}"], + "status": "closed", + "pair": pair, + "side": side, + "requested_volume": volume, + "filled_volume": volume * normalized_fill, + "simulated_at": self._clock.now.isoformat(), + } + + +def _percentile(values: Sequence[float], percentile: float) -> float | None: + if not values: + return None + + ordered = sorted(values) + if percentile <= 0.0: + return ordered[0] + if percentile >= 100.0: + return ordered[-1] + + rank = (len(ordered) - 1) * (percentile / 100.0) + lower = int(rank) + upper = min(lower + 1, len(ordered) - 1) + weight = rank - lower + return ordered[lower] * (1.0 - weight) + ordered[upper] * weight + + +def _parse_book_levels(raw_levels: Any) -> tuple[BookLevel, ...]: + if not isinstance(raw_levels, list): + raise ValueError("Book levels must be a list") + + levels: list[BookLevel] = [] + for raw_level in raw_levels: + if ( + not isinstance(raw_level, list) + or len(raw_level) != 2 + or not isinstance(raw_level[0], int | float) + or not isinstance(raw_level[1], int | float) + ): + raise ValueError("Each level must be [price, volume]") + levels.append(BookLevel(price=float( + raw_level[0]), volume=float(raw_level[1]))) + + return tuple(levels) + + +def load_replay_events(path: Path) -> list[ReplayBookEvent]: + events: list[ReplayBookEvent] = [] + for line in path.read_text(encoding="utf-8").splitlines(): + if not line.strip(): + continue + parsed = orjson.loads(line) + if not isinstance(parsed, dict): + raise ValueError("Each JSONL row must be an object") + + timestamp_raw = parsed.get("timestamp") + symbol_raw = parsed.get("symbol") + if not isinstance(timestamp_raw, str) or not isinstance(symbol_raw, str): + raise ValueError("Each event must include timestamp and symbol") + + occurred_at = datetime.fromisoformat( + timestamp_raw.replace("Z", "+00:00")).astimezone(UTC) + events.append( + ReplayBookEvent( + occurred_at=occurred_at, + symbol=symbol_raw, + bids=_parse_book_levels(parsed.get("bids")), + asks=_parse_book_levels(parsed.get("asks")), + ) + ) + + return sorted(events, key=lambda event: event.occurred_at) + + +class BacktestReplayEngine: + def __init__( + self, + *, + cycles_by_pair: Mapping[str, list[TriangularCycle]], + available_pairs: Sequence[str], + config: BacktestConfig, + started_at: datetime, + ) -> None: + self._config = config + self._clock = ReplayClock.at(started_at) + self._books: dict[str, OrderBook] = {} + + self._detector = IncrementalCycleDetector( + cycles_by_pair, + fee_rate=config.fee_rate, + max_depth_levels=config.max_depth_levels, + min_profit_threshold=config.min_profit_threshold, + min_order_size_by_pair=config.min_order_size_by_pair, + ) + self._pre_trade = PreTradeValidator() + self._trade_limits = TradeLimitsGuard( + max_concurrent_trades=config.max_concurrent_trades) + self._simulated_rest = _SimulatedRestClient( + self._clock, + slippage_bps=config.slippage_bps, + execution_latency_ms=config.execution_latency_ms, + ) + self._sequencer = TriangularExecutionSequencer( + self._simulated_rest, + available_pairs=available_pairs, + ) + + @staticmethod + def _exposure_for_event(event: OpportunityEvent) -> dict[str, float]: + currencies = [part for part in event.cycle.split("->") if part] + if len(currencies) < 2: + return {} + + origin = currencies[0] + return { + currency: event.allocated_capital for currency in currencies[1:] if currency != origin + } + + async def run( + self, + events: Sequence[ReplayBookEvent], + *, + starting_balances: Mapping[str, float], + ) -> BacktestReport: + miss_reasons: Counter[str] = Counter() + + processed_events = 0 + opportunities_seen = 0 + trades_executed = 0 + + realized_pnl = 0.0 + equity = float(starting_balances.get( + self._config.quote_asset.upper(), 0.0)) + peak_equity = equity + max_drawdown = 0.0 + + fill_samples: list[float] = [] + realized_samples: list[float] = [] + execution_latencies: list[float] = [] + + for event in events: + self._clock.advance_to(event.occurred_at) + processed_events += 1 + + book = self._books.setdefault(event.symbol.upper(), OrderBook()) + book.apply_bids(event.bids) + book.apply_asks(event.asks) + + opportunities = self._detector.opportunities_for_updated_pair( + event.symbol, + self._books, + base_capital=self._config.trade_capital, + ) + opportunities_seen += len(opportunities) + + for opportunity in opportunities: + required_by_asset = { + self._config.quote_asset.upper(): opportunity.allocated_capital + } + if not self._pre_trade.validate( + balances_by_asset=starting_balances, + required_by_asset=required_by_asset, + ): + miss_reasons["insufficient_balance"] += 1 + continue + + exposure = self._exposure_for_event(opportunity) + if not self._trade_limits.is_trade_allowed(exposure): + miss_reasons["trade_limit"] += 1 + continue + + self._trade_limits.open_trade(exposure) + result = await self._sequencer.execute(opportunity) + self._trade_limits.close_trade(exposure) + + execution_latencies.append( + self._simulated_rest.last_trade_latency_ms) + fill_samples.append(self._simulated_rest.last_fill_ratio) + + if not result.success: + miss_reasons["execution_failed"] += 1 + continue + + slippage_cost = ( + opportunity.allocated_capital + * (self._config.slippage_bps / 10000.0) + * max(result.completed_legs, 1) + ) + realized_trade_pnl = opportunity.est_profit - slippage_cost + realized_samples.append(realized_trade_pnl) + + realized_pnl += realized_trade_pnl + equity += realized_trade_pnl + peak_equity = max(peak_equity, equity) + max_drawdown = max(max_drawdown, peak_equity - equity) + trades_executed += 1 + + wins = sum(1 for pnl in realized_samples if pnl > 0.0) + win_rate = (wins / len(realized_samples)) if realized_samples else None + fill_rate = (sum(fill_samples) / len(fill_samples) + ) if fill_samples else None + + return BacktestReport( + started_at=events[0].occurred_at if events else self._clock.now, + finished_at=events[-1].occurred_at if events else self._clock.now, + processed_events=processed_events, + opportunities_seen=opportunities_seen, + trades_executed=trades_executed, + win_rate=win_rate, + fill_rate=fill_rate, + realized_pnl_usd=realized_pnl, + max_drawdown_usd=max_drawdown, + miss_reasons=dict(miss_reasons), + execution_latency_p50_ms=_percentile(execution_latencies, 50.0), + execution_latency_p95_ms=_percentile(execution_latencies, 95.0), + execution_latency_p99_ms=_percentile(execution_latencies, 99.0), + ) diff --git a/tests/unit/test_backtesting_replay.py b/tests/unit/test_backtesting_replay.py new file mode 100644 index 0000000..62992cd --- /dev/null +++ b/tests/unit/test_backtesting_replay.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +import asyncio +from datetime import UTC, datetime, timedelta +from pathlib import Path + +from arbitrade.backtesting.replay import ( + BacktestConfig, + BacktestReplayEngine, + ReplayBookEvent, + load_replay_events, +) +from arbitrade.detection.graph import CurrencyGraph +from arbitrade.exchange.models import BookLevel + + +def _build_cycles() -> tuple[dict[str, list], list[str]]: + graph = CurrencyGraph() + graph.add_pair("USD", "BTC", "BTC/USD") + graph.add_pair("BTC", "ETH", "ETH/BTC") + graph.add_pair("ETH", "USD", "ETH/USD") + cycles = graph.triangular_cycles() + return graph.index_cycles_by_pair(cycles), ["BTC/USD", "ETH/BTC", "ETH/USD"] + + +def test_load_replay_events_orders_jsonl_by_timestamp(tmp_path: Path) -> None: + path = tmp_path / "replay.jsonl" + path.write_text( + "\n".join( + [ + '{"timestamp":"2026-06-01T12:00:02Z","symbol":"ETH/USD","bids":[[100.0,1.0]],"asks":[[101.0,1.0]]}', + '{"timestamp":"2026-06-01T12:00:01Z","symbol":"BTC/USD","bids":[[10.0,1.0]],"asks":[[11.0,1.0]]}', + ] + ), + encoding="utf-8", + ) + + events = load_replay_events(path) + + assert [event.symbol for event in events] == ["BTC/USD", "ETH/USD"] + + +def test_backtest_replay_engine_runs_deterministically() -> None: + cycles_by_pair, available_pairs = _build_cycles() + started_at = datetime(2026, 6, 1, 12, 0, tzinfo=UTC) + replay_events = [ + ReplayBookEvent( + occurred_at=started_at, + symbol="BTC/USD", + bids=(BookLevel(price=99.5, volume=10.0),), + asks=(BookLevel(price=100.0, volume=10.0),), + ), + ReplayBookEvent( + occurred_at=started_at + timedelta(seconds=1), + symbol="ETH/BTC", + bids=(BookLevel(price=0.051, volume=10.0),), + asks=(BookLevel(price=0.050, volume=10.0),), + ), + ReplayBookEvent( + occurred_at=started_at + timedelta(seconds=2), + symbol="ETH/USD", + bids=(BookLevel(price=110.0, volume=10.0),), + asks=(BookLevel(price=110.5, volume=10.0),), + ), + ] + + engine = BacktestReplayEngine( + cycles_by_pair=cycles_by_pair, + available_pairs=available_pairs, + config=BacktestConfig(trade_capital=100.0, + slippage_bps=5.0, execution_latency_ms=10.0), + started_at=started_at, + ) + + report = asyncio.run(engine.run( + replay_events, starting_balances={"USD": 1000.0})) + + assert report.processed_events == 3 + assert report.opportunities_seen >= 0 + assert report.trades_executed >= 0 + assert isinstance(report.realized_pnl_usd, float) + assert report.max_drawdown_usd >= 0.0 + assert report.execution_latency_p50_ms is None + assert report.execution_latency_p95_ms is None + assert report.execution_latency_p99_ms is None