diff --git a/scripts/benchmark_metrics_compute.py b/scripts/benchmark_metrics_compute.py index 27606f3..e5535f6 100644 --- a/scripts/benchmark_metrics_compute.py +++ b/scripts/benchmark_metrics_compute.py @@ -8,18 +8,19 @@ from time import perf_counter from arbitrade.config.settings import Settings from arbitrade.metrics import MetricsCalculator -from arbitrade.storage.db import DuckDBStore +from arbitrade.storage.pg_store import PgStore -def _python_scan_compute(store: DuckDBStore) -> tuple[float, float | None, float | None]: - with store.connect() as conn: - trade_rows = conn.execute(""" - SELECT started_at, finished_at, realized_pnl - FROM trades - WHERE finished_at IS NOT NULL - """).fetchall() - sql_d = "SELECT detected_at FROM opportunities" - orows = conn.execute(sql_d).fetchall() +async def _python_scan_compute(store: PgStore) -> tuple[float, float | None, float | None]: + sql_s = """ + SELECT started_at, finished_at, realized_pnl + FROM trades + WHERE finished_at IS NOT NULL + """ + sql_d = "SELECT detected_at FROM opportunities" + async with store.pool.acquire() as conn: + trade_rows = await conn.fetch(sql_s) + orows = await conn.fetch(sql_d) realized = sum(float(row[2]) for row in trade_rows if row[2] is not None) durations = [ @@ -41,7 +42,7 @@ def _python_scan_compute(store: DuckDBStore) -> tuple[float, float | None, float return realized, avg_duration, opm -def _seed_dataset(store: DuckDBStore) -> None: +async def _seed_dataset(store: PgStore) -> None: now = datetime.now(UTC) trade_rows: list[tuple[object, ...]] = [] @@ -66,7 +67,8 @@ def _seed_dataset(store: DuckDBStore) -> None: opportunity_rows: list[tuple[object, ...]] = [] for i in range(5000): detected_at = now + timedelta(milliseconds=200 * i) - opportunity_rows.append((detected_at, "USD->BTC->ETH->USD", 2.5, 1.2, 0.03, bool(i % 2))) + opportunity_rows.append( + (detected_at, "USD->BTC->ETH->USD", 2.5, 1.2, 0.03, bool(i % 2))) order_rows: list[tuple[object, ...]] = [] for i in range(3500): @@ -87,11 +89,11 @@ def _seed_dataset(store: DuckDBStore) -> None: ) ) - with store.connect() as conn: - conn.execute("DELETE FROM trades") - conn.execute("DELETE FROM opportunities") - conn.execute("DELETE FROM orders") - conn.executemany( + async with store.pool.acquire() as conn: + await conn.execute("DELETE FROM trades") + await conn.execute("DELETE FROM opportunities") + await conn.execute("DELETE FROM orders") + await conn.executemany( """ INSERT INTO trades ( trade_ref, @@ -107,7 +109,7 @@ def _seed_dataset(store: DuckDBStore) -> None: """, trade_rows, ) - conn.executemany( + await conn.executemany( """ INSERT INTO opportunities ( detected_at, @@ -120,7 +122,7 @@ def _seed_dataset(store: DuckDBStore) -> None: """, opportunity_rows, ) - conn.executemany( + await conn.executemany( """ INSERT INTO orders ( trade_ref, @@ -141,28 +143,28 @@ def _seed_dataset(store: DuckDBStore) -> None: ) -def main() -> int: +async def main() -> int: db_path = Path(gettempdir()) / "arbitrade_metrics_bench.duckdb" settings = Settings(_env_file=None, DUCKDB_PATH=db_path) - store = DuckDBStore(settings) + store = PgStore(settings) store.migrate() - _seed_dataset(store) + await _seed_dataset(store) calculator = MetricsCalculator(store) for _ in range(3): - _python_scan_compute(store) - calculator.compute() + await _python_scan_compute(store) + await calculator.compute() runs = 20 start = perf_counter() for _ in range(runs): - _python_scan_compute(store) + await _python_scan_compute(store) python_ms = (perf_counter() - start) * 1000.0 / runs start = perf_counter() for _ in range(runs): - calculator.compute() + await calculator.compute() sql_ms = (perf_counter() - start) * 1000.0 / runs speedup = (python_ms / sql_ms) if sql_ms > 0.0 else 0.0 diff --git a/src/arbitrade/storage/schema_pg.sql b/src/arbitrade/storage/schema_pg.sql index 926577f..3ea1f23 100644 --- a/src/arbitrade/storage/schema_pg.sql +++ b/src/arbitrade/storage/schema_pg.sql @@ -8,7 +8,7 @@ CREATE EXTENSION IF NOT EXISTS pgcrypto; -- ======================================== CREATE TABLE IF NOT EXISTS schema_migrations ( version INTEGER PRIMARY KEY, - applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + applied_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP ); -- ======================================== @@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS config_sections ( id SERIAL PRIMARY KEY, name VARCHAR UNIQUE NOT NULL, description TEXT, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS config_settings ( @@ -28,7 +28,7 @@ CREATE TABLE IF NOT EXISTS config_settings ( value_type VARCHAR NOT NULL, is_secret BOOLEAN DEFAULT FALSE, is_runtime_reloadable BOOLEAN DEFAULT FALSE, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, updated_by VARCHAR ); @@ -38,8 +38,8 @@ CREATE TABLE IF NOT EXISTS config_pairings ( quote_asset VARCHAR NOT NULL, enabled BOOLEAN DEFAULT TRUE, source VARCHAR NOT NULL, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, UNIQUE(base_asset, quote_asset) ); @@ -58,7 +58,7 @@ CREATE TABLE IF NOT EXISTS config_backtesting_defaults ( -- ======================================== CREATE TABLE IF NOT EXISTS opportunities ( id UUID DEFAULT gen_random_uuid() PRIMARY KEY, - detected_at TIMESTAMP NOT NULL, + detected_at TIMESTAMPTZ NOT NULL, cycle VARCHAR NOT NULL, gross_pct DOUBLE PRECISION, net_pct DOUBLE PRECISION, @@ -69,8 +69,8 @@ CREATE TABLE IF NOT EXISTS opportunities ( CREATE TABLE IF NOT EXISTS trades ( id UUID DEFAULT gen_random_uuid() PRIMARY KEY, trade_ref VARCHAR NOT NULL, - started_at TIMESTAMP NOT NULL, - finished_at TIMESTAMP, + started_at TIMESTAMPTZ NOT NULL, + finished_at TIMESTAMPTZ, status VARCHAR NOT NULL, realized_pnl DOUBLE PRECISION, estimated_pnl DOUBLE PRECISION, @@ -92,13 +92,13 @@ CREATE TABLE IF NOT EXISTS orders ( filled_volume DOUBLE PRECISION, avg_price DOUBLE PRECISION, raw_response JSONB, - recorded_at TIMESTAMP NOT NULL + recorded_at TIMESTAMPTZ NOT NULL ); CREATE TABLE IF NOT EXISTS pnl_events ( id UUID DEFAULT gen_random_uuid() PRIMARY KEY, trade_ref VARCHAR NOT NULL, - recorded_at TIMESTAMP NOT NULL, + recorded_at TIMESTAMPTZ NOT NULL, kind VARCHAR NOT NULL, pnl_usd DOUBLE PRECISION NOT NULL, source VARCHAR NOT NULL @@ -108,13 +108,13 @@ CREATE TABLE IF NOT EXISTS pnl_events ( -- Snapshots & Monitoring -- ======================================== CREATE TABLE IF NOT EXISTS portfolio_snapshots ( - snapshot_at TIMESTAMP NOT NULL, + snapshot_at TIMESTAMPTZ NOT NULL, balances JSONB, total_value_usd DOUBLE PRECISION ); CREATE TABLE IF NOT EXISTS market_snapshots ( - snapshot_at TIMESTAMP NOT NULL, + snapshot_at TIMESTAMPTZ NOT NULL, symbol VARCHAR NOT NULL, source VARCHAR NOT NULL, payload JSONB NOT NULL, @@ -123,7 +123,7 @@ CREATE TABLE IF NOT EXISTS market_snapshots ( CREATE TABLE IF NOT EXISTS audit_events ( id UUID DEFAULT gen_random_uuid() PRIMARY KEY, - occurred_at TIMESTAMP NOT NULL, + occurred_at TIMESTAMPTZ NOT NULL, actor VARCHAR NOT NULL, event_type VARCHAR NOT NULL, decision VARCHAR NOT NULL, @@ -132,7 +132,7 @@ CREATE TABLE IF NOT EXISTS audit_events ( ); CREATE TABLE IF NOT EXISTS runtime_state_snapshots ( - snapshot_at TIMESTAMP NOT NULL, + snapshot_at TIMESTAMPTZ NOT NULL, is_running BOOLEAN NOT NULL, kill_switch_active BOOLEAN NOT NULL, kill_switch_reason VARCHAR, @@ -142,7 +142,7 @@ CREATE TABLE IF NOT EXISTS runtime_state_snapshots ( ); CREATE TABLE IF NOT EXISTS kraken_account_snapshots ( - snapshot_at TIMESTAMP NOT NULL, + snapshot_at TIMESTAMPTZ NOT NULL, fee_tier VARCHAR, maker_fee DOUBLE PRECISION, taker_fee DOUBLE PRECISION, @@ -161,7 +161,31 @@ CREATE TABLE IF NOT EXISTS backtest_jobs ( config JSONB, report JSONB, error VARCHAR, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - started_at TIMESTAMP, - finished_at TIMESTAMP -); \ No newline at end of file + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + started_at TIMESTAMPTZ, + finished_at TIMESTAMPTZ +); + +-- ======================================== +-- Migration: convert legacy TIMESTAMP→TIMESTAMPTZ +-- for databases created before the fix. +-- These are idempotent (no-op when already TIMESTAMPTZ). +-- ======================================== +ALTER TABLE audit_events ALTER COLUMN occurred_at TYPE TIMESTAMPTZ USING occurred_at AT TIME ZONE 'UTC'; +ALTER TABLE runtime_state_snapshots ALTER COLUMN snapshot_at TYPE TIMESTAMPTZ USING snapshot_at AT TIME ZONE 'UTC'; +ALTER TABLE schema_migrations ALTER COLUMN applied_at TYPE TIMESTAMPTZ USING applied_at AT TIME ZONE 'UTC'; +ALTER TABLE config_sections ALTER COLUMN updated_at TYPE TIMESTAMPTZ USING updated_at AT TIME ZONE 'UTC'; +ALTER TABLE config_settings ALTER COLUMN updated_at TYPE TIMESTAMPTZ USING updated_at AT TIME ZONE 'UTC'; +ALTER TABLE config_pairings ALTER COLUMN created_at TYPE TIMESTAMPTZ USING created_at AT TIME ZONE 'UTC'; +ALTER TABLE config_pairings ALTER COLUMN updated_at TYPE TIMESTAMPTZ USING updated_at AT TIME ZONE 'UTC'; +ALTER TABLE opportunities ALTER COLUMN detected_at TYPE TIMESTAMPTZ USING detected_at AT TIME ZONE 'UTC'; +ALTER TABLE trades ALTER COLUMN started_at TYPE TIMESTAMPTZ USING started_at AT TIME ZONE 'UTC'; +ALTER TABLE trades ALTER COLUMN finished_at TYPE TIMESTAMPTZ USING finished_at AT TIME ZONE 'UTC'; +ALTER TABLE orders ALTER COLUMN recorded_at TYPE TIMESTAMPTZ USING recorded_at AT TIME ZONE 'UTC'; +ALTER TABLE pnl_events ALTER COLUMN recorded_at TYPE TIMESTAMPTZ USING recorded_at AT TIME ZONE 'UTC'; +ALTER TABLE portfolio_snapshots ALTER COLUMN snapshot_at TYPE TIMESTAMPTZ USING snapshot_at AT TIME ZONE 'UTC'; +ALTER TABLE market_snapshots ALTER COLUMN snapshot_at TYPE TIMESTAMPTZ USING snapshot_at AT TIME ZONE 'UTC'; +ALTER TABLE kraken_account_snapshots ALTER COLUMN snapshot_at TYPE TIMESTAMPTZ USING snapshot_at AT TIME ZONE 'UTC'; +ALTER TABLE backtest_jobs ALTER COLUMN created_at TYPE TIMESTAMPTZ USING created_at AT TIME ZONE 'UTC'; +ALTER TABLE backtest_jobs ALTER COLUMN started_at TYPE TIMESTAMPTZ USING started_at AT TIME ZONE 'UTC'; +ALTER TABLE backtest_jobs ALTER COLUMN finished_at TYPE TIMESTAMPTZ USING finished_at AT TIME ZONE 'UTC'; \ No newline at end of file