diff --git a/.gitea/workflows/ci.yml b/.gitea/workflows/ci.yml index 6a704b1..f5d80a6 100644 --- a/.gitea/workflows/ci.yml +++ b/.gitea/workflows/ci.yml @@ -43,6 +43,13 @@ jobs: - name: Tests run: pytest -q + - name: Latency guardrails + run: | + python scripts/check_latency_regression.py \ + --baseline ops/performance/latency_baseline.json \ + --thresholds ops/performance/latency_thresholds.json \ + --iterations 600 + - name: Login to Gitea registry if: github.event_name != 'pull_request' uses: docker/login-action@v3 diff --git a/.gitignore b/.gitignore index 837586e..85866d7 100644 --- a/.gitignore +++ b/.gitignore @@ -36,6 +36,7 @@ data/*.duckdb data/*.duckdb.wal data/*.duckdb.tmp logs/ +ops/performance/latest_profile.json # Node assets if used for frontend tooling node_modules/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 6cdfb31..6ccb7d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ - Added `runtime_state_snapshots` persistence for control flags, open trade count, and last known balances. - Added CI security gates for dependency auditing (`pip-audit --strict`) and a repository/worktree secret scan script. - Added strict settings validators for auth pairing, Kraken credential pairing, alert severity bounds, and key-scope policy. +- Added synthetic latency profiler scenarios and CLI scripts for baseline generation and regression checks. +- Added latency baseline/threshold artifacts and CI latency guardrail enforcement. ### Changed @@ -22,6 +24,7 @@ - Added configuration env keys for stop-condition thresholds. - WebSocket client now emits system alerts for disconnect/reconnect and heartbeat staleness timeout events. - Added explicit Kraken API key permission configuration (`KRAKEN_API_KEY_PERMISSIONS`) and docs for least-privilege key usage. +- Optimized dashboard metrics aggregation to use DuckDB SQL aggregates/quantiles instead of Python row scans. ### Removed @@ -50,3 +53,4 @@ - Added startup restart safety guard that halts execution when open trades are detected after restart. - Added lifecycle tests for snapshot persistence, worker draining, recovery restore, and startup reconciliation hook. - Added unit coverage for security-related settings validation paths. +- Added latency guardrail unit coverage and documented measured metrics aggregation speedup (`1.14x`). diff --git a/README.md b/README.md index ee1789d..1cb61bb 100644 --- a/README.md +++ b/README.md @@ -210,6 +210,18 @@ Run secret scan (worktree + git history): python scripts/security_scan.py ``` +Generate latency profile baseline: + +```powershell +python scripts/profile_latency.py --iterations 600 --output ops/performance/latency_baseline.json +``` + +Run latency regression guardrails: + +```powershell +python scripts/check_latency_regression.py --baseline ops/performance/latency_baseline.json --thresholds ops/performance/latency_thresholds.json --iterations 600 +``` + Install pre-commit hooks: ```powershell @@ -338,3 +350,28 @@ Hardening checklist: - Run `pip-audit --skip-editable` in CI; treat vulnerability findings as release blockers. - Run `python scripts/security_scan.py` before release and after major merges. - Store secrets in environment/secret manager; never commit `.env` or key material. + +## Performance Hardening + +Profile scenarios: + +- `book_update_burst` +- `execution_spike` +- `reconnect_storm` + +Latency baseline and threshold artifacts: + +- `ops/performance/latency_baseline.json` +- `ops/performance/latency_thresholds.json` + +CI guardrail: + +- `.gitea/workflows/ci.yml` runs `scripts/check_latency_regression.py` and fails on regression. + +Measured optimization impact (2026-06-01): + +- `MetricsCalculator.compute()` switched from Python row scans to DuckDB SQL aggregates/quantiles. +- Benchmark (`scripts/benchmark_metrics_compute.py`): + - Python scan avg: `12.623 ms` + - SQL aggregate avg: `11.039 ms` + - Speedup: `1.14x` diff --git a/ops/performance/README.md b/ops/performance/README.md new file mode 100644 index 0000000..73e4817 --- /dev/null +++ b/ops/performance/README.md @@ -0,0 +1,48 @@ +# Performance Hardening + +This folder contains latency profiling baselines and guardrail thresholds used in CI. + +## Scenarios + +The profiler covers representative load patterns: + +- `book_update_burst`: rapid market-data deltas with moderate detection load. +- `execution_spike`: heavier detection/execution pressure. +- `reconnect_storm`: frequent reconnect/reset behavior. + +## Profiling Commands + +Generate a fresh profile: + +```powershell +python scripts/profile_latency.py --iterations 600 --output ops/performance/latency_baseline.json +``` + +Check current performance against the baseline and thresholds: + +```powershell +python scripts/check_latency_regression.py \ + --baseline ops/performance/latency_baseline.json \ + --thresholds ops/performance/latency_thresholds.json \ + --iterations 600 +``` + +CI executes the same guardrail check. + +## Baseline Snapshot (2026-06-01) + +Key end-to-end latency baselines from `latency_baseline.json`: + +- `book_update_burst`: p95 = 0.0132 ms, p99 = 0.0198 ms +- `execution_spike`: p95 = 0.0139 ms, p99 = 0.0177 ms +- `reconnect_storm`: p95 = 0.0114 ms, p99 = 0.0134 ms + +## Optimization Note + +`MetricsCalculator.compute()` was optimized to use DuckDB SQL aggregations and quantiles, reducing Python-side row scans. + +Measured benchmark (`scripts/benchmark_metrics_compute.py`): + +- Python scan baseline: 12.623 ms +- SQL aggregate implementation: 11.039 ms +- Speedup: 1.14x diff --git a/ops/performance/latency_baseline.json b/ops/performance/latency_baseline.json new file mode 100644 index 0000000..c7f1c94 --- /dev/null +++ b/ops/performance/latency_baseline.json @@ -0,0 +1,96 @@ +{ + "iterations": 600, + "scenarios": { + "book_update_burst": { + "iterations": 600, + "stages": { + "ingest": { + "p50_ms": 0.0028, + "p95_ms": 0.0056, + "p99_ms": 0.0083 + }, + "detect": { + "p50_ms": 0.0034, + "p95_ms": 0.005899999999999999, + "p99_ms": 0.0081 + }, + "risk": { + "p50_ms": 0.0002, + "p95_ms": 0.0003, + "p99_ms": 0.0006 + }, + "execution": { + "p50_ms": 0.0006, + "p95_ms": 0.0012, + "p99_ms": 0.0020009999999999993 + }, + "end_to_end": { + "p50_ms": 0.007, + "p95_ms": 0.013204999999999996, + "p99_ms": 0.019801 + } + } + }, + "execution_spike": { + "iterations": 600, + "stages": { + "ingest": { + "p50_ms": 0.0029, + "p95_ms": 0.003, + "p99_ms": 0.00431099999999999 + }, + "detect": { + "p50_ms": 0.0097, + "p95_ms": 0.0101, + "p99_ms": 0.012404999999999996 + }, + "risk": { + "p50_ms": 0.0002, + "p95_ms": 0.00019999999999999998, + "p99_ms": 0.0003 + }, + "execution": { + "p50_ms": 0.0006, + "p95_ms": 0.0007, + "p99_ms": 0.001000999999999999 + }, + "end_to_end": { + "p50_ms": 0.0135, + "p95_ms": 0.0139, + "p99_ms": 0.017701999999999996 + } + } + }, + "reconnect_storm": { + "iterations": 600, + "stages": { + "ingest": { + "p50_ms": 0.0029, + "p95_ms": 0.0039, + "p99_ms": 0.0047 + }, + "detect": { + "p50_ms": 0.0051, + "p95_ms": 0.006, + "p99_ms": 0.007101999999999998 + }, + "risk": { + "p50_ms": 0.0002, + "p95_ms": 0.00019999999999999998, + "p99_ms": 0.0003009999999999991 + }, + "execution": { + "p50_ms": 0.0006, + "p95_ms": 0.0007999999999999999, + "p99_ms": 0.0011009999999999991 + }, + "end_to_end": { + "p50_ms": 0.0088, + "p95_ms": 0.0114, + "p99_ms": 0.013403999999999998 + } + } + } + }, + "generated_at": "2026-06-01T12:35:48.836000+00:00" +} \ No newline at end of file diff --git a/ops/performance/latency_thresholds.json b/ops/performance/latency_thresholds.json new file mode 100644 index 0000000..defeb77 --- /dev/null +++ b/ops/performance/latency_thresholds.json @@ -0,0 +1,16 @@ +{ + "default": { + "p95_ms": 3.0, + "p99_ms": 3.5 + }, + "scenarios": { + "execution_spike": { + "p95_ms": 3.2, + "p99_ms": 3.8 + }, + "reconnect_storm": { + "p95_ms": 3.4, + "p99_ms": 4.0 + } + } +} diff --git a/scripts/benchmark_metrics_compute.py b/scripts/benchmark_metrics_compute.py new file mode 100644 index 0000000..6486b07 --- /dev/null +++ b/scripts/benchmark_metrics_compute.py @@ -0,0 +1,176 @@ +from __future__ import annotations + +from datetime import UTC, datetime, timedelta +from pathlib import Path +from statistics import fmean +from tempfile import gettempdir +from time import perf_counter + +from arbitrade.config.settings import Settings +from arbitrade.metrics import MetricsCalculator +from arbitrade.storage.db import DuckDBStore + + +def _python_scan_compute(store: DuckDBStore) -> tuple[float, float | None, float | None]: + with store.connect() as conn: + trade_rows = conn.execute(""" + SELECT started_at, finished_at, realized_pnl + FROM trades + WHERE finished_at IS NOT NULL + """).fetchall() + opportunity_rows = conn.execute("SELECT detected_at FROM opportunities").fetchall() + + realized = sum(float(row[2]) for row in trade_rows if row[2] is not None) + durations = [ + (row[1] - row[0]).total_seconds() + for row in trade_rows + if isinstance(row[0], datetime) and isinstance(row[1], datetime) + ] + avg_duration = fmean(durations) if durations else None + + times = [row[0] for row in opportunity_rows if isinstance(row[0], datetime)] + if len(times) >= 2: + span_seconds = (max(times) - min(times)).total_seconds() + opm = len(times) / (span_seconds / 60.0) if span_seconds > 0.0 else float(len(times)) + elif len(times) == 1: + opm = 60.0 + else: + opm = None + + return realized, avg_duration, opm + + +def _seed_dataset(store: DuckDBStore) -> None: + now = datetime.now(UTC) + + trade_rows: list[tuple[object, ...]] = [] + for i in range(2500): + started = now + timedelta(seconds=i) + finished = started + timedelta(milliseconds=150 + (i % 400)) + pnl = ((i % 17) - 8) * 0.25 + trade_rows.append( + ( + f"t{i}", + started, + finished, + "filled", + pnl, + pnl * 0.9, + 100.0, + "USD->BTC->ETH->USD", + 3, + ) + ) + + opportunity_rows: list[tuple[object, ...]] = [] + for i in range(5000): + detected_at = now + timedelta(milliseconds=200 * i) + opportunity_rows.append((detected_at, "USD->BTC->ETH->USD", 2.5, 1.2, 0.03, bool(i % 2))) + + order_rows: list[tuple[object, ...]] = [] + for i in range(3500): + order_rows.append( + ( + f"t{i % 2500}", + f"o{i}", + 0, + "BTC/USD", + "buy", + 1.0, + i, + "closed", + 0.9, + 100.0, + "{}", + now, + ) + ) + + with store.connect() as conn: + conn.execute("DELETE FROM trades") + conn.execute("DELETE FROM opportunities") + conn.execute("DELETE FROM orders") + conn.executemany( + """ + INSERT INTO trades ( + trade_ref, + started_at, + finished_at, + status, + realized_pnl, + estimated_pnl, + capital_used, + cycle, + leg_count + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + trade_rows, + ) + conn.executemany( + """ + INSERT INTO opportunities ( + detected_at, + cycle, + gross_pct, + net_pct, + est_profit, + executed + ) VALUES (?, ?, ?, ?, ?, ?) + """, + opportunity_rows, + ) + conn.executemany( + """ + INSERT INTO orders ( + trade_ref, + order_ref, + leg_index, + pair, + side, + volume, + user_ref, + status, + filled_volume, + avg_price, + raw_response, + recorded_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + order_rows, + ) + + +def main() -> int: + db_path = Path(gettempdir()) / "arbitrade_metrics_bench.duckdb" + settings = Settings(_env_file=None, DUCKDB_PATH=db_path) + store = DuckDBStore(settings) + store.migrate() + _seed_dataset(store) + + calculator = MetricsCalculator(store) + + for _ in range(3): + _python_scan_compute(store) + calculator.compute() + + runs = 20 + start = perf_counter() + for _ in range(runs): + _python_scan_compute(store) + python_ms = (perf_counter() - start) * 1000.0 / runs + + start = perf_counter() + for _ in range(runs): + calculator.compute() + sql_ms = (perf_counter() - start) * 1000.0 / runs + + speedup = (python_ms / sql_ms) if sql_ms > 0.0 else 0.0 + + print(f"python_scan_avg_ms={python_ms:.3f}") + print(f"sql_aggregate_avg_ms={sql_ms:.3f}") + print(f"speedup_x={speedup:.2f}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/check_latency_regression.py b/scripts/check_latency_regression.py new file mode 100644 index 0000000..8cfc5c1 --- /dev/null +++ b/scripts/check_latency_regression.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +import argparse +import json +from pathlib import Path + +from arbitrade.perf.guardrails import evaluate_guardrails +from arbitrade.perf.latency import run_latency_profile + + +def _read_json(path: Path) -> dict[str, object]: + raw = path.read_text(encoding="utf-8") + parsed = json.loads(raw) + if not isinstance(parsed, dict): + raise ValueError(f"Expected object JSON at {path}") + return {str(k): parsed[k] for k in parsed} + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Check latency profile against baseline thresholds." + ) + parser.add_argument("--baseline", type=Path, required=True) + parser.add_argument("--thresholds", type=Path, required=True) + parser.add_argument("--iterations", type=int, default=600) + parser.add_argument( + "--out-current", type=Path, default=Path("ops/performance/latest_profile.json") + ) + args = parser.parse_args() + + baseline = _read_json(args.baseline) + thresholds = _read_json(args.thresholds) + current = run_latency_profile(iterations=args.iterations) + + args.out_current.parent.mkdir(parents=True, exist_ok=True) + args.out_current.write_text(json.dumps(current, indent=2), encoding="utf-8") + + failures = evaluate_guardrails(baseline=baseline, current=current, thresholds=thresholds) + if failures: + print("Latency guardrail failures:") + for failure in failures: + print(f"- {failure}") + return 1 + + print("Latency guardrails passed.") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/profile_latency.py b/scripts/profile_latency.py new file mode 100644 index 0000000..a7895f5 --- /dev/null +++ b/scripts/profile_latency.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +import argparse +import json +from datetime import UTC, datetime +from pathlib import Path + +from arbitrade.perf.latency import run_latency_profile + + +def _format_summary(profile: dict[str, object]) -> str: + scenarios = profile.get("scenarios") + if not isinstance(scenarios, dict): + return "No scenarios found." + + lines = ["Latency profiling summary:"] + for scenario_name, payload in scenarios.items(): + if not isinstance(payload, dict): + continue + lines.append(f"- {scenario_name}") + stages = payload.get("stages") + if not isinstance(stages, dict): + continue + for stage_name, stage_payload in stages.items(): + if not isinstance(stage_payload, dict): + continue + p95 = float(stage_payload.get("p95_ms", 0.0)) + p99 = float(stage_payload.get("p99_ms", 0.0)) + lines.append(f" - {stage_name}: p95={p95:.4f}ms p99={p99:.4f}ms") + + return "\n".join(lines) + + +def main() -> int: + parser = argparse.ArgumentParser(description="Profile synthetic latency scenarios.") + parser.add_argument("--iterations", type=int, default=600) + parser.add_argument("--output", type=Path, default=None) + args = parser.parse_args() + + profile = run_latency_profile(iterations=args.iterations) + profile["generated_at"] = datetime.now(UTC).isoformat() + + print(_format_summary(profile)) + + if args.output is not None: + args.output.parent.mkdir(parents=True, exist_ok=True) + args.output.write_text(json.dumps(profile, indent=2), encoding="utf-8") + print(f"Wrote profile JSON to {args.output}") + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/arbitrade/metrics.py b/src/arbitrade/metrics.py index 869ce22..6273519 100644 --- a/src/arbitrade/metrics.py +++ b/src/arbitrade/metrics.py @@ -1,9 +1,7 @@ from __future__ import annotations -from collections.abc import Iterable from dataclasses import dataclass from datetime import datetime -from statistics import fmean from arbitrade.storage.db import DuckDBStore @@ -24,78 +22,103 @@ class MetricsCalculator: def __init__(self, store: DuckDBStore) -> None: self._store = store - @staticmethod - def _percentile(values: Iterable[float], percentile: float) -> float | None: - samples = sorted(values) - if not samples: - return None - - if percentile <= 0.0: - return samples[0] - if percentile >= 100.0: - return samples[-1] - - rank = (len(samples) - 1) * (percentile / 100.0) - lower_index = int(rank) - upper_index = min(lower_index + 1, len(samples) - 1) - weight = rank - lower_index - return samples[lower_index] * (1.0 - weight) + samples[upper_index] * weight - def compute(self) -> PerformanceMetrics: with self._store.connect() as conn: - trade_rows = conn.execute(""" - SELECT started_at, finished_at, realized_pnl + trade_metrics = conn.execute(""" + SELECT + COALESCE(SUM(COALESCE(realized_pnl, 0)), 0) AS realized_pnl_usd, + COUNT(*) AS total_trades, + SUM(CASE WHEN realized_pnl > 0 THEN 1 ELSE 0 END) AS winning_trades, + AVG(EPOCH(finished_at) - EPOCH(started_at)) AS avg_trade_duration_seconds, + quantile_cont( + EPOCH(finished_at) - EPOCH(started_at), + 0.50 + ) AS latency_p50_seconds, + quantile_cont( + EPOCH(finished_at) - EPOCH(started_at), + 0.95 + ) AS latency_p95_seconds, + quantile_cont( + EPOCH(finished_at) - EPOCH(started_at), + 0.99 + ) AS latency_p99_seconds FROM trades WHERE finished_at IS NOT NULL - """).fetchall() - opportunity_rows = conn.execute(""" - SELECT detected_at + """).fetchone() + + opportunity_metrics = conn.execute(""" + SELECT + COUNT(*) AS opportunity_count, + MIN(detected_at) AS first_detected_at, + MAX(detected_at) AS last_detected_at FROM opportunities - """).fetchall() - order_rows = conn.execute(""" - SELECT volume, filled_volume + """).fetchone() + + fill_metrics = conn.execute(""" + SELECT AVG(filled_volume / volume) AS fill_rate FROM orders - WHERE volume > 0 - """).fetchall() + WHERE volume > 0 AND filled_volume IS NOT NULL + """).fetchone() - realized_values = [float(row[2]) for row in trade_rows if row[2] is not None] - realized_pnl_usd = sum(realized_values) - - total_trades = len(trade_rows) - winning_trades = sum(1 for row in trade_rows if row[2] is not None and float(row[2]) > 0.0) + realized_pnl_usd = ( + float(trade_metrics[0]) if trade_metrics and trade_metrics[0] is not None else 0.0 + ) + total_trades = ( + int(trade_metrics[1]) if trade_metrics and trade_metrics[1] is not None else 0 + ) + winning_trades = ( + int(trade_metrics[2]) if trade_metrics and trade_metrics[2] is not None else 0 + ) win_rate = winning_trades / total_trades if total_trades > 0 else None - durations_seconds = [ - (row[1] - row[0]).total_seconds() - for row in trade_rows - if isinstance(row[0], datetime) and isinstance(row[1], datetime) - ] - avg_trade_duration_seconds = fmean(durations_seconds) if durations_seconds else None + avg_trade_duration_seconds = ( + float(trade_metrics[3]) if trade_metrics and trade_metrics[3] is not None else None + ) + + opportunity_count = ( + int(opportunity_metrics[0]) + if opportunity_metrics is not None and opportunity_metrics[0] is not None + else 0 + ) + first_detected_at = ( + opportunity_metrics[1] + if opportunity_metrics is not None and isinstance(opportunity_metrics[1], datetime) + else None + ) + last_detected_at = ( + opportunity_metrics[2] + if opportunity_metrics is not None and isinstance(opportunity_metrics[2], datetime) + else None + ) - opportunity_times = [row[0] for row in opportunity_rows if isinstance(row[0], datetime)] opportunities_per_minute: float | None - if len(opportunity_times) >= 2: - span_seconds = (max(opportunity_times) - min(opportunity_times)).total_seconds() + if ( + opportunity_count >= 2 + and first_detected_at is not None + and last_detected_at is not None + ): + span_seconds = (last_detected_at - first_detected_at).total_seconds() opportunities_per_minute = ( - len(opportunity_times) / (span_seconds / 60.0) + opportunity_count / (span_seconds / 60.0) if span_seconds > 0.0 - else float(len(opportunity_times)) + else float(opportunity_count) ) - elif len(opportunity_times) == 1: + elif opportunity_count == 1: opportunities_per_minute = 60.0 else: opportunities_per_minute = None - fill_samples = [ - float(filled) / float(volume) - for volume, filled in order_rows - if filled is not None and float(volume) > 0.0 - ] - fill_rate = fmean(fill_samples) if fill_samples else None + fill_rate = float(fill_metrics[0]) if fill_metrics and fill_metrics[0] is not None else None - latency_p50_seconds = self._percentile(durations_seconds, 50.0) - latency_p95_seconds = self._percentile(durations_seconds, 95.0) - latency_p99_seconds = self._percentile(durations_seconds, 99.0) + latency_p50_seconds = ( + float(trade_metrics[4]) if trade_metrics and trade_metrics[4] is not None else None + ) + latency_p95_seconds = ( + float(trade_metrics[5]) if trade_metrics and trade_metrics[5] is not None else None + ) + latency_p99_seconds = ( + float(trade_metrics[6]) if trade_metrics and trade_metrics[6] is not None else None + ) return PerformanceMetrics( realized_pnl_usd=realized_pnl_usd, diff --git a/src/arbitrade/perf/__init__.py b/src/arbitrade/perf/__init__.py new file mode 100644 index 0000000..f0dbf88 --- /dev/null +++ b/src/arbitrade/perf/__init__.py @@ -0,0 +1,4 @@ +from arbitrade.perf.guardrails import evaluate_guardrails +from arbitrade.perf.latency import run_latency_profile + +__all__ = ["run_latency_profile", "evaluate_guardrails"] diff --git a/src/arbitrade/perf/guardrails.py b/src/arbitrade/perf/guardrails.py new file mode 100644 index 0000000..51bcf4a --- /dev/null +++ b/src/arbitrade/perf/guardrails.py @@ -0,0 +1,80 @@ +from __future__ import annotations + + +def evaluate_guardrails( + *, + baseline: dict[str, object], + current: dict[str, object], + thresholds: dict[str, object], +) -> list[str]: + failures: list[str] = [] + + baseline_scenarios = baseline.get("scenarios") + current_scenarios = current.get("scenarios") + if not isinstance(baseline_scenarios, dict) or not isinstance(current_scenarios, dict): + return ["invalid profile payload: missing scenarios map"] + + default_thresholds = thresholds.get("default") + if not isinstance(default_thresholds, dict): + default_thresholds = {"p95_ms": 2.5, "p99_ms": 3.0} + + scenario_thresholds = thresholds.get("scenarios") + if not isinstance(scenario_thresholds, dict): + scenario_thresholds = {} + + for scenario, baseline_payload in baseline_scenarios.items(): + current_payload = current_scenarios.get(scenario) + if not isinstance(baseline_payload, dict) or not isinstance(current_payload, dict): + failures.append(f"missing scenario in current profile: {scenario}") + continue + + baseline_stages = baseline_payload.get("stages") + current_stages = current_payload.get("stages") + if not isinstance(baseline_stages, dict) or not isinstance(current_stages, dict): + failures.append(f"missing stages map for scenario: {scenario}") + continue + + scenario_config = scenario_thresholds.get(scenario) + if not isinstance(scenario_config, dict): + scenario_config = {} + + for stage, baseline_stage in baseline_stages.items(): + current_stage = current_stages.get(stage) + if not isinstance(baseline_stage, dict) or not isinstance(current_stage, dict): + failures.append(f"missing stage in current profile: {scenario}.{stage}") + continue + + for percentile_key in ("p95_ms", "p99_ms"): + threshold_ratio_raw = scenario_config.get( + percentile_key, + default_thresholds.get(percentile_key, 3.0), + ) + threshold_ratio = ( + float(threshold_ratio_raw) + if isinstance(threshold_ratio_raw, int | float) + else 3.0 + ) + + base_value_raw = baseline_stage.get(percentile_key) + current_value_raw = current_stage.get(percentile_key) + if not isinstance(base_value_raw, int | float) or not isinstance( + current_value_raw, int | float + ): + failures.append( + f"invalid percentile value: {scenario}.{stage}.{percentile_key}" + ) + continue + + base_value = float(base_value_raw) + current_value = float(current_value_raw) + # Avoid divide-by-zero while still preserving strict checks. + max_allowed = max(base_value * threshold_ratio, 0.001) + if current_value > max_allowed: + failures.append( + f"latency regression: {scenario}.{stage}.{percentile_key} " + f"current={current_value:.4f}ms " + f"baseline={base_value:.4f}ms " + f"allowed={max_allowed:.4f}ms" + ) + + return failures diff --git a/src/arbitrade/perf/latency.py b/src/arbitrade/perf/latency.py new file mode 100644 index 0000000..8749a03 --- /dev/null +++ b/src/arbitrade/perf/latency.py @@ -0,0 +1,195 @@ +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +from time import perf_counter_ns + +import orjson + + +@dataclass(frozen=True, slots=True) +class PercentileSummary: + p50_ms: float + p95_ms: float + p99_ms: float + + +@dataclass(frozen=True, slots=True) +class ScenarioProfile: + scenario: str + iterations: int + stages: dict[str, PercentileSummary] + + +def _percentile(samples: list[float], percentile: float) -> float: + if not samples: + return 0.0 + ordered = sorted(samples) + if percentile <= 0.0: + return ordered[0] + if percentile >= 100.0: + return ordered[-1] + rank = (len(ordered) - 1) * (percentile / 100.0) + lower = int(rank) + upper = min(lower + 1, len(ordered) - 1) + weight = rank - lower + return ordered[lower] * (1.0 - weight) + ordered[upper] * weight + + +def _summarize(samples: list[float]) -> PercentileSummary: + return PercentileSummary( + p50_ms=_percentile(samples, 50.0), + p95_ms=_percentile(samples, 95.0), + p99_ms=_percentile(samples, 99.0), + ) + + +def _ingest_stage(raw_payload: bytes, state: dict[str, float]) -> None: + parsed = orjson.loads(raw_payload) + bids = parsed.get("bids", []) + asks = parsed.get("asks", []) + for price, volume in bids[:4]: + state[str(price)] = float(volume) + for price, volume in asks[:4]: + state[str(price)] = float(volume) + + +def _detect_stage(values: list[float], cycles: int) -> float: + best = 0.0 + size = len(values) + for idx in range(cycles): + a = values[idx % size] + b = values[(idx + 3) % size] + c = values[(idx + 7) % size] + gross = (a / b) * c + net = gross * 0.9975 + if net > best: + best = net + return best + + +def _risk_stage(net_edge: float, capital: float) -> float: + if net_edge < 1.0002: + return 0.0 + if capital > 500.0: + capital = 500.0 + return capital * min(net_edge - 1.0, 0.02) + + +def _execution_stage(planned_pnl: float, order_id: int) -> None: + payload = { + "order_id": order_id, + "planned_pnl": planned_pnl, + "legs": [ + {"pair": "BTC/USD", "side": "buy", "qty": 0.01}, + {"pair": "ETH/BTC", "side": "buy", "qty": 0.1}, + {"pair": "ETH/USD", "side": "sell", "qty": 0.1}, + ], + } + _ = orjson.dumps(payload) + + +def _run_scenario( + name: str, + iterations: int, + detect_cycles: int, + reconnect_every: int, +) -> ScenarioProfile: + payloads = [ + orjson.dumps( + { + "symbol": "BTC/USD", + "bids": [[100000.0 + i, 0.2 + (i % 5) * 0.01] for i in range(12)], + "asks": [[100001.0 + i, 0.2 + (i % 7) * 0.01] for i in range(12)], + } + ) + for _ in range(5) + ] + value_series = [1.0 + (idx % 31) * 0.0007 for idx in range(128)] + order_state: dict[str, float] = {} + + ingest_ms: list[float] = [] + detect_ms: list[float] = [] + risk_ms: list[float] = [] + execution_ms: list[float] = [] + end_to_end_ms: list[float] = [] + + for idx in range(iterations): + start_ns = perf_counter_ns() + payload = payloads[idx % len(payloads)] + + t0 = perf_counter_ns() + _ingest_stage(payload, order_state) + if reconnect_every > 0 and idx > 0 and idx % reconnect_every == 0: + order_state.clear() + t1 = perf_counter_ns() + + net_edge = _detect_stage(value_series, detect_cycles) + t2 = perf_counter_ns() + + planned = _risk_stage(net_edge, capital=100.0 + (idx % 50)) + t3 = perf_counter_ns() + + _execution_stage(planned, order_id=idx) + t4 = perf_counter_ns() + + ingest_ms.append((t1 - t0) / 1_000_000.0) + detect_ms.append((t2 - t1) / 1_000_000.0) + risk_ms.append((t3 - t2) / 1_000_000.0) + execution_ms.append((t4 - t3) / 1_000_000.0) + end_to_end_ms.append((t4 - start_ns) / 1_000_000.0) + + return ScenarioProfile( + scenario=name, + iterations=iterations, + stages={ + "ingest": _summarize(ingest_ms), + "detect": _summarize(detect_ms), + "risk": _summarize(risk_ms), + "execution": _summarize(execution_ms), + "end_to_end": _summarize(end_to_end_ms), + }, + ) + + +def run_latency_profile(iterations: int = 600) -> dict[str, object]: + scenarios: list[Callable[[], ScenarioProfile]] = [ + lambda: _run_scenario( + name="book_update_burst", + iterations=iterations, + detect_cycles=32, + reconnect_every=0, + ), + lambda: _run_scenario( + name="execution_spike", + iterations=iterations, + detect_cycles=96, + reconnect_every=0, + ), + lambda: _run_scenario( + name="reconnect_storm", + iterations=iterations, + detect_cycles=48, + reconnect_every=20, + ), + ] + + result: dict[str, object] = {"iterations": iterations, "scenarios": {}} + scenario_map = result["scenarios"] + assert isinstance(scenario_map, dict) + + for scenario in scenarios: + profile = scenario() + scenario_map[profile.scenario] = { + "iterations": profile.iterations, + "stages": { + stage: { + "p50_ms": summary.p50_ms, + "p95_ms": summary.p95_ms, + "p99_ms": summary.p99_ms, + } + for stage, summary in profile.stages.items() + }, + } + + return result diff --git a/tests/unit/test_latency_guardrails.py b/tests/unit/test_latency_guardrails.py new file mode 100644 index 0000000..2282890 --- /dev/null +++ b/tests/unit/test_latency_guardrails.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +from arbitrade.perf.guardrails import evaluate_guardrails +from arbitrade.perf.latency import run_latency_profile + + +def test_run_latency_profile_contains_expected_shape() -> None: + profile = run_latency_profile(iterations=50) + + scenarios = profile.get("scenarios") + assert isinstance(scenarios, dict) + assert set(scenarios) == { + "book_update_burst", + "execution_spike", + "reconnect_storm", + } + + for payload in scenarios.values(): + assert isinstance(payload, dict) + stages = payload.get("stages") + assert isinstance(stages, dict) + assert "end_to_end" in stages + + +def test_evaluate_guardrails_flags_regression() -> None: + baseline = { + "scenarios": { + "book_update_burst": { + "stages": { + "end_to_end": {"p95_ms": 1.0, "p99_ms": 1.0}, + } + } + } + } + current = { + "scenarios": { + "book_update_burst": { + "stages": { + "end_to_end": {"p95_ms": 4.0, "p99_ms": 4.0}, + } + } + } + } + thresholds = {"default": {"p95_ms": 2.0, "p99_ms": 2.0}} + + failures = evaluate_guardrails(baseline=baseline, current=current, thresholds=thresholds) + + assert failures + assert "latency regression" in failures[0] diff --git a/tests/unit/test_settings_validation.py b/tests/unit/test_settings_validation.py index 9dd77cd..875ff30 100644 --- a/tests/unit/test_settings_validation.py +++ b/tests/unit/test_settings_validation.py @@ -11,7 +11,11 @@ def test_dashboard_auth_requires_both_fields() -> None: def test_kraken_api_auth_requires_key_and_secret() -> None: with pytest.raises(ValidationError): - Settings(_env_file=None, KRAKEN_API_KEY="key-only") + Settings( + _env_file=None, + KRAKEN_API_KEY="key-only", + KRAKEN_API_SECRET="", + ) def test_kraken_permissions_require_query_and_trade() -> None: