refactor: migrate database schema from TIMESTAMP to TIMESTAMPTZ for better timezone handling
CI / lint-test-build (push) Successful in 1m25s
CI / lint-test-build (push) Successful in 1m25s
This commit is contained in:
@@ -8,18 +8,19 @@ from time import perf_counter
|
||||
|
||||
from arbitrade.config.settings import Settings
|
||||
from arbitrade.metrics import MetricsCalculator
|
||||
from arbitrade.storage.db import DuckDBStore
|
||||
from arbitrade.storage.pg_store import PgStore
|
||||
|
||||
|
||||
def _python_scan_compute(store: DuckDBStore) -> tuple[float, float | None, float | None]:
|
||||
with store.connect() as conn:
|
||||
trade_rows = conn.execute("""
|
||||
SELECT started_at, finished_at, realized_pnl
|
||||
FROM trades
|
||||
WHERE finished_at IS NOT NULL
|
||||
""").fetchall()
|
||||
sql_d = "SELECT detected_at FROM opportunities"
|
||||
orows = conn.execute(sql_d).fetchall()
|
||||
async def _python_scan_compute(store: PgStore) -> tuple[float, float | None, float | None]:
|
||||
sql_s = """
|
||||
SELECT started_at, finished_at, realized_pnl
|
||||
FROM trades
|
||||
WHERE finished_at IS NOT NULL
|
||||
"""
|
||||
sql_d = "SELECT detected_at FROM opportunities"
|
||||
async with store.pool.acquire() as conn:
|
||||
trade_rows = await conn.fetch(sql_s)
|
||||
orows = await conn.fetch(sql_d)
|
||||
|
||||
realized = sum(float(row[2]) for row in trade_rows if row[2] is not None)
|
||||
durations = [
|
||||
@@ -41,7 +42,7 @@ def _python_scan_compute(store: DuckDBStore) -> tuple[float, float | None, float
|
||||
return realized, avg_duration, opm
|
||||
|
||||
|
||||
def _seed_dataset(store: DuckDBStore) -> None:
|
||||
async def _seed_dataset(store: PgStore) -> None:
|
||||
now = datetime.now(UTC)
|
||||
|
||||
trade_rows: list[tuple[object, ...]] = []
|
||||
@@ -66,7 +67,8 @@ def _seed_dataset(store: DuckDBStore) -> None:
|
||||
opportunity_rows: list[tuple[object, ...]] = []
|
||||
for i in range(5000):
|
||||
detected_at = now + timedelta(milliseconds=200 * i)
|
||||
opportunity_rows.append((detected_at, "USD->BTC->ETH->USD", 2.5, 1.2, 0.03, bool(i % 2)))
|
||||
opportunity_rows.append(
|
||||
(detected_at, "USD->BTC->ETH->USD", 2.5, 1.2, 0.03, bool(i % 2)))
|
||||
|
||||
order_rows: list[tuple[object, ...]] = []
|
||||
for i in range(3500):
|
||||
@@ -87,11 +89,11 @@ def _seed_dataset(store: DuckDBStore) -> None:
|
||||
)
|
||||
)
|
||||
|
||||
with store.connect() as conn:
|
||||
conn.execute("DELETE FROM trades")
|
||||
conn.execute("DELETE FROM opportunities")
|
||||
conn.execute("DELETE FROM orders")
|
||||
conn.executemany(
|
||||
async with store.pool.acquire() as conn:
|
||||
await conn.execute("DELETE FROM trades")
|
||||
await conn.execute("DELETE FROM opportunities")
|
||||
await conn.execute("DELETE FROM orders")
|
||||
await conn.executemany(
|
||||
"""
|
||||
INSERT INTO trades (
|
||||
trade_ref,
|
||||
@@ -107,7 +109,7 @@ def _seed_dataset(store: DuckDBStore) -> None:
|
||||
""",
|
||||
trade_rows,
|
||||
)
|
||||
conn.executemany(
|
||||
await conn.executemany(
|
||||
"""
|
||||
INSERT INTO opportunities (
|
||||
detected_at,
|
||||
@@ -120,7 +122,7 @@ def _seed_dataset(store: DuckDBStore) -> None:
|
||||
""",
|
||||
opportunity_rows,
|
||||
)
|
||||
conn.executemany(
|
||||
await conn.executemany(
|
||||
"""
|
||||
INSERT INTO orders (
|
||||
trade_ref,
|
||||
@@ -141,28 +143,28 @@ def _seed_dataset(store: DuckDBStore) -> None:
|
||||
)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
async def main() -> int:
|
||||
db_path = Path(gettempdir()) / "arbitrade_metrics_bench.duckdb"
|
||||
settings = Settings(_env_file=None, DUCKDB_PATH=db_path)
|
||||
store = DuckDBStore(settings)
|
||||
store = PgStore(settings)
|
||||
store.migrate()
|
||||
_seed_dataset(store)
|
||||
await _seed_dataset(store)
|
||||
|
||||
calculator = MetricsCalculator(store)
|
||||
|
||||
for _ in range(3):
|
||||
_python_scan_compute(store)
|
||||
calculator.compute()
|
||||
await _python_scan_compute(store)
|
||||
await calculator.compute()
|
||||
|
||||
runs = 20
|
||||
start = perf_counter()
|
||||
for _ in range(runs):
|
||||
_python_scan_compute(store)
|
||||
await _python_scan_compute(store)
|
||||
python_ms = (perf_counter() - start) * 1000.0 / runs
|
||||
|
||||
start = perf_counter()
|
||||
for _ in range(runs):
|
||||
calculator.compute()
|
||||
await calculator.compute()
|
||||
sql_ms = (perf_counter() - start) * 1000.0 / runs
|
||||
|
||||
speedup = (python_ms / sql_ms) if sql_ms > 0.0 else 0.0
|
||||
|
||||
Reference in New Issue
Block a user