feat: Implement latency profiling and guardrails for performance monitoring
CI / lint-test-build (push) Failing after 19s

- Added synthetic latency profiler scenarios and CLI scripts for baseline generation and regression checks.
- Introduced latency baseline and threshold artifacts for CI enforcement.
- Enhanced CI workflow with latency guardrail checks.
- Updated documentation to include latency profiling commands and performance metrics.
- Added unit tests for latency guardrail evaluation.
This commit is contained in:
2026-06-01 14:47:52 +02:00
parent c17f41aaf8
commit cc11082ea7
16 changed files with 900 additions and 56 deletions
+7
View File
@@ -43,6 +43,13 @@ jobs:
- name: Tests - name: Tests
run: pytest -q run: pytest -q
- name: Latency guardrails
run: |
python scripts/check_latency_regression.py \
--baseline ops/performance/latency_baseline.json \
--thresholds ops/performance/latency_thresholds.json \
--iterations 600
- name: Login to Gitea registry - name: Login to Gitea registry
if: github.event_name != 'pull_request' if: github.event_name != 'pull_request'
uses: docker/login-action@v3 uses: docker/login-action@v3
+1
View File
@@ -36,6 +36,7 @@ data/*.duckdb
data/*.duckdb.wal data/*.duckdb.wal
data/*.duckdb.tmp data/*.duckdb.tmp
logs/ logs/
ops/performance/latest_profile.json
# Node assets if used for frontend tooling # Node assets if used for frontend tooling
node_modules/ node_modules/
+4
View File
@@ -15,6 +15,8 @@
- Added `runtime_state_snapshots` persistence for control flags, open trade count, and last known balances. - Added `runtime_state_snapshots` persistence for control flags, open trade count, and last known balances.
- Added CI security gates for dependency auditing (`pip-audit --strict`) and a repository/worktree secret scan script. - Added CI security gates for dependency auditing (`pip-audit --strict`) and a repository/worktree secret scan script.
- Added strict settings validators for auth pairing, Kraken credential pairing, alert severity bounds, and key-scope policy. - Added strict settings validators for auth pairing, Kraken credential pairing, alert severity bounds, and key-scope policy.
- Added synthetic latency profiler scenarios and CLI scripts for baseline generation and regression checks.
- Added latency baseline/threshold artifacts and CI latency guardrail enforcement.
### Changed ### Changed
@@ -22,6 +24,7 @@
- Added configuration env keys for stop-condition thresholds. - Added configuration env keys for stop-condition thresholds.
- WebSocket client now emits system alerts for disconnect/reconnect and heartbeat staleness timeout events. - WebSocket client now emits system alerts for disconnect/reconnect and heartbeat staleness timeout events.
- Added explicit Kraken API key permission configuration (`KRAKEN_API_KEY_PERMISSIONS`) and docs for least-privilege key usage. - Added explicit Kraken API key permission configuration (`KRAKEN_API_KEY_PERMISSIONS`) and docs for least-privilege key usage.
- Optimized dashboard metrics aggregation to use DuckDB SQL aggregates/quantiles instead of Python row scans.
### Removed ### Removed
@@ -50,3 +53,4 @@
- Added startup restart safety guard that halts execution when open trades are detected after restart. - Added startup restart safety guard that halts execution when open trades are detected after restart.
- Added lifecycle tests for snapshot persistence, worker draining, recovery restore, and startup reconciliation hook. - Added lifecycle tests for snapshot persistence, worker draining, recovery restore, and startup reconciliation hook.
- Added unit coverage for security-related settings validation paths. - Added unit coverage for security-related settings validation paths.
- Added latency guardrail unit coverage and documented measured metrics aggregation speedup (`1.14x`).
+37
View File
@@ -210,6 +210,18 @@ Run secret scan (worktree + git history):
python scripts/security_scan.py python scripts/security_scan.py
``` ```
Generate latency profile baseline:
```powershell
python scripts/profile_latency.py --iterations 600 --output ops/performance/latency_baseline.json
```
Run latency regression guardrails:
```powershell
python scripts/check_latency_regression.py --baseline ops/performance/latency_baseline.json --thresholds ops/performance/latency_thresholds.json --iterations 600
```
Install pre-commit hooks: Install pre-commit hooks:
```powershell ```powershell
@@ -338,3 +350,28 @@ Hardening checklist:
- Run `pip-audit --skip-editable` in CI; treat vulnerability findings as release blockers. - Run `pip-audit --skip-editable` in CI; treat vulnerability findings as release blockers.
- Run `python scripts/security_scan.py` before release and after major merges. - Run `python scripts/security_scan.py` before release and after major merges.
- Store secrets in environment/secret manager; never commit `.env` or key material. - Store secrets in environment/secret manager; never commit `.env` or key material.
## Performance Hardening
Profile scenarios:
- `book_update_burst`
- `execution_spike`
- `reconnect_storm`
Latency baseline and threshold artifacts:
- `ops/performance/latency_baseline.json`
- `ops/performance/latency_thresholds.json`
CI guardrail:
- `.gitea/workflows/ci.yml` runs `scripts/check_latency_regression.py` and fails on regression.
Measured optimization impact (2026-06-01):
- `MetricsCalculator.compute()` switched from Python row scans to DuckDB SQL aggregates/quantiles.
- Benchmark (`scripts/benchmark_metrics_compute.py`):
- Python scan avg: `12.623 ms`
- SQL aggregate avg: `11.039 ms`
- Speedup: `1.14x`
+48
View File
@@ -0,0 +1,48 @@
# Performance Hardening
This folder contains latency profiling baselines and guardrail thresholds used in CI.
## Scenarios
The profiler covers representative load patterns:
- `book_update_burst`: rapid market-data deltas with moderate detection load.
- `execution_spike`: heavier detection/execution pressure.
- `reconnect_storm`: frequent reconnect/reset behavior.
## Profiling Commands
Generate a fresh profile:
```powershell
python scripts/profile_latency.py --iterations 600 --output ops/performance/latency_baseline.json
```
Check current performance against the baseline and thresholds:
```powershell
python scripts/check_latency_regression.py \
--baseline ops/performance/latency_baseline.json \
--thresholds ops/performance/latency_thresholds.json \
--iterations 600
```
CI executes the same guardrail check.
## Baseline Snapshot (2026-06-01)
Key end-to-end latency baselines from `latency_baseline.json`:
- `book_update_burst`: p95 = 0.0132 ms, p99 = 0.0198 ms
- `execution_spike`: p95 = 0.0139 ms, p99 = 0.0177 ms
- `reconnect_storm`: p95 = 0.0114 ms, p99 = 0.0134 ms
## Optimization Note
`MetricsCalculator.compute()` was optimized to use DuckDB SQL aggregations and quantiles, reducing Python-side row scans.
Measured benchmark (`scripts/benchmark_metrics_compute.py`):
- Python scan baseline: 12.623 ms
- SQL aggregate implementation: 11.039 ms
- Speedup: 1.14x
+96
View File
@@ -0,0 +1,96 @@
{
"iterations": 600,
"scenarios": {
"book_update_burst": {
"iterations": 600,
"stages": {
"ingest": {
"p50_ms": 0.0028,
"p95_ms": 0.0056,
"p99_ms": 0.0083
},
"detect": {
"p50_ms": 0.0034,
"p95_ms": 0.005899999999999999,
"p99_ms": 0.0081
},
"risk": {
"p50_ms": 0.0002,
"p95_ms": 0.0003,
"p99_ms": 0.0006
},
"execution": {
"p50_ms": 0.0006,
"p95_ms": 0.0012,
"p99_ms": 0.0020009999999999993
},
"end_to_end": {
"p50_ms": 0.007,
"p95_ms": 0.013204999999999996,
"p99_ms": 0.019801
}
}
},
"execution_spike": {
"iterations": 600,
"stages": {
"ingest": {
"p50_ms": 0.0029,
"p95_ms": 0.003,
"p99_ms": 0.00431099999999999
},
"detect": {
"p50_ms": 0.0097,
"p95_ms": 0.0101,
"p99_ms": 0.012404999999999996
},
"risk": {
"p50_ms": 0.0002,
"p95_ms": 0.00019999999999999998,
"p99_ms": 0.0003
},
"execution": {
"p50_ms": 0.0006,
"p95_ms": 0.0007,
"p99_ms": 0.001000999999999999
},
"end_to_end": {
"p50_ms": 0.0135,
"p95_ms": 0.0139,
"p99_ms": 0.017701999999999996
}
}
},
"reconnect_storm": {
"iterations": 600,
"stages": {
"ingest": {
"p50_ms": 0.0029,
"p95_ms": 0.0039,
"p99_ms": 0.0047
},
"detect": {
"p50_ms": 0.0051,
"p95_ms": 0.006,
"p99_ms": 0.007101999999999998
},
"risk": {
"p50_ms": 0.0002,
"p95_ms": 0.00019999999999999998,
"p99_ms": 0.0003009999999999991
},
"execution": {
"p50_ms": 0.0006,
"p95_ms": 0.0007999999999999999,
"p99_ms": 0.0011009999999999991
},
"end_to_end": {
"p50_ms": 0.0088,
"p95_ms": 0.0114,
"p99_ms": 0.013403999999999998
}
}
}
},
"generated_at": "2026-06-01T12:35:48.836000+00:00"
}
+16
View File
@@ -0,0 +1,16 @@
{
"default": {
"p95_ms": 3.0,
"p99_ms": 3.5
},
"scenarios": {
"execution_spike": {
"p95_ms": 3.2,
"p99_ms": 3.8
},
"reconnect_storm": {
"p95_ms": 3.4,
"p99_ms": 4.0
}
}
}
+176
View File
@@ -0,0 +1,176 @@
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from pathlib import Path
from statistics import fmean
from tempfile import gettempdir
from time import perf_counter
from arbitrade.config.settings import Settings
from arbitrade.metrics import MetricsCalculator
from arbitrade.storage.db import DuckDBStore
def _python_scan_compute(store: DuckDBStore) -> tuple[float, float | None, float | None]:
with store.connect() as conn:
trade_rows = conn.execute("""
SELECT started_at, finished_at, realized_pnl
FROM trades
WHERE finished_at IS NOT NULL
""").fetchall()
opportunity_rows = conn.execute("SELECT detected_at FROM opportunities").fetchall()
realized = sum(float(row[2]) for row in trade_rows if row[2] is not None)
durations = [
(row[1] - row[0]).total_seconds()
for row in trade_rows
if isinstance(row[0], datetime) and isinstance(row[1], datetime)
]
avg_duration = fmean(durations) if durations else None
times = [row[0] for row in opportunity_rows if isinstance(row[0], datetime)]
if len(times) >= 2:
span_seconds = (max(times) - min(times)).total_seconds()
opm = len(times) / (span_seconds / 60.0) if span_seconds > 0.0 else float(len(times))
elif len(times) == 1:
opm = 60.0
else:
opm = None
return realized, avg_duration, opm
def _seed_dataset(store: DuckDBStore) -> None:
now = datetime.now(UTC)
trade_rows: list[tuple[object, ...]] = []
for i in range(2500):
started = now + timedelta(seconds=i)
finished = started + timedelta(milliseconds=150 + (i % 400))
pnl = ((i % 17) - 8) * 0.25
trade_rows.append(
(
f"t{i}",
started,
finished,
"filled",
pnl,
pnl * 0.9,
100.0,
"USD->BTC->ETH->USD",
3,
)
)
opportunity_rows: list[tuple[object, ...]] = []
for i in range(5000):
detected_at = now + timedelta(milliseconds=200 * i)
opportunity_rows.append((detected_at, "USD->BTC->ETH->USD", 2.5, 1.2, 0.03, bool(i % 2)))
order_rows: list[tuple[object, ...]] = []
for i in range(3500):
order_rows.append(
(
f"t{i % 2500}",
f"o{i}",
0,
"BTC/USD",
"buy",
1.0,
i,
"closed",
0.9,
100.0,
"{}",
now,
)
)
with store.connect() as conn:
conn.execute("DELETE FROM trades")
conn.execute("DELETE FROM opportunities")
conn.execute("DELETE FROM orders")
conn.executemany(
"""
INSERT INTO trades (
trade_ref,
started_at,
finished_at,
status,
realized_pnl,
estimated_pnl,
capital_used,
cycle,
leg_count
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
trade_rows,
)
conn.executemany(
"""
INSERT INTO opportunities (
detected_at,
cycle,
gross_pct,
net_pct,
est_profit,
executed
) VALUES (?, ?, ?, ?, ?, ?)
""",
opportunity_rows,
)
conn.executemany(
"""
INSERT INTO orders (
trade_ref,
order_ref,
leg_index,
pair,
side,
volume,
user_ref,
status,
filled_volume,
avg_price,
raw_response,
recorded_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
order_rows,
)
def main() -> int:
db_path = Path(gettempdir()) / "arbitrade_metrics_bench.duckdb"
settings = Settings(_env_file=None, DUCKDB_PATH=db_path)
store = DuckDBStore(settings)
store.migrate()
_seed_dataset(store)
calculator = MetricsCalculator(store)
for _ in range(3):
_python_scan_compute(store)
calculator.compute()
runs = 20
start = perf_counter()
for _ in range(runs):
_python_scan_compute(store)
python_ms = (perf_counter() - start) * 1000.0 / runs
start = perf_counter()
for _ in range(runs):
calculator.compute()
sql_ms = (perf_counter() - start) * 1000.0 / runs
speedup = (python_ms / sql_ms) if sql_ms > 0.0 else 0.0
print(f"python_scan_avg_ms={python_ms:.3f}")
print(f"sql_aggregate_avg_ms={sql_ms:.3f}")
print(f"speedup_x={speedup:.2f}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+50
View File
@@ -0,0 +1,50 @@
from __future__ import annotations
import argparse
import json
from pathlib import Path
from arbitrade.perf.guardrails import evaluate_guardrails
from arbitrade.perf.latency import run_latency_profile
def _read_json(path: Path) -> dict[str, object]:
raw = path.read_text(encoding="utf-8")
parsed = json.loads(raw)
if not isinstance(parsed, dict):
raise ValueError(f"Expected object JSON at {path}")
return {str(k): parsed[k] for k in parsed}
def main() -> int:
parser = argparse.ArgumentParser(
description="Check latency profile against baseline thresholds."
)
parser.add_argument("--baseline", type=Path, required=True)
parser.add_argument("--thresholds", type=Path, required=True)
parser.add_argument("--iterations", type=int, default=600)
parser.add_argument(
"--out-current", type=Path, default=Path("ops/performance/latest_profile.json")
)
args = parser.parse_args()
baseline = _read_json(args.baseline)
thresholds = _read_json(args.thresholds)
current = run_latency_profile(iterations=args.iterations)
args.out_current.parent.mkdir(parents=True, exist_ok=True)
args.out_current.write_text(json.dumps(current, indent=2), encoding="utf-8")
failures = evaluate_guardrails(baseline=baseline, current=current, thresholds=thresholds)
if failures:
print("Latency guardrail failures:")
for failure in failures:
print(f"- {failure}")
return 1
print("Latency guardrails passed.")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+54
View File
@@ -0,0 +1,54 @@
from __future__ import annotations
import argparse
import json
from datetime import UTC, datetime
from pathlib import Path
from arbitrade.perf.latency import run_latency_profile
def _format_summary(profile: dict[str, object]) -> str:
scenarios = profile.get("scenarios")
if not isinstance(scenarios, dict):
return "No scenarios found."
lines = ["Latency profiling summary:"]
for scenario_name, payload in scenarios.items():
if not isinstance(payload, dict):
continue
lines.append(f"- {scenario_name}")
stages = payload.get("stages")
if not isinstance(stages, dict):
continue
for stage_name, stage_payload in stages.items():
if not isinstance(stage_payload, dict):
continue
p95 = float(stage_payload.get("p95_ms", 0.0))
p99 = float(stage_payload.get("p99_ms", 0.0))
lines.append(f" - {stage_name}: p95={p95:.4f}ms p99={p99:.4f}ms")
return "\n".join(lines)
def main() -> int:
parser = argparse.ArgumentParser(description="Profile synthetic latency scenarios.")
parser.add_argument("--iterations", type=int, default=600)
parser.add_argument("--output", type=Path, default=None)
args = parser.parse_args()
profile = run_latency_profile(iterations=args.iterations)
profile["generated_at"] = datetime.now(UTC).isoformat()
print(_format_summary(profile))
if args.output is not None:
args.output.parent.mkdir(parents=True, exist_ok=True)
args.output.write_text(json.dumps(profile, indent=2), encoding="utf-8")
print(f"Wrote profile JSON to {args.output}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+82 -59
View File
@@ -1,9 +1,7 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import Iterable
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from statistics import fmean
from arbitrade.storage.db import DuckDBStore from arbitrade.storage.db import DuckDBStore
@@ -24,78 +22,103 @@ class MetricsCalculator:
def __init__(self, store: DuckDBStore) -> None: def __init__(self, store: DuckDBStore) -> None:
self._store = store self._store = store
@staticmethod
def _percentile(values: Iterable[float], percentile: float) -> float | None:
samples = sorted(values)
if not samples:
return None
if percentile <= 0.0:
return samples[0]
if percentile >= 100.0:
return samples[-1]
rank = (len(samples) - 1) * (percentile / 100.0)
lower_index = int(rank)
upper_index = min(lower_index + 1, len(samples) - 1)
weight = rank - lower_index
return samples[lower_index] * (1.0 - weight) + samples[upper_index] * weight
def compute(self) -> PerformanceMetrics: def compute(self) -> PerformanceMetrics:
with self._store.connect() as conn: with self._store.connect() as conn:
trade_rows = conn.execute(""" trade_metrics = conn.execute("""
SELECT started_at, finished_at, realized_pnl SELECT
COALESCE(SUM(COALESCE(realized_pnl, 0)), 0) AS realized_pnl_usd,
COUNT(*) AS total_trades,
SUM(CASE WHEN realized_pnl > 0 THEN 1 ELSE 0 END) AS winning_trades,
AVG(EPOCH(finished_at) - EPOCH(started_at)) AS avg_trade_duration_seconds,
quantile_cont(
EPOCH(finished_at) - EPOCH(started_at),
0.50
) AS latency_p50_seconds,
quantile_cont(
EPOCH(finished_at) - EPOCH(started_at),
0.95
) AS latency_p95_seconds,
quantile_cont(
EPOCH(finished_at) - EPOCH(started_at),
0.99
) AS latency_p99_seconds
FROM trades FROM trades
WHERE finished_at IS NOT NULL WHERE finished_at IS NOT NULL
""").fetchall() """).fetchone()
opportunity_rows = conn.execute("""
SELECT detected_at opportunity_metrics = conn.execute("""
SELECT
COUNT(*) AS opportunity_count,
MIN(detected_at) AS first_detected_at,
MAX(detected_at) AS last_detected_at
FROM opportunities FROM opportunities
""").fetchall() """).fetchone()
order_rows = conn.execute("""
SELECT volume, filled_volume fill_metrics = conn.execute("""
SELECT AVG(filled_volume / volume) AS fill_rate
FROM orders FROM orders
WHERE volume > 0 WHERE volume > 0 AND filled_volume IS NOT NULL
""").fetchall() """).fetchone()
realized_values = [float(row[2]) for row in trade_rows if row[2] is not None] realized_pnl_usd = (
realized_pnl_usd = sum(realized_values) float(trade_metrics[0]) if trade_metrics and trade_metrics[0] is not None else 0.0
)
total_trades = len(trade_rows) total_trades = (
winning_trades = sum(1 for row in trade_rows if row[2] is not None and float(row[2]) > 0.0) int(trade_metrics[1]) if trade_metrics and trade_metrics[1] is not None else 0
)
winning_trades = (
int(trade_metrics[2]) if trade_metrics and trade_metrics[2] is not None else 0
)
win_rate = winning_trades / total_trades if total_trades > 0 else None win_rate = winning_trades / total_trades if total_trades > 0 else None
durations_seconds = [ avg_trade_duration_seconds = (
(row[1] - row[0]).total_seconds() float(trade_metrics[3]) if trade_metrics and trade_metrics[3] is not None else None
for row in trade_rows
if isinstance(row[0], datetime) and isinstance(row[1], datetime)
]
avg_trade_duration_seconds = fmean(durations_seconds) if durations_seconds else None
opportunity_times = [row[0] for row in opportunity_rows if isinstance(row[0], datetime)]
opportunities_per_minute: float | None
if len(opportunity_times) >= 2:
span_seconds = (max(opportunity_times) - min(opportunity_times)).total_seconds()
opportunities_per_minute = (
len(opportunity_times) / (span_seconds / 60.0)
if span_seconds > 0.0
else float(len(opportunity_times))
) )
elif len(opportunity_times) == 1:
opportunity_count = (
int(opportunity_metrics[0])
if opportunity_metrics is not None and opportunity_metrics[0] is not None
else 0
)
first_detected_at = (
opportunity_metrics[1]
if opportunity_metrics is not None and isinstance(opportunity_metrics[1], datetime)
else None
)
last_detected_at = (
opportunity_metrics[2]
if opportunity_metrics is not None and isinstance(opportunity_metrics[2], datetime)
else None
)
opportunities_per_minute: float | None
if (
opportunity_count >= 2
and first_detected_at is not None
and last_detected_at is not None
):
span_seconds = (last_detected_at - first_detected_at).total_seconds()
opportunities_per_minute = (
opportunity_count / (span_seconds / 60.0)
if span_seconds > 0.0
else float(opportunity_count)
)
elif opportunity_count == 1:
opportunities_per_minute = 60.0 opportunities_per_minute = 60.0
else: else:
opportunities_per_minute = None opportunities_per_minute = None
fill_samples = [ fill_rate = float(fill_metrics[0]) if fill_metrics and fill_metrics[0] is not None else None
float(filled) / float(volume)
for volume, filled in order_rows
if filled is not None and float(volume) > 0.0
]
fill_rate = fmean(fill_samples) if fill_samples else None
latency_p50_seconds = self._percentile(durations_seconds, 50.0) latency_p50_seconds = (
latency_p95_seconds = self._percentile(durations_seconds, 95.0) float(trade_metrics[4]) if trade_metrics and trade_metrics[4] is not None else None
latency_p99_seconds = self._percentile(durations_seconds, 99.0) )
latency_p95_seconds = (
float(trade_metrics[5]) if trade_metrics and trade_metrics[5] is not None else None
)
latency_p99_seconds = (
float(trade_metrics[6]) if trade_metrics and trade_metrics[6] is not None else None
)
return PerformanceMetrics( return PerformanceMetrics(
realized_pnl_usd=realized_pnl_usd, realized_pnl_usd=realized_pnl_usd,
+4
View File
@@ -0,0 +1,4 @@
from arbitrade.perf.guardrails import evaluate_guardrails
from arbitrade.perf.latency import run_latency_profile
__all__ = ["run_latency_profile", "evaluate_guardrails"]
+80
View File
@@ -0,0 +1,80 @@
from __future__ import annotations
def evaluate_guardrails(
*,
baseline: dict[str, object],
current: dict[str, object],
thresholds: dict[str, object],
) -> list[str]:
failures: list[str] = []
baseline_scenarios = baseline.get("scenarios")
current_scenarios = current.get("scenarios")
if not isinstance(baseline_scenarios, dict) or not isinstance(current_scenarios, dict):
return ["invalid profile payload: missing scenarios map"]
default_thresholds = thresholds.get("default")
if not isinstance(default_thresholds, dict):
default_thresholds = {"p95_ms": 2.5, "p99_ms": 3.0}
scenario_thresholds = thresholds.get("scenarios")
if not isinstance(scenario_thresholds, dict):
scenario_thresholds = {}
for scenario, baseline_payload in baseline_scenarios.items():
current_payload = current_scenarios.get(scenario)
if not isinstance(baseline_payload, dict) or not isinstance(current_payload, dict):
failures.append(f"missing scenario in current profile: {scenario}")
continue
baseline_stages = baseline_payload.get("stages")
current_stages = current_payload.get("stages")
if not isinstance(baseline_stages, dict) or not isinstance(current_stages, dict):
failures.append(f"missing stages map for scenario: {scenario}")
continue
scenario_config = scenario_thresholds.get(scenario)
if not isinstance(scenario_config, dict):
scenario_config = {}
for stage, baseline_stage in baseline_stages.items():
current_stage = current_stages.get(stage)
if not isinstance(baseline_stage, dict) or not isinstance(current_stage, dict):
failures.append(f"missing stage in current profile: {scenario}.{stage}")
continue
for percentile_key in ("p95_ms", "p99_ms"):
threshold_ratio_raw = scenario_config.get(
percentile_key,
default_thresholds.get(percentile_key, 3.0),
)
threshold_ratio = (
float(threshold_ratio_raw)
if isinstance(threshold_ratio_raw, int | float)
else 3.0
)
base_value_raw = baseline_stage.get(percentile_key)
current_value_raw = current_stage.get(percentile_key)
if not isinstance(base_value_raw, int | float) or not isinstance(
current_value_raw, int | float
):
failures.append(
f"invalid percentile value: {scenario}.{stage}.{percentile_key}"
)
continue
base_value = float(base_value_raw)
current_value = float(current_value_raw)
# Avoid divide-by-zero while still preserving strict checks.
max_allowed = max(base_value * threshold_ratio, 0.001)
if current_value > max_allowed:
failures.append(
f"latency regression: {scenario}.{stage}.{percentile_key} "
f"current={current_value:.4f}ms "
f"baseline={base_value:.4f}ms "
f"allowed={max_allowed:.4f}ms"
)
return failures
+195
View File
@@ -0,0 +1,195 @@
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from time import perf_counter_ns
import orjson
@dataclass(frozen=True, slots=True)
class PercentileSummary:
p50_ms: float
p95_ms: float
p99_ms: float
@dataclass(frozen=True, slots=True)
class ScenarioProfile:
scenario: str
iterations: int
stages: dict[str, PercentileSummary]
def _percentile(samples: list[float], percentile: float) -> float:
if not samples:
return 0.0
ordered = sorted(samples)
if percentile <= 0.0:
return ordered[0]
if percentile >= 100.0:
return ordered[-1]
rank = (len(ordered) - 1) * (percentile / 100.0)
lower = int(rank)
upper = min(lower + 1, len(ordered) - 1)
weight = rank - lower
return ordered[lower] * (1.0 - weight) + ordered[upper] * weight
def _summarize(samples: list[float]) -> PercentileSummary:
return PercentileSummary(
p50_ms=_percentile(samples, 50.0),
p95_ms=_percentile(samples, 95.0),
p99_ms=_percentile(samples, 99.0),
)
def _ingest_stage(raw_payload: bytes, state: dict[str, float]) -> None:
parsed = orjson.loads(raw_payload)
bids = parsed.get("bids", [])
asks = parsed.get("asks", [])
for price, volume in bids[:4]:
state[str(price)] = float(volume)
for price, volume in asks[:4]:
state[str(price)] = float(volume)
def _detect_stage(values: list[float], cycles: int) -> float:
best = 0.0
size = len(values)
for idx in range(cycles):
a = values[idx % size]
b = values[(idx + 3) % size]
c = values[(idx + 7) % size]
gross = (a / b) * c
net = gross * 0.9975
if net > best:
best = net
return best
def _risk_stage(net_edge: float, capital: float) -> float:
if net_edge < 1.0002:
return 0.0
if capital > 500.0:
capital = 500.0
return capital * min(net_edge - 1.0, 0.02)
def _execution_stage(planned_pnl: float, order_id: int) -> None:
payload = {
"order_id": order_id,
"planned_pnl": planned_pnl,
"legs": [
{"pair": "BTC/USD", "side": "buy", "qty": 0.01},
{"pair": "ETH/BTC", "side": "buy", "qty": 0.1},
{"pair": "ETH/USD", "side": "sell", "qty": 0.1},
],
}
_ = orjson.dumps(payload)
def _run_scenario(
name: str,
iterations: int,
detect_cycles: int,
reconnect_every: int,
) -> ScenarioProfile:
payloads = [
orjson.dumps(
{
"symbol": "BTC/USD",
"bids": [[100000.0 + i, 0.2 + (i % 5) * 0.01] for i in range(12)],
"asks": [[100001.0 + i, 0.2 + (i % 7) * 0.01] for i in range(12)],
}
)
for _ in range(5)
]
value_series = [1.0 + (idx % 31) * 0.0007 for idx in range(128)]
order_state: dict[str, float] = {}
ingest_ms: list[float] = []
detect_ms: list[float] = []
risk_ms: list[float] = []
execution_ms: list[float] = []
end_to_end_ms: list[float] = []
for idx in range(iterations):
start_ns = perf_counter_ns()
payload = payloads[idx % len(payloads)]
t0 = perf_counter_ns()
_ingest_stage(payload, order_state)
if reconnect_every > 0 and idx > 0 and idx % reconnect_every == 0:
order_state.clear()
t1 = perf_counter_ns()
net_edge = _detect_stage(value_series, detect_cycles)
t2 = perf_counter_ns()
planned = _risk_stage(net_edge, capital=100.0 + (idx % 50))
t3 = perf_counter_ns()
_execution_stage(planned, order_id=idx)
t4 = perf_counter_ns()
ingest_ms.append((t1 - t0) / 1_000_000.0)
detect_ms.append((t2 - t1) / 1_000_000.0)
risk_ms.append((t3 - t2) / 1_000_000.0)
execution_ms.append((t4 - t3) / 1_000_000.0)
end_to_end_ms.append((t4 - start_ns) / 1_000_000.0)
return ScenarioProfile(
scenario=name,
iterations=iterations,
stages={
"ingest": _summarize(ingest_ms),
"detect": _summarize(detect_ms),
"risk": _summarize(risk_ms),
"execution": _summarize(execution_ms),
"end_to_end": _summarize(end_to_end_ms),
},
)
def run_latency_profile(iterations: int = 600) -> dict[str, object]:
scenarios: list[Callable[[], ScenarioProfile]] = [
lambda: _run_scenario(
name="book_update_burst",
iterations=iterations,
detect_cycles=32,
reconnect_every=0,
),
lambda: _run_scenario(
name="execution_spike",
iterations=iterations,
detect_cycles=96,
reconnect_every=0,
),
lambda: _run_scenario(
name="reconnect_storm",
iterations=iterations,
detect_cycles=48,
reconnect_every=20,
),
]
result: dict[str, object] = {"iterations": iterations, "scenarios": {}}
scenario_map = result["scenarios"]
assert isinstance(scenario_map, dict)
for scenario in scenarios:
profile = scenario()
scenario_map[profile.scenario] = {
"iterations": profile.iterations,
"stages": {
stage: {
"p50_ms": summary.p50_ms,
"p95_ms": summary.p95_ms,
"p99_ms": summary.p99_ms,
}
for stage, summary in profile.stages.items()
},
}
return result
+49
View File
@@ -0,0 +1,49 @@
from __future__ import annotations
from arbitrade.perf.guardrails import evaluate_guardrails
from arbitrade.perf.latency import run_latency_profile
def test_run_latency_profile_contains_expected_shape() -> None:
profile = run_latency_profile(iterations=50)
scenarios = profile.get("scenarios")
assert isinstance(scenarios, dict)
assert set(scenarios) == {
"book_update_burst",
"execution_spike",
"reconnect_storm",
}
for payload in scenarios.values():
assert isinstance(payload, dict)
stages = payload.get("stages")
assert isinstance(stages, dict)
assert "end_to_end" in stages
def test_evaluate_guardrails_flags_regression() -> None:
baseline = {
"scenarios": {
"book_update_burst": {
"stages": {
"end_to_end": {"p95_ms": 1.0, "p99_ms": 1.0},
}
}
}
}
current = {
"scenarios": {
"book_update_burst": {
"stages": {
"end_to_end": {"p95_ms": 4.0, "p99_ms": 4.0},
}
}
}
}
thresholds = {"default": {"p95_ms": 2.0, "p99_ms": 2.0}}
failures = evaluate_guardrails(baseline=baseline, current=current, thresholds=thresholds)
assert failures
assert "latency regression" in failures[0]
+5 -1
View File
@@ -11,7 +11,11 @@ def test_dashboard_auth_requires_both_fields() -> None:
def test_kraken_api_auth_requires_key_and_secret() -> None: def test_kraken_api_auth_requires_key_and_secret() -> None:
with pytest.raises(ValidationError): with pytest.raises(ValidationError):
Settings(_env_file=None, KRAKEN_API_KEY="key-only") Settings(
_env_file=None,
KRAKEN_API_KEY="key-only",
KRAKEN_API_SECRET="",
)
def test_kraken_permissions_require_query_and_trade() -> None: def test_kraken_permissions_require_query_and_trade() -> None: