from __future__ import annotations from datetime import UTC, datetime, timedelta from pathlib import Path from statistics import fmean from tempfile import gettempdir from time import perf_counter from arbitrade.config.settings import Settings from arbitrade.metrics import MetricsCalculator from arbitrade.storage.pg_store import PgStore async def _python_scan_compute(store: PgStore) -> tuple[float, float | None, float | None]: sql_s = """ SELECT started_at, finished_at, realized_pnl FROM trades WHERE finished_at IS NOT NULL """ sql_d = "SELECT detected_at FROM opportunities" async with store.pool.acquire() as conn: trade_rows = await conn.fetch(sql_s) orows = await conn.fetch(sql_d) realized = sum(float(row[2]) for row in trade_rows if row[2] is not None) durations = [ (row[1] - row[0]).total_seconds() for row in trade_rows if isinstance(row[0], datetime) and isinstance(row[1], datetime) ] avg_duration = fmean(durations) if durations else None times = [row[0] for row in orows if isinstance(row[0], datetime)] if len(times) >= 2: ss = (max(times) - min(times)).total_seconds() opm = len(times) / (ss / 60.0) if ss > 0.0 else float(len(times)) elif len(times) == 1: opm = 60.0 else: opm = None return realized, avg_duration, opm async def _seed_dataset(store: PgStore) -> None: now = datetime.now(UTC) trade_rows: list[tuple[object, ...]] = [] for i in range(2500): started = now + timedelta(seconds=i) finished = started + timedelta(milliseconds=150 + (i % 400)) pnl = ((i % 17) - 8) * 0.25 trade_rows.append( ( f"t{i}", started, finished, "filled", pnl, pnl * 0.9, 100.0, "USD->BTC->ETH->USD", 3, ) ) opportunity_rows: list[tuple[object, ...]] = [] for i in range(5000): detected_at = now + timedelta(milliseconds=200 * i) opportunity_rows.append((detected_at, "USD->BTC->ETH->USD", 2.5, 1.2, 0.03, bool(i % 2))) order_rows: list[tuple[object, ...]] = [] for i in range(3500): order_rows.append( ( f"t{i % 2500}", f"o{i}", 0, "BTC/USD", "buy", 1.0, i, "closed", 0.9, 100.0, "{}", now, ) ) async with store.pool.acquire() as conn: await conn.execute("DELETE FROM trades") await conn.execute("DELETE FROM opportunities") await conn.execute("DELETE FROM orders") await conn.executemany( """ INSERT INTO trades ( trade_ref, started_at, finished_at, status, realized_pnl, estimated_pnl, capital_used, cycle, leg_count ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, trade_rows, ) await conn.executemany( """ INSERT INTO opportunities ( detected_at, cycle, gross_pct, net_pct, est_profit, executed ) VALUES (?, ?, ?, ?, ?, ?) """, opportunity_rows, ) await conn.executemany( """ INSERT INTO orders ( trade_ref, order_ref, leg_index, pair, side, volume, user_ref, status, filled_volume, avg_price, raw_response, recorded_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, order_rows, ) async def main() -> int: db_path = Path(gettempdir()) / "arbitrade_metrics_bench.duckdb" settings = Settings(_env_file=None, DUCKDB_PATH=db_path) store = PgStore(settings) store.migrate() await _seed_dataset(store) calculator = MetricsCalculator(store) for _ in range(3): await _python_scan_compute(store) await calculator.compute() runs = 20 start = perf_counter() for _ in range(runs): await _python_scan_compute(store) python_ms = (perf_counter() - start) * 1000.0 / runs start = perf_counter() for _ in range(runs): await calculator.compute() sql_ms = (perf_counter() - start) * 1000.0 / runs speedup = (python_ms / sql_ms) if sql_ms > 0.0 else 0.0 print(f"python_scan_avg_ms={python_ms:.3f}") print(f"sql_aggregate_avg_ms={sql_ms:.3f}") print(f"speedup_x={speedup:.2f}") return 0 if __name__ == "__main__": raise SystemExit(main())