diff --git a/.gitignore b/.gitignore index 342456f..0481260 100644 --- a/.gitignore +++ b/.gitignore @@ -38,6 +38,7 @@ dist/ # Local database / runtime data data/*.duckdb.wal data/*.duckdb.tmp +data/arbitrade.duckdb logs/ ops/performance/latest_profile.json diff --git a/README.md b/README.md index f0b4d92..1c34304 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Current stack: - Python 3.12+ - FastAPI + HTMX/Jinja2 -- DuckDB for dev/test/prod +- PostgreSQL for all environments (via asyncpg) - Native Kraken WebSocket planned for market-data hot path - Gitea Actions + Gitea container registry @@ -22,7 +22,7 @@ Bootstrap complete for foundation layer: - typed settings and env loading - structured logging - encrypted secret helpers -- DuckDB connection + base schema +- PostgreSQL connection + full schema migration - FastAPI app with health endpoint - Gitea Actions CI scaffold - Docker / docker-compose scaffold @@ -152,7 +152,11 @@ APP_HOST=0.0.0.0 APP_PORT=9090 LOG_LEVEL=INFO LOG_JSON=true -DUCKDB_PATH=./data/arbitrade.duckdb +PG_HOST=192.168.88.35 +PG_PORT=5432 +PG_DATABASE=arbitrade +PG_USER=arbitrade +PG_PASSWORD=arbitrade FERNET_KEY= KRAKEN_API_KEY= KRAKEN_API_SECRET= @@ -182,15 +186,19 @@ Health endpoints: ## Database -DuckDB used everywhere: local dev, tests, production. +PostgreSQL used everywhere: local dev, tests, production. -Default database file: +Default connection: ```text -./data/arbitrade.duckdb +PG_HOST=192.168.88.35 +PG_PORT=5432 +PG_DATABASE=arbitrade +PG_USER=arbitrade +PG_PASSWORD=arbitrade ``` -Schema bootstrap runs automatically on app startup. +Schema bootstrap runs automatically on app startup via `PgStore.migrate()`. Current tables: @@ -220,7 +228,7 @@ DELETE FROM audit_events WHERE occurred_at < NOW() - INTERVAL 30 DAY; ``` -- Back up archive files and the main DuckDB file together. +- Back up archive files and the PostgreSQL database together. - For production, run archive + backup as scheduled maintenance (cron/task scheduler). ## Quality Checks @@ -342,7 +350,7 @@ Add a persistent volume in Coolify: - Mount Path: `/app/data` -This preserves DuckDB and other runtime artifacts across restarts/redeploys. +This preserves PostgreSQL data and other runtime artifacts across restarts/redeploys. ### 5) Configure environment variables @@ -351,7 +359,11 @@ Add runtime environment variables in Coolify (UI: Environment Variables): - `APP_ENV=prod` - `APP_HOST=0.0.0.0` - `APP_PORT=9090` -- `DUCKDB_PATH=/app/data/arbitrade.duckdb` +- `PG_HOST=postgres` + `PG_PORT=5432` + `PG_DATABASE=arbitrade` + `PG_USER=arbitrade` + `PG_PASSWORD=arbitrade` - `LOG_LEVEL=INFO` - `LOG_JSON=true` - `KRAKEN_API_KEY=...` diff --git a/data/arbitrade.duckdb b/data/arbitrade.duckdb deleted file mode 100644 index 6d0fdde..0000000 Binary files a/data/arbitrade.duckdb and /dev/null differ diff --git a/docs/DEPLOYMENT.md b/docs/DEPLOYMENT.md index 91842f6..f3b8a42 100644 --- a/docs/DEPLOYMENT.md +++ b/docs/DEPLOYMENT.md @@ -7,16 +7,16 @@ This guide provides two supported deployment paths for Arbitrade on Coolify: Reference docs: -- Coolify Applications: https://coolify.io/docs/applications -- Coolify Build Packs: https://coolify.io/docs/applications/build-packs -- Coolify Dockerfile Build Pack: https://coolify.io/docs/applications/build-packs/dockerfile -- Coolify Nixpacks Build Pack: https://coolify.io/docs/applications/build-packs/nixpacks -- Coolify CI/CD (Git providers): https://coolify.io/docs/applications/ci-cd -- Coolify Gitea integration: https://coolify.io/docs/applications/ci-cd/gitea/integration -- Coolify environment variables: https://coolify.io/docs/knowledge-base/environment-variables -- Coolify persistent storage: https://coolify.io/docs/knowledge-base/persistent-storage -- Coolify health checks: https://coolify.io/docs/knowledge-base/health-checks -- Coolify Docker registry credentials: https://coolify.io/docs/knowledge-base/docker/registry +- [Coolify Applications](https://coolify.io/docs/applications) +- [Coolify Build Packs](https://coolify.io/docs/applications/build-packs) +- [Coolify Dockerfile Build Pack](https://coolify.io/docs/applications/build-packs/dockerfile) +- [Coolify Nixpacks Build Pack](https://coolify.io/docs/applications/build-packs/nixpacks) +- [Coolify CI/CD (Git providers)](https://coolify.io/docs/applications/ci-cd) +- [Coolify Gitea integration](https://coolify.io/docs/applications/ci-cd/gitea/integration) +- [Coolify environment variables](https://coolify.io/docs/knowledge-base/environment-variables) +- [Coolify persistent storage](https://coolify.io/docs/knowledge-base/persistent-storage) +- [Coolify health checks](https://coolify.io/docs/knowledge-base/health-checks) +- [Coolify Docker registry credentials](https://coolify.io/docs/knowledge-base/docker/registry) ## Common Runtime Configuration @@ -32,14 +32,14 @@ Use these values in both deployment modes. - Add a persistent volume - Mount path: `/app/data` -- Set DB path to: `DUCKDB_PATH=/app/data/arbitrade.duckdb` +- Set PG connection: `PG_HOST=postgres`, `PG_PORT=5432`, `PG_DATABASE=arbitrade`, `PG_USER=arbitrade`, `PG_PASSWORD=arbitrade` ### Required environment variables - `APP_ENV=prod` - `APP_HOST=0.0.0.0` - `APP_PORT=9090` -- `DUCKDB_PATH=/app/data/arbitrade.duckdb` +- `PG_DATABASE=arbitrade` - `LOG_LEVEL=INFO` - `LOG_JSON=true` - `KRAKEN_API_KEY=` @@ -135,7 +135,7 @@ Update flow for new releases: - Ensure the deployed image/wheel includes package templates under `arbitrade/web/templates/*`. - If you build from source, do not remove packaged template files under `src/arbitrade/web/templates`. - DB resets after deploy: -- Confirm persistent mount exists at `/app/data` and `DUCKDB_PATH` points there. +- Confirm PostgreSQL is reachable at `PG_HOST`. - Registry pull fails: - Re-check Docker registry credentials in Coolify. - App starts but unavailable externally: diff --git a/docs/architecture/README.md b/docs/architecture/README.md index c7057f4..25ec703 100644 --- a/docs/architecture/README.md +++ b/docs/architecture/README.md @@ -8,7 +8,7 @@ Primary goals: - Detect and execute triangular opportunities on Kraken with fee/slippage-aware math. - Keep hot-path latency low with incremental order-book updates and event-driven scoring. -- Persist operational data in DuckDB for dev, test, and prod. +- Persist operational data in PostgreSQL for all environments. - Provide operator controls, audit trail, and alerting through a server-rendered dashboard. - Support backtesting, parameter sweeps, and deferred experimental strategy work behind feature flags. @@ -17,7 +17,7 @@ Primary goals: - Python 3.12+ runtime. - Native Kraken WebSocket on the hot path. - HTMX + Jinja2 UI, no SPA build step. -- DuckDB everywhere. +- PostgreSQL everywhere. - Self-hosted Gitea Actions CI and Gitea registry. - Windows development support. - Secrets must stay out of the repository. @@ -32,7 +32,7 @@ The bot consumes Kraken market data, detects opportunities, and executes trades - Kraken REST + WebSocket provide market data and execution. - FastAPI serves HTML fragments, JSON endpoints, and SSE streams. -- DuckDB stores trades, opportunities, snapshots, audit events, and runtime state. +- PostgreSQL stores trades, opportunities, snapshots, audit events, and runtime state. - Coolify can deploy the published image using environment variables and persistent storage. ## 4. Solution Strategy @@ -55,7 +55,7 @@ The bot consumes Kraken market data, detects opportunities, and executes trades - `execution/` - multi-leg trade sequencing. - `backtesting/` - replay engine, parameter sweep, experiment scaffolds. - `strategy/` - experimental strategy modules such as stat-arb. -- `storage/` - DuckDB schema and repositories. +- `storage/` - PostgreSQL schema and repositories. - `alerting/` - multi-channel notifications. - `runtime/` - startup recovery and graceful shutdown. @@ -64,7 +64,7 @@ The bot consumes Kraken market data, detects opportunities, and executes trades - `fastapi`, `uvicorn`, `jinja2`, `htmx`-driven templates. - `orjson` for low-alloc parsing. - `sortedcontainers` for book state. -- `duckdb` for persistence and analytics. +- `asyncpg` for PostgreSQL persistence. - `pydantic` / `pydantic-settings` for typed configuration. - `cryptography` / keyring for secret handling. @@ -77,7 +77,7 @@ The bot consumes Kraken market data, detects opportunities, and executes trades 3. Incremental detector scores impacted cycles. 4. Risk manager validates the opportunity. 5. Execution sequencer places legs if approved. -6. Trades and snapshots persist to DuckDB. +7. Trades and snapshots persist to PostgreSQL. 7. Dashboard and alerts reflect state changes. ### 6.2 Dashboard Control Flow @@ -112,7 +112,7 @@ The bot consumes Kraken market data, detects opportunities, and executes trades - Deploy from the published image. - Configure runtime via environment variables. -- Mount persistent storage at `/app/data` for DuckDB. +- Connect to PostgreSQL at configured `PG_HOST`. ## 8. Cross-Cutting Concepts @@ -126,7 +126,7 @@ The bot consumes Kraken market data, detects opportunities, and executes trades ## 9. Architecture Decisions - Native Kraken WS instead of a generic exchange abstraction on the hot path. -- DuckDB as the single database engine. +- PostgreSQL as the single database engine. - HTMX + Jinja2 instead of SPA frontend. - Backtesting reuses production detector/risk/execution logic. - Experimental stat-arb stays behind a feature flag. @@ -152,5 +152,5 @@ The bot consumes Kraken market data, detects opportunities, and executes trades - WS: WebSocket. - HTMX: HTML-over-the-wire UI library. - SSE: Server-Sent Events. -- DUCKDB: Embedded analytical database used for all environments. +- PGSQL: PostgreSQL database used for all environments. - Stat arb: Statistical arbitrage, currently experimental and feature-flagged. diff --git a/docs/architecture/repositories.md b/docs/architecture/repositories.md new file mode 100644 index 0000000..4d5b216 --- /dev/null +++ b/docs/architecture/repositories.md @@ -0,0 +1,88 @@ +# Database Layer: Schema & Repositories + +> **Database engine**: PostgreSQL 15+ on `192.168.88.35` +> **Driver**: `asyncpg` (async connection pool) +> **Store class**: `PgStore` in `src/arbitrade/storage/pg_store.py` + +## Connection Lifecycle + +```txt +FastAPI lifespan (create_app) + └─ PgStore.start() # creates asyncpg connection pool + └─ PgStore.migrate() # reads schema_pg.sql, creates tables + └─ ... application runs ... + └─ PgStore.stop() # closes the pool +``` + +All repository classes accept a `PgStore` instance and acquire connections +via `async with self._store.pool.acquire() as conn:`. + +## Schema + +Defined in `src/arbitrade/storage/schema_pg.sql`. 15 tables: + +| Table | Purpose | PK | Notes | +| ----------------------------- | -------------------------- | --------------- | ---------------------------------------- | +| `schema_migrations` | Version tracking | `version` | Single-row per version | +| `config_sections` | Config section metadata | `id` (SERIAL) | `name` UNIQUE | +| `config_settings` | Key-value config store | `key` (VARCHAR) | JSON-serialized values | +| `config_pairings` | Currency pairs to monitor | `id` (SERIAL) | `(base_asset, quote_asset)` UNIQUE | +| `config_backtesting_defaults` | Default backtest params | `id` (SERIAL) | Singleton via `ORDER BY id DESC LIMIT 1` | +| `opportunities` | Detected arb opportunities | `id` (UUID) | | +| `trades` | Executed trades | `id` (UUID) | | +| `orders` | Individual leg orders | `id` (UUID) | | +| `pnl_events` | P&L event stream | `id` (UUID) | | +| `portfolio_snapshots` | Balance snapshots | — | Append-only | +| `market_snapshots` | Raw order-book snapshots | — | Append-only | +| `audit_events` | Audit trail | `id` (UUID) | | +| `runtime_state_snapshots` | Runtime state history | — | Append-only | +| `kraken_account_snapshots` | Fee tier + account data | — | Append-only | +| `backtest_jobs` | Backtest job records | `id` (UUID) | | + +JSON columns use `JSONB` for indexability. UUID primary keys use +`gen_random_uuid()` (requires `pgcrypto` extension). + +## Repository Classes + +All in `src/arbitrade/storage/repositories.py`. Every method is `async def`. + +| Class | Key Methods | Used By | +| ------------------------------------- | ---------------------------------------------------------- | --------------------------- | +| `MarketSnapshotRepository` | `insert()` | `AsyncMarketSnapshotWriter` | +| `OpportunityRepository` | `insert()` | `AsyncOpportunityWriter` | +| `TradeRepository` | `insert()` | `AsyncExecutionWriter` | +| `OrderRepository` | `insert()` | `AsyncExecutionWriter` | +| `PnLRepository` | `insert()` | `AsyncExecutionWriter` | +| `AuditRepository` | `insert()`, `list_recent()` | API routes, lifecycle | +| `RuntimeStateRepository` | `insert()`, `latest()` | Lifecycle, API | +| `ConfigSectionRepository` | `create_section()`, `get_section()`, `list_sections()` | Config service | +| `ConfigSettingRepository` | Full CRUD + `get_latest_updated_at()` | Config service | +| `ConfigPairingRepository` | Full CRUD + `upsert_pairing()`, `list_pairings()` | Feeds, pairing sync | +| `ConfigBacktestingDefaultsRepository` | `create_defaults()`, `get_defaults()`, `update_defaults()` | Config service | +| `KrakenAccountSnapshotRepository` | `insert_snapshot()`, `latest_snapshot()` | Fee sync loop | +| `BacktestJobRepository` | Full CRUD | Backtesting UI + worker | + +## Async Writers + +Three background writer tasks buffer high-frequency writes: + +- **`AsyncExecutionWriter`** — trades/orders/P&L queue +- **`AsyncMarketSnapshotWriter`** — order-book snapshot queue +- **`AsyncOpportunityWriter`** — opportunity event queue + +Each uses an `asyncio.Queue` and drains it in a background task with +`await repo.insert(...)`. + +## Integration Tests + +`tests/integration/test_postgresql_schema.py` verifies: + +- Connection to PostgreSQL server +- `pgcrypto` extension availability +- All 15 tables exist after migration +- Migration is idempotent +- Correct columns per table +- Primary keys and unique constraints +- Tables start empty +- Simple INSERT/SELECT round-trip +- `ON CONFLICT ... DO UPDATE` on config_pairings diff --git a/ops/performance/README.md b/ops/performance/README.md index 73e4817..0311a06 100644 --- a/ops/performance/README.md +++ b/ops/performance/README.md @@ -39,7 +39,7 @@ Key end-to-end latency baselines from `latency_baseline.json`: ## Optimization Note -`MetricsCalculator.compute()` was optimized to use DuckDB SQL aggregations and quantiles, reducing Python-side row scans. +`MetricsCalculator.compute()` uses PostgreSQL SQL aggregations and percentiles, reducing Python-side row scans. Measured benchmark (`scripts/benchmark_metrics_compute.py`): diff --git a/pyproject.toml b/pyproject.toml index 3b6e2f9..cee7bde 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,7 +54,7 @@ pretty = true mypy_path = "src" [[tool.mypy.overrides]] -module = ["duckdb", "keyring", "sortedcontainers"] +module = ["asyncpg", "keyring", "sortedcontainers"] ignore_missing_imports = true [tool.pytest.ini_options] diff --git a/requirements/latest-dev.in b/requirements/latest-dev.in index 24dd450..bf02b80 100644 --- a/requirements/latest-dev.in +++ b/requirements/latest-dev.in @@ -1,4 +1,5 @@ # Unpinned dev dependencies (latest available) +asyncpg-stubs black mypy pre-commit diff --git a/requirements/latest-runtime.in b/requirements/latest-runtime.in index f348e2c..968178f 100644 --- a/requirements/latest-runtime.in +++ b/requirements/latest-runtime.in @@ -1,6 +1,6 @@ # Unpinned runtime dependencies (latest available) +asyncpg cryptography -duckdb fastapi httptools httpx diff --git a/src/arbitrade/api/app.py b/src/arbitrade/api/app.py index 9a81d89..02f8ff0 100644 --- a/src/arbitrade/api/app.py +++ b/src/arbitrade/api/app.py @@ -25,15 +25,15 @@ from arbitrade.market_data.feed_builder import ( ) from arbitrade.metrics import MetricsCalculator from arbitrade.runtime.lifecycle import graceful_shutdown, restore_runtime_state -from arbitrade.storage.db import DuckDBStore +from arbitrade.storage.pg_store import PgStore from arbitrade.storage.market_snapshots import AsyncMarketSnapshotWriter from arbitrade.storage.opportunities import AsyncOpportunityWriter -from arbitrade.storage.repositories import AuditRepository, RuntimeStateRepository +from arbitrade.storage.repositories import AuditRepository, RuntimeStateRepository, MarketSnapshotRepository, OpportunityRepository _LOG = structlog.get_logger(__name__) -def _start_feed(app: FastAPI, *, kill_switch_only: bool = False) -> asyncio.Task[None] | None: +async def _start_feed(app: FastAPI, *, kill_switch_only: bool = False) -> asyncio.Task[None] | None: """Create and start a MarketDataFeed task from enabled pairings. If kill_switch_only=True, only create a kill-switch-bound stub (no detector/feed). @@ -45,14 +45,14 @@ def _start_feed(app: FastAPI, *, kill_switch_only: bool = False) -> asyncio.Task controls = app.state.dashboard_controls # Build detector from enabled pairings - detector = build_detector_from_enabled_pairings( + detector = await build_detector_from_enabled_pairings( db, fee_rate=0.0, # will be overridden by fee sync max_depth_levels=controls.strategy_max_depth_levels, min_profit_threshold=controls.strategy_profit_threshold, ) - symbols = get_enabled_pair_symbols(db) + symbols = await get_enabled_pair_symbols(db) if not symbols and not kill_switch_only: _LOG.warning("no_enabled_pair_symbols_feed_not_started") return None @@ -64,8 +64,8 @@ def _start_feed(app: FastAPI, *, kill_switch_only: bool = False) -> asyncio.Task ws_client.set_subscribed_symbols(symbols) - snapshot_writer = AsyncMarketSnapshotWriter(db) - opportunity_writer = AsyncOpportunityWriter(db) + snapshot_writer = AsyncMarketSnapshotWriter(MarketSnapshotRepository(db)) + opportunity_writer = AsyncOpportunityWriter(OpportunityRepository(db)) feed = MarketDataFeed( ws_client=ws_client, @@ -90,8 +90,8 @@ def _start_feed(app: FastAPI, *, kill_switch_only: bool = False) -> asyncio.Task def create_app(settings: Settings) -> FastAPI: configure_logging(settings.log_level, settings.log_json) - db = DuckDBStore(settings) - db.migrate() + db = PgStore(settings) + kraken_client = KrakenRestClient(settings) fee_sync_stop_event = asyncio.Event() pairing_sync_stop_event = asyncio.Event() @@ -101,6 +101,9 @@ def create_app(settings: Settings) -> FastAPI: @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncIterator[None]: + await app.state.store.start() + await app.state.store.migrate() + await app.state.configuration_service.load_database_settings() await restore_runtime_state(app) fee_sync_task = asyncio.create_task( run_fee_sync_loop( @@ -123,7 +126,7 @@ def create_app(settings: Settings) -> FastAPI: name="backtest_worker", ) # Start market data feed from enabled pairings - _start_feed(app) + await _start_feed(app) app.state.fee_sync_task = fee_sync_task app.state.pairing_sync_task = pairing_sync_task app.state.backtest_task = backtest_task @@ -161,6 +164,7 @@ def create_app(settings: Settings) -> FastAPI: pass await kraken_client.close() await graceful_shutdown(app) + await app.state.store.stop() app = FastAPI(title="arbitrade", version="0.1.0", lifespan=lifespan) app.state.settings = settings diff --git a/src/arbitrade/api/routes.py b/src/arbitrade/api/routes.py index 8fe50b1..69b7618 100644 --- a/src/arbitrade/api/routes.py +++ b/src/arbitrade/api/routes.py @@ -9,7 +9,6 @@ from pathlib import Path from typing import cast from urllib.parse import parse_qs -import duckdb import orjson from fastapi import APIRouter, Depends, Request, Response from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse @@ -21,6 +20,7 @@ from arbitrade.api.control_state import DashboardControlState from arbitrade.config.pairing_sync import sync_pairings_from_kraken from arbitrade.config.service import ConfigPairing from arbitrade.detection.graph import CurrencyGraph, TriangularCycle +from arbitrade.storage.pg_store import PgStore from arbitrade.storage.repositories import ( AuditRecord, AuditRepository, @@ -35,7 +35,8 @@ public_router = APIRouter() def _resolve_templates_directory() -> str: # Support source layout, Docker runtime (/app), and installed package data. - source_layout_path = Path(__file__).resolve().parents[3] / "web" / "templates" + source_layout_path = Path( + __file__).resolve().parents[3] / "web" / "templates" if source_layout_path.is_dir(): return str(source_layout_path) @@ -44,7 +45,8 @@ def _resolve_templates_directory() -> str: return str(docker_runtime_path) try: - package_path = resources.files("arbitrade").joinpath("web", "templates") + package_path = resources.files( + "arbitrade").joinpath("web", "templates") if package_path.is_dir(): return str(package_path) except (ModuleNotFoundError, AttributeError): @@ -64,8 +66,8 @@ def _format_metric(value: float | None, *, precision: int = 2, suffix: str = "") return f"{value:.{precision}f}{suffix}" -def _dashboard_metrics(request: Request) -> dict[str, str]: - metrics = request.app.state.metrics.compute() +async def _dashboard_metrics(request: Request) -> dict[str, str]: + metrics = await request.app.state.metrics.compute() return { "realized_pnl": _format_metric(metrics.realized_pnl_usd, precision=2, suffix=" USD"), "win_rate": _format_metric( @@ -91,57 +93,85 @@ def _dashboard_metrics(request: Request) -> dict[str, str]: } -def _table_columns(conn: duckdb.DuckDBPyConnection, table_name: str) -> set[str]: - rows = conn.execute(f"PRAGMA table_info('{table_name}')").fetchall() - return {str(row[1]) for row in rows} - - -def _dashboard_overview(request: Request) -> dict[str, object]: - store = request.app.state.store - with store.connect() as conn: - trade_columns = _table_columns(conn, "trades") - trade_ref_expr = "trade_ref" if "trade_ref" in trade_columns else "CAST(id AS VARCHAR)" - cycle_expr = "cycle" if "cycle" in trade_columns else "NULL" - if "finished_at" in trade_columns: - open_trade_filter = "finished_at IS NULL" - else: - open_trade_filter = "LOWER(status) NOT IN ('filled', 'closed', 'cancelled', 'canceled')" - - portfolio_row = conn.execute(""" +async def _dashboard_overview(request: Request) -> dict[str, object]: + store: PgStore = request.app.state.store + async with store.pool.acquire() as conn: + portfolio_row = await conn.fetchrow(""" SELECT balances, total_value_usd FROM portfolio_snapshots ORDER BY snapshot_at DESC LIMIT 1 - """).fetchone() - open_trades = conn.execute(f""" - SELECT {trade_ref_expr}, status, started_at, {cycle_expr} + """) + open_trades = await conn.fetch(""" + SELECT trade_ref, status, started_at, cycle FROM trades - WHERE {open_trade_filter} + WHERE finished_at IS NULL ORDER BY started_at DESC LIMIT 5 - """).fetchall() - rpnl = conn.execute(""" + """) + rpnl = await conn.fetchrow(""" SELECT COALESCE(SUM(COALESCE(realized_pnl, 0)), 0) FROM trades - """).fetchone() - latest_opportunities = conn.execute(""" + """) + latest_opportunities = await conn.fetch(""" SELECT cycle, net_pct, est_profit, detected_at FROM opportunities ORDER BY detected_at DESC LIMIT 5 - """).fetchall() + """) + + # Query equity from kraken_account_snapshots + equity_value = "—" + try: + equity_row = await conn.fetchrow(""" + SELECT trade_balance_raw + FROM kraken_account_snapshots + ORDER BY snapshot_at DESC + LIMIT 1 + """) + if equity_row is not None and equity_row["trade_balance_raw"] is not None: + tb_raw = equity_row["trade_balance_raw"] + if isinstance(tb_raw, str): + tb_raw = json.loads(tb_raw) + if isinstance(tb_raw, dict): + eb = tb_raw.get("eb") + equity_value = f"{float(eb):.2f} USD" if eb is not None else "—" + except Exception: + pass + + # Query latest Kraken account snapshot for fee info + fee_tier = "—" + maker_fee = "—" + taker_fee = "—" + thirty_day_volume = "—" + try: + acct_row = await conn.fetchrow(""" + SELECT fee_tier, maker_fee, taker_fee, thirty_day_volume + FROM kraken_account_snapshots + ORDER BY snapshot_at DESC + LIMIT 1 + """) + if acct_row is not None: + fee_tier = str( + acct_row["fee_tier"]) if acct_row["fee_tier"] is not None else "—" + maker_fee = f"{float(acct_row['maker_fee']):.4%}" if acct_row["maker_fee"] is not None else "—" + taker_fee = f"{float(acct_row['taker_fee']):.4%}" if acct_row["taker_fee"] is not None else "—" + thirty_day_volume = f"{float(acct_row['thirty_day_volume']):.2f}" if acct_row[ + "thirty_day_volume"] is not None else "—" + except Exception: + pass balances_value = "—" total_value = "—" - equity_value = "—" if portfolio_row is not None: - balances_raw, total_value_raw = portfolio_row + balances_raw = portfolio_row["balances"] + total_value_raw = portfolio_row["total_value_usd"] if isinstance(balances_raw, str) and balances_raw: try: parsed = json.loads(balances_raw) if isinstance(parsed, dict): - # Filter out zero balances, show non-zero as "AMT ASSET" - non_zero = {k: float(v) for k, v in parsed.items() if float(v) > 0.0} + non_zero = {k: float(v) + for k, v in parsed.items() if float(v) > 0.0} if non_zero: balances_value = "
".join( f"{v:.6g} {k}" for k, v in sorted(non_zero.items()) @@ -157,63 +187,25 @@ def _dashboard_overview(request: Request) -> dict[str, object]: if total_value_raw is not None: total_value = f"{float(total_value_raw):.2f} USD" - # Query equity from kraken_account_snapshots - try: - equity_row = conn.execute(""" - SELECT trade_balance_raw - FROM kraken_account_snapshots - ORDER BY snapshot_at DESC - LIMIT 1 - """).fetchone() - if equity_row is not None and equity_row[0] is not None: - tb_raw = equity_row[0] - if isinstance(tb_raw, str): - tb_raw = json.loads(tb_raw) - if isinstance(tb_raw, dict): - eb = tb_raw.get("eb") - equity_value = f"{float(eb):.2f} USD" if eb is not None else "—" - except Exception: - pass - open_trade_rows = [ { - "trade_ref": str(row[0]), - "status": str(row[1]), - "started_at": row[2].isoformat() if isinstance(row[2], datetime) else "—", - "cycle": str(row[3]) if row[3] is not None else "—", + "trade_ref": str(r["trade_ref"]), + "status": str(r["status"]), + "started_at": r["started_at"].isoformat() if isinstance(r["started_at"], datetime) else "—", + "cycle": str(r["cycle"]) if r["cycle"] is not None else "—", } - for row in open_trades + for r in open_trades ] opportunity_rows = [ { - "cycle": str(row[0]), - "net_pct": f"{float(row[1]):.2f}%" if row[1] is not None else "—", - "est_profit": f"{float(row[2]):.2f} USD" if row[2] is not None else "—", - "detected_at": row[3].isoformat() if isinstance(row[3], datetime) else "—", + "cycle": str(r["cycle"]), + "net_pct": f"{float(r['net_pct']):.2f}%" if r["net_pct"] is not None else "—", + "est_profit": f"{float(r['est_profit']):.2f} USD" if r["est_profit"] is not None else "—", + "detected_at": r["detected_at"].isoformat() if isinstance(r["detected_at"], datetime) else "—", } - for row in latest_opportunities + for r in latest_opportunities ] - # Query latest Kraken account snapshot for fee info - fee_tier = "—" - maker_fee = "—" - taker_fee = "—" - thirty_day_volume = "—" - try: - acct_row = conn.execute(""" - SELECT fee_tier, maker_fee, taker_fee, thirty_day_volume - FROM kraken_account_snapshots - ORDER BY snapshot_at DESC - LIMIT 1 - """).fetchone() - if acct_row is not None: - fee_tier = str(acct_row[0]) if acct_row[0] is not None else "—" - maker_fee = f"{float(acct_row[1]):.4%}" if acct_row[1] is not None else "—" - taker_fee = f"{float(acct_row[2]):.4%}" if acct_row[2] is not None else "—" - thirty_day_volume = f"{float(acct_row[3]):.2f}" if acct_row[3] is not None else "—" - except Exception: - pass - return { "status": "live", "generated_at": datetime.now(UTC).isoformat(), @@ -232,26 +224,28 @@ def _dashboard_overview(request: Request) -> dict[str, object]: } -def _dashboard_charts(request: Request) -> dict[str, object]: - store = request.app.state.store - with store.connect() as conn: - opportunity_rows = conn.execute(""" +async def _dashboard_charts(request: Request) -> dict[str, object]: + store: PgStore = request.app.state.store + async with store.pool.acquire() as conn: + rows = await conn.fetch(""" SELECT detected_at, cycle, net_pct, est_profit FROM opportunities ORDER BY detected_at DESC LIMIT 10 - """).fetchall() + """) - cr = list(reversed(opportunity_rows)) + cr = list(reversed(rows)) labels = [] for index, row in enumerate(cr): - if isinstance(row[0], datetime): - labels.append(row[0].isoformat()) + if isinstance(row["detected_at"], datetime): + labels.append(row["detected_at"].isoformat()) else: labels.append(f"opportunity-{index + 1}") - np = [float(row[2]) if row[2] is not None else 0.0 for row in cr] - ep = [float(row[3]) if row[3] is not None else 0.0 for row in cr] - cycles = [str(row[1]) for row in cr] + np = [float(row["net_pct"]) if row["net_pct"] + is not None else 0.0 for row in cr] + ep = [float(row["est_profit"]) if row["est_profit"] + is not None else 0.0 for row in cr] + cycles = [str(row["cycle"]) for row in cr] return { "labels": labels, @@ -272,7 +266,7 @@ def _audit_repository(request: Request) -> AuditRepository | None: return cast(AuditRepository | None, repository) -def _record_audit( +async def _record_audit( request: Request, *, actor: str, @@ -288,7 +282,7 @@ def _record_audit( ret_pl = {str(key): payload[key] for key in payload} else: ret_pl = None - repository.insert( + await repository.insert( AuditRecord( occurred_at=datetime.now(UTC), actor=actor, @@ -300,7 +294,7 @@ def _record_audit( ) -def _dashboard_audit(request: Request, *, limit: int = 15) -> dict[str, object]: +async def _dashboard_audit(request: Request, *, limit: int = 15) -> dict[str, object]: repository = _audit_repository(request) if repository is None: return { @@ -308,7 +302,7 @@ def _dashboard_audit(request: Request, *, limit: int = 15) -> dict[str, object]: "generated_at": datetime.now(UTC).isoformat(), } - records = repository.list_recent(limit=limit) + records = await repository.list_recent(limit=limit) entries: list[dict[str, str]] = [] for record in records: payload_text = "—" @@ -355,7 +349,7 @@ def _alert_status_snapshot(request: Request) -> dict[str, object]: } -def _dashboard_config_context(request: Request) -> dict[str, object]: +async def _dashboard_config_context(request: Request) -> dict[str, object]: ctl = _dashboard_controls_state(request) rs = request.app.state.settings max_trade_capital_usd = ( @@ -416,7 +410,8 @@ def _dashboard_config_context(request: Request) -> dict[str, object]: max_consecutive_failures_value = ( str(rs.max_consecutive_failures) if rs.max_consecutive_failures is not None else "" ) - strategy_stat_arb_enabled = bool(getattr(rs, "strategy_enable_stat_arb_experiment", False)) + strategy_stat_arb_enabled = bool( + getattr(rs, "strategy_enable_stat_arb_experiment", False)) return { # Runtime @@ -537,7 +532,8 @@ def _dashboard_controls(request: Request) -> dict[str, object]: alerts_last_channel_results = [ str(item) for item in cast(list[object], alert_status.get("last_channel_results", [])) ] - strategy_stat_arb_enabled = bool(getattr(rs, "strategy_enable_stat_arb_experiment", False)) + strategy_stat_arb_enabled = bool( + getattr(rs, "strategy_enable_stat_arb_experiment", False)) return { "execution_status": "running" if ctl.is_running else "stopped", @@ -611,7 +607,7 @@ def _normalize_fee_profile(profile: str) -> str: return profile.strip().lower().replace("-", "_") -def _fee_rate_for_profile( +async def _fee_rate_for_profile( profile: str, custom_fee_rate: float | None, request: Request | None = None, @@ -628,9 +624,9 @@ def _fee_rate_for_profile( if normalized == "api": if request is None: raise ValueError("api fee profile requires request context") - store = request.app.state.store + store: PgStore = request.app.state.store repo = KrakenAccountSnapshotRepository(store) - latest = repo.latest_snapshot() + latest = await repo.latest_snapshot() if latest is not None and latest.maker_fee is not None: return latest.maker_fee # Fallback to standard if no snapshot yet @@ -700,11 +696,11 @@ def _build_cycles_from_events( return graph.index_cycles_by_pair(cycles), sorted(symbols) -def _recent_backtest_reports(request: Request) -> list[dict[str, object]]: +async def _recent_backtest_reports(request: Request) -> list[dict[str, object]]: """Fetch recent backtest jobs from DB.""" - store = request.app.state.store + store: PgStore = request.app.state.store repo = BacktestJobRepository(store) - jobs = repo.list_jobs(limit=20) + jobs = await repo.list_jobs(limit=20) return [ { "job_id": j.id or "", @@ -719,7 +715,7 @@ def _recent_backtest_reports(request: Request) -> list[dict[str, object]]: ] -def _backtesting_panel_context( +async def _backtesting_panel_context( request: Request, *, status: str = "idle", @@ -742,7 +738,7 @@ def _backtesting_panel_context( if defaults is not None: default_values.update(defaults) - reports = _recent_backtest_reports(request) + reports = await _recent_backtest_reports(request) latest = latest_report or (reports[0] if reports else None) return { @@ -814,10 +810,11 @@ async def dashboard_backtesting_page(request: Request) -> HTMLResponse: @router.get("/dashboard/fragment/backtesting", response_class=HTMLResponse) async def dashboard_backtesting_fragment(request: Request) -> HTMLResponse: + d_context = await _dashboard_config_context(request) return templates.TemplateResponse( request=request, name="partials/backtesting_panel.html", - context={"request": request, **_backtesting_panel_context(request)}, + context={"request": request, **d_context}, ) @@ -826,7 +823,7 @@ async def dashboard_backtesting_pairings_fragment(request: Request) -> HTMLRespo """HTMX fragment: pairing checkboxes for backtest form.""" store = request.app.state.store repo = ConfigPairingRepository(store) - pairings = repo.list_pairings() + pairings = await repo.list_pairings() pairings.sort(key=lambda p: (p.base_asset, p.quote_asset)) return templates.TemplateResponse( request=request, @@ -840,7 +837,7 @@ async def dashboard_metrics(request: Request) -> HTMLResponse: return templates.TemplateResponse( request=request, name="partials/metrics.html", - context={"request": request, **_dashboard_metrics(request)}, + context={"request": request, **(await _dashboard_metrics(request))}, ) @@ -849,7 +846,7 @@ async def dashboard_overview(request: Request) -> HTMLResponse: return templates.TemplateResponse( request=request, name="partials/overview.html", - context={"request": request, **_dashboard_overview(request)}, + context={"request": request, **(await _dashboard_overview(request))}, ) @@ -867,7 +864,7 @@ async def dashboard_charts(request: Request) -> HTMLResponse: return templates.TemplateResponse( request=request, name="partials/charts.html", - context={"request": request, **_dashboard_charts(request)}, + context={"request": request, **(await _dashboard_charts(request))}, ) @@ -879,7 +876,7 @@ async def dashboard_audit_page(request: Request) -> HTMLResponse: context={ "title": "Arbitrade Audit Trail", "request": request, - **_dashboard_audit(request), + **(await _dashboard_audit(request)), }, ) @@ -889,7 +886,7 @@ async def dashboard_audit_fragment(request: Request) -> HTMLResponse: return templates.TemplateResponse( request=request, name="partials/audit.html", - context={"request": request, **_dashboard_audit(request)}, + context={"request": request, **(await _dashboard_audit(request))}, ) @@ -898,12 +895,13 @@ async def dashboard_audit(request: Request) -> HTMLResponse: return templates.TemplateResponse( request=request, name="partials/audit.html", - context={"request": request, **_dashboard_audit(request)}, + context={"request": request, **(await _dashboard_audit(request))}, ) @router.get("/dashboard/config", response_class=HTMLResponse) async def dashboard_config_page(request: Request) -> HTMLResponse: + d_context = await _dashboard_config_context(request) return templates.TemplateResponse( request=request, name="config.html", @@ -911,17 +909,18 @@ async def dashboard_config_page(request: Request) -> HTMLResponse: "title": "Arbitrade Configuration", "request": request, "config_endpoint": "/dashboard/control/config", - **_dashboard_config_context(request), + **d_context, }, ) @router.get("/dashboard/fragment/config", response_class=HTMLResponse) async def dashboard_config_fragment(request: Request) -> HTMLResponse: + d_context = await _dashboard_config_context(request) return templates.TemplateResponse( request=request, name="partials/config.html", - context={"request": request, **_dashboard_config_context(request)}, + context={"request": request, **d_context}, ) @@ -932,7 +931,7 @@ async def dashboard_alert_status(request: Request) -> JSONResponse: @router.get("/dashboard/api/audit/recent", response_class=JSONResponse) async def dashboard_audit_recent(request: Request) -> JSONResponse: - return JSONResponse(_dashboard_audit(request, limit=25)) + return JSONResponse(await _dashboard_audit(request, limit=25)) @router.get("/dashboard/api/backtesting/reports", response_class=JSONResponse) @@ -940,7 +939,7 @@ async def dashboard_backtesting_reports(request: Request) -> JSONResponse: return JSONResponse( { "generated_at": datetime.now(UTC).isoformat(), - "reports": _recent_backtest_reports(request), + "reports": await _recent_backtest_reports(request), } ) @@ -965,9 +964,10 @@ async def dashboard_backtesting_run(request: Request) -> HTMLResponse: try: custom_fee_rate = ( - float(defaults["custom_fee_rate"]) if defaults["custom_fee_rate"].strip() else None + float(defaults["custom_fee_rate"] + ) if defaults["custom_fee_rate"].strip() else None ) - fee_rate = _fee_rate_for_profile(defaults["fee_profile"], custom_fee_rate, request=request) + fee_rate = await _fee_rate_for_profile(defaults["fee_profile"], custom_fee_rate, request=request) config_dict: dict[str, object] = { "source": defaults["source"], @@ -986,13 +986,13 @@ async def dashboard_backtesting_run(request: Request) -> HTMLResponse: store = request.app.state.store repo = BacktestJobRepository(store) events_label = defaults["symbols"] if defaults["symbols"] else "DB-sourced" - job = repo.create_job(events_label, config_dict) + job = await repo.create_job(events_label, config_dict) msg_job = job.id[:8] if job.id else "unknown" queue = request.app.state.backtest_queue await queue.put((job.id or "", config_dict)) - _record_audit( + await _record_audit( request, actor="dashboard_user", event_type="dashboard.backtesting.submit", @@ -1000,14 +1000,14 @@ async def dashboard_backtesting_run(request: Request) -> HTMLResponse: payload={"job_id": job.id, "source": defaults["source"]}, ) - context = _backtesting_panel_context( + context = await _backtesting_panel_context( request, status="submitted", message=f"Job {msg_job}... queued. Refresh to see results.", defaults=defaults, ) except ValueError as exc: - context = _backtesting_panel_context( + context = await _backtesting_panel_context( request, status="failed", message=str(exc), @@ -1025,11 +1025,12 @@ async def dashboard_backtesting_run(request: Request) -> HTMLResponse: async def dashboard_backtesting_delete(request: Request, job_id: str) -> HTMLResponse: store = request.app.state.store repo = BacktestJobRepository(store) - repo.delete_job(job_id) + context = await _backtesting_panel_context(request) + await repo.delete_job(job_id) return templates.TemplateResponse( request=request, name="partials/backtesting_panel.html", - context={"request": request, **_backtesting_panel_context(request)}, + context={"request": request, **context}, ) @@ -1037,7 +1038,7 @@ async def dashboard_backtesting_delete(request: Request, job_id: str) -> HTMLRes async def dashboard_backtesting_job_detail(request: Request, job_id: str) -> HTMLResponse: store = request.app.state.store repo = BacktestJobRepository(store) - job = repo.get_job(job_id) + job = await repo.get_job(job_id) if job is None: return HTMLResponse("

Job not found

", status_code=404) @@ -1069,7 +1070,7 @@ async def dashboard_backtesting_job_detail(request: Request, job_id: str) -> HTM async def dashboard_backtesting_export(request: Request, job_id: str) -> Response: store = request.app.state.store repo = BacktestJobRepository(store) - job = repo.get_job(job_id) + job = await repo.get_job(job_id) if job is None: return Response("Job not found", status_code=404) @@ -1087,7 +1088,8 @@ async def dashboard_backtesting_export(request: Request, job_id: str) -> Respons return Response( content=orjson.dumps(payload).decode("utf-8"), media_type="application/x-jsonlines", - headers={"Content-Disposition": f"attachment; filename=backtest_{job_id[:8]}.jsonl"}, + headers={ + "Content-Disposition": f"attachment; filename=backtest_{job_id[:8]}.jsonl"}, ) @@ -1104,7 +1106,7 @@ async def dashboard_control_start(request: Request) -> HTMLResponse: title="Execution started", message="Dashboard control started execution.", ) - _record_audit( + await _record_audit( request, actor="dashboard_user", event_type="dashboard.control.start", @@ -1131,7 +1133,7 @@ async def dashboard_control_stop(request: Request) -> HTMLResponse: title="Execution stopped", message="Dashboard control stopped execution.", ) - _record_audit( + await _record_audit( request, actor="dashboard_user", event_type="dashboard.control.stop", @@ -1162,7 +1164,7 @@ async def dashboard_control_kill_switch(request: Request) -> HTMLResponse: message="Kill switch triggered from dashboard control.", details={"reason": reason}, ) - _record_audit( + await _record_audit( request, actor="dashboard_user", event_type="dashboard.control.kill_switch", @@ -1232,7 +1234,7 @@ async def dashboard_control_config(request: Request) -> HTMLResponse: "paper_trading_mode": "true" if rs.paper_trading_mode else "false", }, ) - _record_audit( + await _record_audit( request, actor="dashboard_user", event_type="dashboard.control.config", @@ -1258,11 +1260,12 @@ async def dashboard_control_config(request: Request) -> HTMLResponse: @router.get("/dashboard/stream/metrics") async def dashboard_metrics_stream(request: Request) -> StreamingResponse: + metrics = await _dashboard_metrics(request) fragment = ( templates.get_template("partials/metrics.html") .render( request=request, - **_dashboard_metrics(request), + **metrics, ) .strip() .replace("\n", "") @@ -1277,9 +1280,10 @@ async def dashboard_metrics_stream(request: Request) -> StreamingResponse: @router.get("/dashboard/stream/overview") async def dashboard_overview_stream(request: Request) -> StreamingResponse: + overview = await _dashboard_overview(request) fragment = ( templates.get_template("partials/overview.html") - .render(request=request, **_dashboard_overview(request)) + .render(request=request, **overview) .strip() .replace("\n", "") ) @@ -1316,7 +1320,7 @@ async def dashboard_api_pairings( ) -> JSONResponse: """List pairings with optional filters.""" repo = _pairing_repo(request) - pairings = repo.list_pairings() + pairings = await repo.list_pairings() # Apply filters if search: @@ -1332,9 +1336,11 @@ async def dashboard_api_pairings( if source: pairings = [p for p in pairings if p.source.lower() == source.lower()] if base: - pairings = [p for p in pairings if p.base_asset.lower() == base.lower()] + pairings = [p for p in pairings if p.base_asset.lower() == + base.lower()] if quote: - pairings = [p for p in pairings if p.quote_asset.lower() == quote.lower()] + pairings = [p for p in pairings if p.quote_asset.lower() == + quote.lower()] # Sort reverse = order.lower() == "desc" @@ -1370,7 +1376,7 @@ async def dashboard_pairings_fragment( ) -> HTMLResponse: """HTMX fragment: pairing table for config page.""" repo = _pairing_repo(request) - pairings = repo.list_pairings() + pairings = await repo.list_pairings() # Apply search filter if search: @@ -1406,7 +1412,7 @@ async def dashboard_api_pairings_toggle(request: Request) -> HTMLResponse: return HTMLResponse("Missing base_asset or quote_asset", status_code=400) repo = _pairing_repo(request) - existing = repo.get_pairing(base_asset, quote_asset) + existing = await repo.get_pairing(base_asset, quote_asset) if existing is None: return HTMLResponse("Pairing not found", status_code=404) @@ -1416,9 +1422,9 @@ async def dashboard_api_pairings_toggle(request: Request) -> HTMLResponse: enabled=not existing.enabled, source=existing.source, ) - repo.update_pairing(base_asset, quote_asset, toggled) + await repo.update_pairing(base_asset, quote_asset, toggled) - _record_audit( + await _record_audit( request, actor="dashboard_user", event_type="dashboard.pairings.toggle", @@ -1432,7 +1438,7 @@ async def dashboard_api_pairings_toggle(request: Request) -> HTMLResponse: # Return refreshed fragment pairings_repo = _pairing_repo(request) - pairings = pairings_repo.list_pairings() + pairings = await pairings_repo.list_pairings() pairings.sort(key=lambda p: (p.base_asset, p.quote_asset)) return templates.TemplateResponse( request=request, @@ -1448,7 +1454,7 @@ async def dashboard_api_pairings_sync(request: Request) -> JSONResponse: store = request.app.state.store summary = await sync_pairings_from_kraken(kraken_client, store) - _record_audit( + await _record_audit( request, actor="dashboard_user", event_type="dashboard.pairings.sync", diff --git a/src/arbitrade/backtesting/replay.py b/src/arbitrade/backtesting/replay.py index 9d05ea2..ca517ce 100644 --- a/src/arbitrade/backtesting/replay.py +++ b/src/arbitrade/backtesting/replay.py @@ -17,6 +17,7 @@ from arbitrade.execution.sequencer import TriangularExecutionSequencer from arbitrade.market_data.order_book import OrderBook from arbitrade.risk.pre_trade import PreTradeValidator from arbitrade.risk.trade_limits import TradeLimitsGuard +from arbitrade.storage.pg_store import PgStore @dataclass(slots=True) @@ -185,8 +186,8 @@ def load_replay_events(path: Path) -> list[ReplayBookEvent]: return sorted(events, key=lambda event: event.occurred_at) -def load_replay_events_from_db( - store: object, +async def load_replay_events_from_db( + store: PgStore, *, symbols: list[str] | None = None, start: datetime | None = None, @@ -197,32 +198,32 @@ def load_replay_events_from_db( Each market_snapshots row has snapshot_at, symbol, payload (raw Kraken WS). Payload format: {channel, symbol, data: [{bids: [{price, qty}], asks: [{price, qty}]}]} """ - with store.connect() as conn: # type: ignore + async with store.pool.acquire() as conn: query = "SELECT snapshot_at, symbol, payload FROM market_snapshots WHERE 1=1" params: list[object] = [] if symbols: - placeholders = ",".join("?" for _ in symbols) + placeholders = ",".join(f"${i+1}" for i in range(len(symbols))) query += f" AND symbol IN ({placeholders})" params.extend(symbols) if start is not None: - query += " AND snapshot_at >= ?" params.append(start) + query += f" AND snapshot_at >= ${len(params)}" if end is not None: - query += " AND snapshot_at <= ?" params.append(end) + query += f" AND snapshot_at <= ${len(params)}" query += " ORDER BY snapshot_at ASC" - rows = conn.execute(query, params).fetchall() + rows = await conn.fetch(query, *params) events: list[ReplayBookEvent] = [] for row in rows: - snapshot_at: datetime = row[0] - symbol: str = row[1] - payload_raw = row[2] + snapshot_at: datetime = row["snapshot_at"] + symbol: str = row["symbol"] + payload_raw = row["payload"] if isinstance(payload_raw, str): payload = orjson.loads(payload_raw) diff --git a/src/arbitrade/backtesting/runner.py b/src/arbitrade/backtesting/runner.py index e1a3562..6058d2c 100644 --- a/src/arbitrade/backtesting/runner.py +++ b/src/arbitrade/backtesting/runner.py @@ -15,7 +15,7 @@ from arbitrade.backtesting.replay import ( load_replay_events_from_db, ) from arbitrade.detection.graph import CurrencyGraph, TriangularCycle -from arbitrade.storage.db import DuckDBStore +from arbitrade.storage.pg_store import PgStore from arbitrade.storage.repositories import BacktestJobRepository _LOG = structlog.get_logger(__name__) @@ -50,11 +50,11 @@ def _parse_balances(raw: str) -> dict[str, float]: async def run_backtest_job( job_id: str, config_dict: dict[str, object] | None, - store: DuckDBStore, + store: PgStore, ) -> None: """Execute a single backtest job: load events from DB or file, run engine, store report.""" repo = BacktestJobRepository(store) - repo.update_status(job_id, "running") + await repo.update_status(job_id, "running") _LOG.info("backtest_job_started", job_id=job_id) try: @@ -79,7 +79,7 @@ async def run_backtest_job( elif isinstance(symbols_raw, list): symbols = [str(s).upper() for s in symbols_raw] - events = load_replay_events_from_db( + events = await load_replay_events_from_db( store, symbols=symbols, start=start_dt, @@ -141,18 +141,18 @@ async def run_backtest_job( "finished_at": report.finished_at.isoformat(), } - repo.store_report(job_id, report_dict) - repo.update_status(job_id, "completed") + await repo.store_report(job_id, report_dict) + await repo.update_status(job_id, "completed") _LOG.info("backtest_job_completed", job_id=job_id, pnl=report.realized_pnl_usd) except Exception as exc: - repo.update_status(job_id, "failed", error=str(exc)) + await repo.update_status(job_id, "failed", error=str(exc)) _LOG.exception("backtest_job_failed", job_id=job_id, error=str(exc)) async def backtest_worker( queue: asyncio.Queue[tuple[str, dict[str, object] | None] | None], - store: DuckDBStore, + store: PgStore, ) -> None: """Worker coroutine: pull jobs from queue and execute them one at a time.""" _LOG.info("backtest_worker_started") diff --git a/src/arbitrade/config/pairing_sync.py b/src/arbitrade/config/pairing_sync.py index 3149796..cf7af04 100644 --- a/src/arbitrade/config/pairing_sync.py +++ b/src/arbitrade/config/pairing_sync.py @@ -9,7 +9,7 @@ import structlog from arbitrade.config.service import ConfigPairing from arbitrade.detection.graph import CurrencyGraph from arbitrade.exchange.kraken_rest import KrakenRestClient -from arbitrade.storage.db import DuckDBStore +from arbitrade.storage.pg_store import PgStore from arbitrade.storage.repositories import ConfigPairingRepository _LOG = structlog.get_logger(__name__) @@ -17,7 +17,7 @@ _LOG = structlog.get_logger(__name__) async def sync_pairings_from_kraken( kraken_client: KrakenRestClient, - store: DuckDBStore, + store: PgStore, ) -> dict[str, int]: """Fetch all asset pairs from Kraken and upsert into config_pairings. @@ -37,7 +37,7 @@ async def sync_pairings_from_kraken( if symbol in seen_symbols: continue seen_symbols.add(symbol) - existing = repo.get_pairing(base, quote) + existing = await repo.get_pairing(base, quote) pairing = ConfigPairing( base_asset=base, quote_asset=quote, @@ -45,7 +45,7 @@ async def sync_pairings_from_kraken( source="kraken", ) try: - repo.upsert_pairing(pairing) + await repo.upsert_pairing(pairing) total += 1 if existing: updated += 1 @@ -65,7 +65,7 @@ async def sync_pairings_from_kraken( async def run_pairing_sync_loop( kraken_client: KrakenRestClient, - store: DuckDBStore, + store: PgStore, stop_event: asyncio.Event, interval_seconds: int = 86400, ) -> None: diff --git a/src/arbitrade/config/service.py b/src/arbitrade/config/service.py index 89350bc..7cb31f2 100644 --- a/src/arbitrade/config/service.py +++ b/src/arbitrade/config/service.py @@ -7,7 +7,7 @@ import orjson from pydantic import BaseModel from arbitrade.config.settings import Settings -from arbitrade.storage.db import DuckDBStore +from arbitrade.storage.pg_store import PgStore class ConfigSection(BaseModel): @@ -49,16 +49,15 @@ class ConfigBacktestingDefaults(BaseModel): class ConfigurationService: """Manages application configuration from environment and database sources.""" - def __init__(self, settings: Settings, store: DuckDBStore, audit_repo: Any) -> None: + def __init__(self, settings: Settings, store: PgStore, audit_repo: Any) -> None: self._settings = settings self._store = store self._audit_repo = audit_repo self._config_version = 0 self._loaded_settings: dict[str, Any] = {} self._last_updated_at: datetime | None = None - self._load_database_settings() - def _load_database_settings(self) -> None: + async def load_database_settings(self) -> None: """Load user settings from database and merge with defaults.""" # Import here to avoid circular imports from arbitrade.storage.repositories import ConfigSettingRepository @@ -66,7 +65,7 @@ class ConfigurationService: setting_repo = ConfigSettingRepository(self._store) # Load all settings from database - db_settings = setting_repo.list_settings() + db_settings = await setting_repo.list_settings() # Convert to dictionary for easy access for setting in db_settings: @@ -116,7 +115,7 @@ class ConfigurationService: """Get the timestamp of the last configuration update.""" return self._last_updated_at - def is_config_outdated(self) -> bool: + async def is_config_outdated(self) -> bool: """Check if configuration has been updated since last load.""" # Import here to avoid circular imports from arbitrade.storage.repositories import ConfigSettingRepository @@ -124,7 +123,7 @@ class ConfigurationService: setting_repo = ConfigSettingRepository(self._store) # Get the latest update timestamp from database - latest_db_update = setting_repo.get_latest_updated_at() + latest_db_update = await setting_repo.get_latest_updated_at() # Compare with our last loaded timestamp if latest_db_update and self._last_updated_at: @@ -133,15 +132,15 @@ class ConfigurationService: return True return False - def reload_if_changed(self) -> bool: + async def reload_if_changed(self) -> bool: """Reload configuration if it has been updated in the database.""" - if self.is_config_outdated(): - self._load_database_settings() + if await self.is_config_outdated(): + await self.load_database_settings() self._config_version += 1 return True return False - def set_setting(self, key: str, value: Any, updated_by: str | None = None) -> None: + async def set_setting(self, key: str, value: Any, updated_by: str | None = None) -> None: """Set a configuration setting value and persist to database.""" # Import here to avoid circular imports from arbitrade.storage.repositories import ConfigSettingRepository @@ -183,13 +182,13 @@ class ConfigurationService: ) # Check if setting exists - existing_setting = setting_repo.get_setting(key) + existing_setting = await setting_repo.get_setting(key) if existing_setting: # Update existing setting - updated_setting = setting_repo.update_setting(key, setting) + updated_setting = await setting_repo.update_setting(key, setting) else: # Create new setting - updated_setting = setting_repo.create_setting(setting) + updated_setting = await setting_repo.create_setting(setting) # Update in-memory cache self._loaded_settings[key] = value @@ -211,18 +210,18 @@ class ConfigurationService: return ConfigPairingRepository(self._store) - def list_pairings(self) -> list[ConfigPairing]: + async def list_pairings(self) -> list[ConfigPairing]: """List all currency pairings.""" r = self._pairing_repo() # type: ignore[no-untyped-call] - p = r.list_pairings() + p = await r.list_pairings() return p # type: ignore[no-any-return] - def create_pairing( + async def create_pairing( self, base_asset: str, quote_asset: str, source: str = "manual" ) -> ConfigPairing: """Create a new currency pairing.""" r = self._pairing_repo() # type: ignore[no-untyped-call] - e = r.get_pairing(base_asset, quote_asset) + e = await r.get_pairing(base_asset, quote_asset) if e: return e # type: ignore[no-any-return] pairing = ConfigPairing( diff --git a/src/arbitrade/config/settings.py b/src/arbitrade/config/settings.py index d0d9c2e..bf7feec 100644 --- a/src/arbitrade/config/settings.py +++ b/src/arbitrade/config/settings.py @@ -1,7 +1,6 @@ from __future__ import annotations from functools import lru_cache -from pathlib import Path from pydantic import Field, field_validator, model_validator from pydantic_settings import BaseSettings, SettingsConfigDict @@ -32,49 +31,78 @@ class Settings(BaseSettings): ) alerts_enabled: bool = Field(default=True, alias="ALERTS_ENABLED") - alert_min_severity: str = Field(default="warning", alias="ALERT_MIN_SEVERITY") - alert_dedup_seconds: float = Field(default=30.0, alias="ALERT_DEDUP_SECONDS") - alert_on_trade_events: bool = Field(default=True, alias="ALERT_ON_TRADE_EVENTS") - alert_on_error_events: bool = Field(default=True, alias="ALERT_ON_ERROR_EVENTS") - alert_on_threshold_events: bool = Field(default=True, alias="ALERT_ON_THRESHOLD_EVENTS") - alert_on_system_events: bool = Field(default=True, alias="ALERT_ON_SYSTEM_EVENTS") + alert_min_severity: str = Field( + default="warning", alias="ALERT_MIN_SEVERITY") + alert_dedup_seconds: float = Field( + default=30.0, alias="ALERT_DEDUP_SECONDS") + alert_on_trade_events: bool = Field( + default=True, alias="ALERT_ON_TRADE_EVENTS") + alert_on_error_events: bool = Field( + default=True, alias="ALERT_ON_ERROR_EVENTS") + alert_on_threshold_events: bool = Field( + default=True, alias="ALERT_ON_THRESHOLD_EVENTS") + alert_on_system_events: bool = Field( + default=True, alias="ALERT_ON_SYSTEM_EVENTS") - telegram_alerts_enabled: bool = Field(default=False, alias="TELEGRAM_ALERTS_ENABLED") - telegram_bot_token: str | None = Field(default=None, alias="TELEGRAM_BOT_TOKEN") - telegram_chat_id: str | None = Field(default=None, alias="TELEGRAM_CHAT_ID") + telegram_alerts_enabled: bool = Field( + default=False, alias="TELEGRAM_ALERTS_ENABLED") + telegram_bot_token: str | None = Field( + default=None, alias="TELEGRAM_BOT_TOKEN") + telegram_chat_id: str | None = Field( + default=None, alias="TELEGRAM_CHAT_ID") - discord_alerts_enabled: bool = Field(default=False, alias="DISCORD_ALERTS_ENABLED") - discord_webhook_url: str | None = Field(default=None, alias="DISCORD_WEBHOOK_URL") + discord_alerts_enabled: bool = Field( + default=False, alias="DISCORD_ALERTS_ENABLED") + discord_webhook_url: str | None = Field( + default=None, alias="DISCORD_WEBHOOK_URL") - email_alerts_enabled: bool = Field(default=False, alias="EMAIL_ALERTS_ENABLED") + email_alerts_enabled: bool = Field( + default=False, alias="EMAIL_ALERTS_ENABLED") email_smtp_host: str | None = Field(default=None, alias="EMAIL_SMTP_HOST") email_smtp_port: int = Field(default=587, alias="EMAIL_SMTP_PORT") - email_smtp_username: str | None = Field(default=None, alias="EMAIL_SMTP_USERNAME") - email_smtp_password: str | None = Field(default=None, alias="EMAIL_SMTP_PASSWORD") - email_alert_from: str | None = Field(default=None, alias="EMAIL_ALERT_FROM") + email_smtp_username: str | None = Field( + default=None, alias="EMAIL_SMTP_USERNAME") + email_smtp_password: str | None = Field( + default=None, alias="EMAIL_SMTP_PASSWORD") + email_alert_from: str | None = Field( + default=None, alias="EMAIL_ALERT_FROM") email_alert_to: str | None = Field(default=None, alias="EMAIL_ALERT_TO") email_smtp_use_tls: bool = Field(default=True, alias="EMAIL_SMTP_USE_TLS") - duckdb_path: Path = Field(default=Path("./data/arbitrade.duckdb"), alias="DUCKDB_PATH") + # PostgreSQL connection settings + pg_host: str = Field(default="192.168.88.35", alias="PG_HOST") + pg_port: int = Field(default=5432, alias="PG_PORT") + pg_database: str = Field(default="arbitrade", alias="PG_DATABASE") + pg_user: str = Field(default="arbitrade", alias="PG_USER") + pg_password: str = Field(default="arbitrade", alias="PG_PASSWORD") + pg_min_connections: int = Field(default=2, alias="PG_MIN_CONNECTIONS") + pg_max_connections: int = Field(default=10, alias="PG_MAX_CONNECTIONS") - kraken_rest_url: str = Field(default="https://api.kraken.com", alias="KRAKEN_REST_URL") - kraken_ws_url: str = Field(default="wss://ws.kraken.com/v2", alias="KRAKEN_WS_URL") + kraken_rest_url: str = Field( + default="https://api.kraken.com", alias="KRAKEN_REST_URL") + kraken_ws_url: str = Field( + default="wss://ws.kraken.com/v2", alias="KRAKEN_WS_URL") kraken_private_rate_limit_seconds: float = Field( default=1.0, alias="KRAKEN_PRIVATE_RATE_LIMIT_SECONDS" ) - kraken_http_timeout_seconds: float = Field(default=10.0, alias="KRAKEN_HTTP_TIMEOUT_SECONDS") - kraken_retry_attempts: int = Field(default=3, alias="KRAKEN_RETRY_ATTEMPTS") + kraken_http_timeout_seconds: float = Field( + default=10.0, alias="KRAKEN_HTTP_TIMEOUT_SECONDS") + kraken_retry_attempts: int = Field( + default=3, alias="KRAKEN_RETRY_ATTEMPTS") kraken_retry_base_delay_seconds: float = Field( default=0.25, alias="KRAKEN_RETRY_BASE_DELAY_SECONDS" ) kraken_api_key: str | None = Field(default=None, alias="KRAKEN_API_KEY") - kraken_api_secret: str | None = Field(default=None, alias="KRAKEN_API_SECRET") + kraken_api_secret: str | None = Field( + default=None, alias="KRAKEN_API_SECRET") kraken_api_key_permissions: str = Field( default="query,trade", alias="KRAKEN_API_KEY_PERMISSIONS", ) - ws_heartbeat_timeout_seconds: float = Field(default=20.0, alias="WS_HEARTBEAT_TIMEOUT_SECONDS") - ws_max_staleness_seconds: float = Field(default=5.0, alias="WS_MAX_STALENESS_SECONDS") + ws_heartbeat_timeout_seconds: float = Field( + default=20.0, alias="WS_HEARTBEAT_TIMEOUT_SECONDS") + ws_max_staleness_seconds: float = Field( + default=5.0, alias="WS_MAX_STALENESS_SECONDS") strategy_enable_stat_arb_experiment: bool = Field( default=False, alias="STRATEGY_ENABLE_STAT_ARB_EXPERIMENT", @@ -97,20 +125,29 @@ class Settings(BaseSettings): ) paper_trading_mode: bool = Field(default=True, alias="PAPER_TRADING_MODE") trade_capital_usd: float = Field(default=100.0, alias="TRADE_CAPITAL_USD") - max_trade_capital_usd: float = Field(default=100.0, alias="MAX_TRADE_CAPITAL_USD") - max_concurrent_trades: int | None = Field(default=None, alias="MAX_CONCURRENT_TRADES") + max_trade_capital_usd: float = Field( + default=100.0, alias="MAX_TRADE_CAPITAL_USD") + max_concurrent_trades: int | None = Field( + default=None, alias="MAX_CONCURRENT_TRADES") max_exposure_per_asset_usd: float | None = Field( default=None, alias="MAX_EXPOSURE_PER_ASSET_USD", ) - quote_balance_asset: str = Field(default="USD", alias="QUOTE_BALANCE_ASSET") - min_order_size_usd: float | None = Field(default=None, alias="MIN_ORDER_SIZE_USD") + quote_balance_asset: str = Field( + default="USD", alias="QUOTE_BALANCE_ASSET") + min_order_size_usd: float | None = Field( + default=None, alias="MIN_ORDER_SIZE_USD") kill_switch_active: bool = Field(default=False, alias="KILL_SWITCH_ACTIVE") - daily_loss_limit_usd: float | None = Field(default=None, alias="DAILY_LOSS_LIMIT_USD") - cumulative_loss_limit_usd: float | None = Field(default=None, alias="CUMULATIVE_LOSS_LIMIT_USD") - max_source_latency_ms: float | None = Field(default=None, alias="MAX_SOURCE_LATENCY_MS") - max_apply_latency_ms: float | None = Field(default=None, alias="MAX_APPLY_LATENCY_MS") - max_consecutive_failures: int | None = Field(default=None, alias="MAX_CONSECUTIVE_FAILURES") + daily_loss_limit_usd: float | None = Field( + default=None, alias="DAILY_LOSS_LIMIT_USD") + cumulative_loss_limit_usd: float | None = Field( + default=None, alias="CUMULATIVE_LOSS_LIMIT_USD") + max_source_latency_ms: float | None = Field( + default=None, alias="MAX_SOURCE_LATENCY_MS") + max_apply_latency_ms: float | None = Field( + default=None, alias="MAX_APPLY_LATENCY_MS") + max_consecutive_failures: int | None = Field( + default=None, alias="MAX_CONSECUTIVE_FAILURES") fernet_key: str | None = Field(default=None, alias="FERNET_KEY") @@ -127,7 +164,8 @@ class Settings(BaseSettings): def _validate_log_level(cls, value: str) -> str: normalized = value.strip().upper() if normalized not in {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}: - raise ValueError("LOG_LEVEL must be one of: DEBUG, INFO, WARNING, ERROR, CRITICAL") + raise ValueError( + "LOG_LEVEL must be one of: DEBUG, INFO, WARNING, ERROR, CRITICAL") return normalized @field_validator("alert_min_severity") @@ -135,16 +173,19 @@ class Settings(BaseSettings): def _validate_alert_severity(cls, value: str) -> str: normalized = value.strip().lower() if normalized not in {"info", "warning", "error", "critical"}: - raise ValueError("ALERT_MIN_SEVERITY must be one of: info, warning, error, critical") + raise ValueError( + "ALERT_MIN_SEVERITY must be one of: info, warning, error, critical") return normalized @model_validator(mode="after") def _validate_security_constraints(self) -> Settings: if bool(self.dashboard_auth_username) ^ bool(self.dashboard_auth_password): - raise ValueError("dashboard auth requires both username and password") + raise ValueError( + "dashboard auth requires both username and password") if bool(self.kraken_api_key) ^ bool(self.kraken_api_secret): - raise ValueError("Kraken API auth requires both API key and secret") + raise ValueError( + "Kraken API auth requires both API key and secret") permissions = { token.strip().lower() @@ -152,9 +193,11 @@ class Settings(BaseSettings): if token.strip() } if permissions and ("query" not in permissions or "trade" not in permissions): - raise ValueError("KRAKEN_API_KEY_PERMISSIONS must include query and trade") + raise ValueError( + "KRAKEN_API_KEY_PERMISSIONS must include query and trade") if "withdraw" in permissions or "withdrawals" in permissions: - raise ValueError("KRAKEN_API_KEY_PERMISSIONS must not include withdrawal scope") + raise ValueError( + "KRAKEN_API_KEY_PERMISSIONS must not include withdrawal scope") if self.alert_dedup_seconds < 0.0: raise ValueError("ALERT_DEDUP_SECONDS must be >= 0") @@ -170,7 +213,8 @@ class Settings(BaseSettings): "STRATEGY_STAT_ARB_ENTRY_ZSCORE must be greater than STRATEGY_STAT_ARB_EXIT_ZSCORE" ) if self.strategy_stat_arb_max_holding_seconds <= 0.0: - raise ValueError("STRATEGY_STAT_ARB_MAX_HOLDING_SECONDS must be > 0") + raise ValueError( + "STRATEGY_STAT_ARB_MAX_HOLDING_SECONDS must be > 0") return self diff --git a/src/arbitrade/exchange/fee_service.py b/src/arbitrade/exchange/fee_service.py index 0104374..e76b093 100644 --- a/src/arbitrade/exchange/fee_service.py +++ b/src/arbitrade/exchange/fee_service.py @@ -9,7 +9,7 @@ import orjson import structlog from arbitrade.exchange.kraken_rest import KrakenRestClient -from arbitrade.storage.db import DuckDBStore +from arbitrade.storage.pg_store import PgStore from arbitrade.storage.repositories import ( KrakenAccountSnapshot, KrakenAccountSnapshotRepository, @@ -22,7 +22,7 @@ _FEE_REFRESH_INTERVAL_SECONDS = 86400 # 1 day async def fetch_and_store_account_snapshot( client: KrakenRestClient, - store: DuckDBStore, + store: PgStore, ) -> KrakenAccountSnapshot | None: """Query TradeVolume + TradeBalance, persist as snapshot. @@ -42,9 +42,12 @@ async def fetch_and_store_account_snapshot( _LOG.exception("trade_balance_fetch_failed") return None - fee_tier = volume_data.get("fee_tier") if isinstance(volume_data, dict) else None - fees_dict = volume_data.get("fees") if isinstance(volume_data, dict) else None - fees_maker = volume_data.get("fees_maker") if isinstance(volume_data, dict) else None + fee_tier = volume_data.get("fee_tier") if isinstance( + volume_data, dict) else None + fees_dict = volume_data.get("fees") if isinstance( + volume_data, dict) else None + fees_maker = volume_data.get("fees_maker") if isinstance( + volume_data, dict) else None currency = volume_data.get("currency") thirty_day_volume_str = volume_data.get("volume") @@ -70,7 +73,8 @@ async def fetch_and_store_account_snapshot( if currency is not None: fee_schedule["currency"] = currency - thirty_day_volume = float(thirty_day_volume_str) if thirty_day_volume_str is not None else None + thirty_day_volume = float( + thirty_day_volume_str) if thirty_day_volume_str is not None else None snapshot = KrakenAccountSnapshot( snapshot_at=datetime.now(UTC), @@ -78,11 +82,12 @@ async def fetch_and_store_account_snapshot( maker_fee=maker_fee, taker_fee=taker_fee, thirty_day_volume=thirty_day_volume, - trade_balance_raw=balance_data if isinstance(balance_data, dict) else None, + trade_balance_raw=balance_data if isinstance( + balance_data, dict) else None, fee_schedule_raw=fee_schedule if fee_schedule else None, ) - repo.insert_snapshot(snapshot) + await repo.insert_snapshot(snapshot) _LOG.info( "account_snapshot_stored", fee_tier=fee_tier_str, @@ -97,15 +102,14 @@ async def fetch_and_store_account_snapshot( if isinstance(balance_data, dict): eb = balance_data.get("eb") total_value = float(eb) if eb is not None else 0.0 - with store.connect() as conn: - conn.execute( + async with store.pool.acquire() as conn: + await conn.execute( "INSERT INTO portfolio_snapshots" - " (snapshot_at, balances, total_value_usd) VALUES (?, ?, ?)", - ( - datetime.now(UTC), - orjson.dumps(wallet_balances).decode("utf-8") if wallet_balances else None, - total_value, - ), + " (snapshot_at, balances, total_value_usd) VALUES ($1, $2, $3)", + datetime.now(UTC), + orjson.dumps(wallet_balances).decode( + "utf-8") if wallet_balances else None, + total_value, ) _LOG.info("portfolio_snapshot_stored", total_value_usd=total_value) except Exception: @@ -116,14 +120,15 @@ async def fetch_and_store_account_snapshot( async def run_fee_sync_loop( client: KrakenRestClient, - store: DuckDBStore, + store: PgStore, stop_event: asyncio.Event, ) -> None: """Periodic loop: fetch account snapshot every hour. Runs until stop_event is set. """ - _LOG.info("fee_sync_loop_started", interval_s=_FEE_REFRESH_INTERVAL_SECONDS) + _LOG.info("fee_sync_loop_started", + interval_s=_FEE_REFRESH_INTERVAL_SECONDS) while not stop_event.is_set(): try: diff --git a/src/arbitrade/market_data/feed_builder.py b/src/arbitrade/market_data/feed_builder.py index be09ee9..0571ecc 100644 --- a/src/arbitrade/market_data/feed_builder.py +++ b/src/arbitrade/market_data/feed_builder.py @@ -6,14 +6,14 @@ import structlog from arbitrade.detection.engine import IncrementalCycleDetector from arbitrade.detection.graph import CurrencyGraph -from arbitrade.storage.db import DuckDBStore +from arbitrade.storage.pg_store import PgStore from arbitrade.storage.repositories import ConfigPairingRepository _LOG = structlog.get_logger(__name__) -def build_detector_from_enabled_pairings( - store: DuckDBStore, +async def build_detector_from_enabled_pairings( + store: PgStore, *, fee_rate: float = 0.0, max_depth_levels: int = 10, @@ -24,7 +24,7 @@ def build_detector_from_enabled_pairings( Returns None if no enabled pairings exist. """ repo = ConfigPairingRepository(store) - pairings = repo.list_pairings(enabled_only=True) + pairings = await repo.list_pairings(enabled_only=True) if not pairings: _LOG.warning("no_enabled_pairings_found_detector_not_created") return None @@ -55,8 +55,8 @@ def build_detector_from_enabled_pairings( ) -def get_enabled_pair_symbols(store: DuckDBStore) -> list[str]: +async def get_enabled_pair_symbols(store: PgStore) -> list[str]: """Return list of enabled pair symbols (e.g. ['BTC/USD', 'ETH/BTC']).""" repo = ConfigPairingRepository(store) - pairings = repo.list_pairings(enabled_only=True) + pairings = await repo.list_pairings(enabled_only=True) return [f"{p.base_asset}/{p.quote_asset}" for p in pairings if p.enabled] diff --git a/src/arbitrade/metrics.py b/src/arbitrade/metrics.py index aadaf32..ca39d97 100644 --- a/src/arbitrade/metrics.py +++ b/src/arbitrade/metrics.py @@ -3,7 +3,7 @@ from __future__ import annotations from dataclasses import dataclass from datetime import datetime -from arbitrade.storage.db import DuckDBStore +from arbitrade.storage.pg_store import PgStore @dataclass(frozen=True, slots=True) @@ -19,57 +19,55 @@ class PerformanceMetrics: class MetricsCalculator: - def __init__(self, store: DuckDBStore) -> None: + def __init__(self, store: PgStore) -> None: self._store = store - def compute(self) -> PerformanceMetrics: - with self._store.connect() as conn: - tm = conn.execute(""" + async def compute(self) -> PerformanceMetrics: + async with self._store.pool.acquire() as conn: + tm = await conn.fetchrow(""" SELECT COALESCE(SUM(COALESCE(realized_pnl, 0)), 0) AS realized_pnl_usd, COUNT(*) AS total_trades, SUM(CASE WHEN realized_pnl > 0 THEN 1 ELSE 0 END) AS winning_trades, - AVG(EPOCH(finished_at) - EPOCH(started_at)) AS avg_trade_duration_seconds, - quantile_cont( - EPOCH(finished_at) - EPOCH(started_at), - 0.50 - ) AS latency_p50_seconds, - quantile_cont( - EPOCH(finished_at) - EPOCH(started_at), - 0.95 - ) AS latency_p95_seconds, - quantile_cont( - EPOCH(finished_at) - EPOCH(started_at), - 0.99 - ) AS latency_p99_seconds + AVG(EXTRACT(EPOCH FROM finished_at - started_at)) AS avg_trade_duration_seconds, + PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM finished_at - started_at)) AS latency_p50_seconds, + PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM finished_at - started_at)) AS latency_p95_seconds, + PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM finished_at - started_at)) AS latency_p99_seconds FROM trades WHERE finished_at IS NOT NULL - """).fetchone() + """) - om = conn.execute(""" + om = await conn.fetchrow(""" SELECT COUNT(*) AS opportunity_count, MIN(detected_at) AS first_detected_at, MAX(detected_at) AS last_detected_at FROM opportunities - """).fetchone() + """) - fm = conn.execute(""" + fm = await conn.fetchrow(""" SELECT AVG(filled_volume / volume) AS fill_rate FROM orders WHERE volume > 0 AND filled_volume IS NOT NULL - """).fetchone() + """) - r_pnl_usd = float(tm[0]) if tm and tm[0] is not None else 0.0 - tt = int(tm[1]) if tm and tm[1] is not None else 0 - wt = int(tm[2]) if tm and tm[2] is not None else 0 + r_pnl_usd = float( + tm["realized_pnl_usd"]) if tm and tm["realized_pnl_usd"] is not None else 0.0 + tt = int(tm["total_trades"] + ) if tm and tm["total_trades"] is not None else 0 + wt = int(tm["winning_trades"] + ) if tm and tm["winning_trades"] is not None else 0 wr = wt / tt if tt > 0 else None - atd = float(tm[3]) if tm and tm[3] is not None else None + atd = float(tm["avg_trade_duration_seconds"] + ) if tm and tm["avg_trade_duration_seconds"] is not None else None - oc = int(om[0]) if om is not None and om[0] is not None else 0 - fo = om[1] if om is not None and isinstance(om[1], datetime) else None - lo = om[2] if om is not None and isinstance(om[2], datetime) else None + oc = int(om["opportunity_count"] + ) if om is not None and om["opportunity_count"] is not None else 0 + fo = om["first_detected_at"] if om is not None and isinstance( + om["first_detected_at"], datetime) else None + lo = om["last_detected_at"] if om is not None and isinstance( + om["last_detected_at"], datetime) else None opportunities_per_minute: float | None if oc >= 2 and fo is not None and lo is not None: @@ -82,11 +80,15 @@ class MetricsCalculator: else: opportunities_per_minute = None - fill_rate = float(fm[0]) if fm and fm[0] is not None else None + fill_rate = float( + fm["fill_rate"]) if fm and fm["fill_rate"] is not None else None - lp50 = float(tm[4]) if tm and tm[4] is not None else None - lp95 = float(tm[5]) if tm and tm[5] is not None else None - lp99 = float(tm[6]) if tm and tm[6] is not None else None + lp50 = float(tm["latency_p50_seconds"] + ) if tm and tm["latency_p50_seconds"] is not None else None + lp95 = float(tm["latency_p95_seconds"] + ) if tm and tm["latency_p95_seconds"] is not None else None + lp99 = float(tm["latency_p99_seconds"] + ) if tm and tm["latency_p99_seconds"] is not None else None return PerformanceMetrics( realized_pnl_usd=r_pnl_usd, diff --git a/src/arbitrade/runtime/lifecycle.py b/src/arbitrade/runtime/lifecycle.py index c00a0e0..0f6738b 100644 --- a/src/arbitrade/runtime/lifecycle.py +++ b/src/arbitrade/runtime/lifecycle.py @@ -8,7 +8,7 @@ from typing import Any, cast from fastapi import FastAPI from arbitrade.api.control_state import DashboardControlState -from arbitrade.storage.db import DuckDBStore +from arbitrade.storage.pg_store import PgStore from arbitrade.storage.repositories import ( AuditRecord, AuditRepository, @@ -29,8 +29,8 @@ def _controls(app: FastAPI) -> DashboardControlState: return cast(DashboardControlState, app.state.dashboard_controls) -def _store(app: FastAPI) -> DuckDBStore: - return cast(DuckDBStore, app.state.store) +def _store(app: FastAPI) -> PgStore: + return cast(PgStore, app.state.store) def _audit_repository(app: FastAPI) -> AuditRepository | None: @@ -43,34 +43,34 @@ def _runtime_repository(app: FastAPI) -> RuntimeStateRepository | None: return repository if isinstance(repository, RuntimeStateRepository) else None -def _open_trade_count(store: DuckDBStore) -> int: - with store.connect() as conn: - row = conn.execute(""" +async def _open_trade_count(store: PgStore) -> int: + async with store.pool.acquire() as conn: + row = await conn.fetchrow(""" SELECT COUNT(*) FROM trades WHERE finished_at IS NULL - """).fetchone() + """) return int(row[0]) if row is not None else 0 -def _latest_balances(store: DuckDBStore) -> dict[str, Any] | None: - with store.connect() as conn: - row = conn.execute(""" +async def _latest_balances(store: PgStore) -> dict[str, Any] | None: + async with store.pool.acquire() as conn: + row = await conn.fetchrow(""" SELECT balances FROM portfolio_snapshots ORDER BY snapshot_at DESC LIMIT 1 - """).fetchone() + """) - if row is None or row[0] is None: + if row is None or row["balances"] is None: return None - raw_balances = row[0] + raw_balances = row["balances"] if isinstance(raw_balances, str): return {"raw": raw_balances} return {"raw": str(raw_balances)} -def _record_audit( +async def _record_audit( app: FastAPI, *, event_type: str, @@ -80,7 +80,7 @@ def _record_audit( repository = _audit_repository(app) if repository is None: return - repository.insert( + await repository.insert( AuditRecord( occurred_at=datetime.now(UTC), actor="runtime", @@ -106,7 +106,7 @@ async def _run_startup_reconciler(app: FastAPI) -> None: await result -def persist_runtime_snapshot(app: FastAPI, *, note: str | None = None) -> RuntimeStateRecord | None: +async def persist_runtime_snapshot(app: FastAPI, *, note: str | None = None) -> RuntimeStateRecord | None: repository = _runtime_repository(app) if repository is None: return None @@ -118,11 +118,11 @@ def persist_runtime_snapshot(app: FastAPI, *, note: str | None = None) -> Runtim is_running=controls.is_running, kill_switch_active=controls.kill_switch.is_active, kill_switch_reason=controls.kill_switch.reason, - open_trade_count=_open_trade_count(store), - last_known_balances=_latest_balances(store), + open_trade_count=await _open_trade_count(store), + last_known_balances=await _latest_balances(store), note=note, ) - repository.insert(snapshot) + await repository.insert(snapshot) return snapshot @@ -134,7 +134,7 @@ async def restore_runtime_state(app: FastAPI) -> RuntimeRecoveryReport: restored_from_snapshot = False snapshot_at: str | None = None - latest = repo.latest() if repo is not None else None + latest = await repo.latest() if repo is not None else None if latest is not None: restored_from_snapshot = True snapshot_at = latest.snapshot_at.isoformat() @@ -146,7 +146,7 @@ async def restore_runtime_state(app: FastAPI) -> RuntimeRecoveryReport: ctl.kill_switch.deactivate() ctl.mark_updated() - open_trades = _open_trade_count(store) + open_trades = await _open_trade_count(store) restart_guard_active = False if open_trades > 0: ctl.is_running = False @@ -163,7 +163,7 @@ async def restore_runtime_state(app: FastAPI) -> RuntimeRecoveryReport: ) app.state.recovery_report = report - _record_audit( + await _record_audit( app, event_type="runtime.startup_recovery", decision="applied", @@ -212,7 +212,7 @@ async def graceful_shutdown(app: FastAPI) -> None: controls.is_running = False controls.mark_updated() - _record_audit( + await _record_audit( app, event_type="runtime.shutdown", decision="initiated", @@ -220,4 +220,4 @@ async def graceful_shutdown(app: FastAPI) -> None: ) await drain_background_workers(app) - persist_runtime_snapshot(app, note="graceful_shutdown") + await persist_runtime_snapshot(app, note="graceful_shutdown") diff --git a/src/arbitrade/storage/db.py b/src/arbitrade/storage/db.py deleted file mode 100644 index 6b79e63..0000000 --- a/src/arbitrade/storage/db.py +++ /dev/null @@ -1,233 +0,0 @@ -from __future__ import annotations - -from collections.abc import Iterator -from contextlib import contextmanager -from pathlib import Path - -import duckdb -import structlog - -from arbitrade.config.settings import Settings - -_LOG = structlog.get_logger(__name__) - -SCHEMA_SQL = """ -CREATE TABLE IF NOT EXISTS schema_migrations ( - version INTEGER PRIMARY KEY, - applied_at TIMESTAMP DEFAULT current_timestamp -); - -CREATE TABLE IF NOT EXISTS config_sections ( - id INTEGER PRIMARY KEY, - name VARCHAR UNIQUE NOT NULL, - description TEXT, - updated_at TIMESTAMP DEFAULT current_timestamp -); - -CREATE TABLE IF NOT EXISTS config_settings ( - key VARCHAR PRIMARY KEY, - section VARCHAR NOT NULL, - value_json TEXT NOT NULL, - value_type VARCHAR NOT NULL, - is_secret BOOLEAN DEFAULT FALSE, - is_runtime_reloadable BOOLEAN DEFAULT FALSE, - updated_at TIMESTAMP DEFAULT current_timestamp, - updated_by VARCHAR -); - -CREATE TABLE IF NOT EXISTS config_pairings ( - id INTEGER PRIMARY KEY, - base_asset VARCHAR NOT NULL, - quote_asset VARCHAR NOT NULL, - enabled BOOLEAN DEFAULT TRUE, - source VARCHAR NOT NULL, - created_at TIMESTAMP DEFAULT current_timestamp, - updated_at TIMESTAMP DEFAULT current_timestamp, - UNIQUE(base_asset, quote_asset) -); - -CREATE TABLE IF NOT EXISTS config_backtesting_defaults ( - id INTEGER PRIMARY KEY, - starting_balances JSON, - trade_capital DOUBLE, - min_profit_threshold DOUBLE, - slippage_bps INTEGER, - execution_latency_ms INTEGER, - fee_source VARCHAR DEFAULT 'api' -); - -CREATE TABLE IF NOT EXISTS opportunities ( - id UUID DEFAULT uuid(), - detected_at TIMESTAMP NOT NULL, - cycle VARCHAR NOT NULL, - gross_pct DOUBLE, - net_pct DOUBLE, - est_profit DOUBLE, - executed BOOLEAN DEFAULT FALSE -); - -CREATE TABLE IF NOT EXISTS trades ( - id UUID DEFAULT uuid(), - trade_ref VARCHAR NOT NULL, - started_at TIMESTAMP NOT NULL, - finished_at TIMESTAMP, - status VARCHAR NOT NULL, - realized_pnl DOUBLE, - estimated_pnl DOUBLE, - capital_used DOUBLE, - cycle VARCHAR, - leg_count INTEGER -); - -CREATE TABLE IF NOT EXISTS orders ( - id UUID DEFAULT uuid(), - trade_ref VARCHAR NOT NULL, - order_ref VARCHAR NOT NULL, - leg_index INTEGER NOT NULL, - pair VARCHAR NOT NULL, - side VARCHAR NOT NULL, - volume DOUBLE NOT NULL, - user_ref INTEGER, - status VARCHAR, - filled_volume DOUBLE, - avg_price DOUBLE, - raw_response JSON, - recorded_at TIMESTAMP NOT NULL -); - -CREATE TABLE IF NOT EXISTS pnl_events ( - id UUID DEFAULT uuid(), - trade_ref VARCHAR NOT NULL, - recorded_at TIMESTAMP NOT NULL, - kind VARCHAR NOT NULL, - pnl_usd DOUBLE NOT NULL, - source VARCHAR NOT NULL -); - -CREATE TABLE IF NOT EXISTS portfolio_snapshots ( - snapshot_at TIMESTAMP NOT NULL, - balances JSON, - total_value_usd DOUBLE -); - -CREATE TABLE IF NOT EXISTS market_snapshots ( - snapshot_at TIMESTAMP NOT NULL, - symbol VARCHAR NOT NULL, - source VARCHAR NOT NULL, - payload JSON NOT NULL, - latency_ms DOUBLE -); - -CREATE TABLE IF NOT EXISTS audit_events ( - id UUID DEFAULT uuid(), - occurred_at TIMESTAMP NOT NULL, - actor VARCHAR NOT NULL, - event_type VARCHAR NOT NULL, - decision VARCHAR NOT NULL, - payload JSON, - correlation_id VARCHAR -); - -CREATE TABLE IF NOT EXISTS runtime_state_snapshots ( - snapshot_at TIMESTAMP NOT NULL, - is_running BOOLEAN NOT NULL, - kill_switch_active BOOLEAN NOT NULL, - kill_switch_reason VARCHAR, - open_trade_count INTEGER NOT NULL, - last_known_balances JSON, - note VARCHAR -); - -CREATE TABLE IF NOT EXISTS kraken_account_snapshots ( - snapshot_at TIMESTAMP NOT NULL, - fee_tier VARCHAR, - maker_fee DOUBLE, - taker_fee DOUBLE, - thirty_day_volume DOUBLE, - trade_balance_raw JSON, - fee_schedule_raw JSON -); - -CREATE TABLE IF NOT EXISTS backtest_jobs ( - id UUID DEFAULT uuid(), - status VARCHAR NOT NULL DEFAULT 'pending', - events_path VARCHAR NOT NULL, - config JSON, - report JSON, - error VARCHAR, - created_at TIMESTAMP DEFAULT current_timestamp, - started_at TIMESTAMP, - finished_at TIMESTAMP -); -""" - - -class DuckDBStore: - SCHEMA_VERSION = 5 - - def __init__(self, settings: Settings) -> None: - self._db_path = Path(settings.duckdb_path) - self._db_path.parent.mkdir(parents=True, exist_ok=True) - self._use_memory_fallback = False - - @contextmanager - def connect(self) -> Iterator[duckdb.DuckDBPyConnection]: - try: - conn = duckdb.connect(str(self._db_path)) - except duckdb.IOException: - if not self._use_memory_fallback: - _LOG.warning( - "duckdb_path_unavailable_falling_back_to_memory", path=str(self._db_path) - ) - self._use_memory_fallback = True - conn = duckdb.connect(":memory:") - try: - yield conn - finally: - conn.close() - - def _get_table_columns(self, conn: duckdb.DuckDBPyConnection, table_name: str) -> set[str]: - try: - rows = conn.execute(f"PRAGMA table_info({table_name})").fetchall() - return {str(row[1]) for row in rows} - except Exception: - return set() - - def _table_exists(self, conn: duckdb.DuckDBPyConnection, table_name: str) -> bool: - try: - result = conn.execute( - f"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='{table_name}'" - ).fetchone() - count = result[0] if result else 0 - return count > 0 - except Exception: - return False - - def _ensure_column( - self, conn: duckdb.DuckDBPyConnection, table_name: str, column_def: str - ) -> None: - """Add a column to a table if it doesn't already exist.""" - existing = self._get_table_columns(conn, table_name) - col_name = column_def.split()[0] - if col_name not in existing: - conn.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_def}") - - def migrate(self) -> None: - with self.connect() as conn: - # Run CREATE TABLE IF NOT EXISTS for all tables - conn.execute(SCHEMA_SQL) - - # Ensure schema_migrations table exists and get current version - if not self._table_exists(conn, "schema_migrations"): - conn.execute(""" - CREATE TABLE IF NOT EXISTS schema_migrations ( - version INTEGER PRIMARY KEY, - applied_at TIMESTAMP DEFAULT current_timestamp - ) - """) - - # Update version to current - conn.execute( - f"INSERT OR REPLACE INTO schema_migrations (version, applied_at) " - f"VALUES ({self.SCHEMA_VERSION}, current_timestamp)" - ) diff --git a/src/arbitrade/storage/executions.py b/src/arbitrade/storage/executions.py index 4262091..502ab2d 100644 --- a/src/arbitrade/storage/executions.py +++ b/src/arbitrade/storage/executions.py @@ -36,7 +36,8 @@ class AsyncExecutionWriter: async def start(self) -> None: if self._task is None or self._task.done(): self._stop.clear() - self._task = asyncio.create_task(self._run(), name="execution-writer") + self._task = asyncio.create_task( + self._run(), name="execution-writer") async def stop(self) -> None: self._stop.set() @@ -55,11 +56,11 @@ class AsyncExecutionWriter: try: if isinstance(record, TradeRecord): - self._trade_repository.insert(record) + await self._trade_repository.insert(record) elif isinstance(record, OrderRecord): - self._order_repository.insert(record) + await self._order_repository.insert(record) else: - self._pnl_repository.insert(record) + await self._pnl_repository.insert(record) except Exception as exc: _LOG.error("execution_write_failed", error=str(exc)) finally: diff --git a/src/arbitrade/storage/market_snapshots.py b/src/arbitrade/storage/market_snapshots.py index c87c529..c3e4493 100644 --- a/src/arbitrade/storage/market_snapshots.py +++ b/src/arbitrade/storage/market_snapshots.py @@ -24,14 +24,16 @@ class MarketSnapshot: class AsyncMarketSnapshotWriter: def __init__(self, repository: MarketSnapshotRepository, max_queue_size: int = 50_000) -> None: self._repository = repository - self._queue: asyncio.Queue[MarketSnapshot] = asyncio.Queue(maxsize=max_queue_size) + self._queue: asyncio.Queue[MarketSnapshot] = asyncio.Queue( + maxsize=max_queue_size) self._task: asyncio.Task[None] | None = None self._stop = asyncio.Event() async def start(self) -> None: if self._task is None or self._task.done(): self._stop.clear() - self._task = asyncio.create_task(self._run(), name="market-snapshot-writer") + self._task = asyncio.create_task( + self._run(), name="market-snapshot-writer") async def stop(self) -> None: self._stop.set() @@ -49,7 +51,7 @@ class AsyncMarketSnapshotWriter: continue try: - self._repository.insert( + await self._repository.insert( MarketSnapshotRecord( snapshot_at=item.snapshot_at, symbol=item.symbol, @@ -59,6 +61,7 @@ class AsyncMarketSnapshotWriter: ) ) except Exception as exc: - _LOG.error("market_snapshot_write_failed", error=str(exc), symbol=item.symbol) + _LOG.error("market_snapshot_write_failed", + error=str(exc), symbol=item.symbol) finally: self._queue.task_done() diff --git a/src/arbitrade/storage/opportunities.py b/src/arbitrade/storage/opportunities.py index 032b23a..419520d 100644 --- a/src/arbitrade/storage/opportunities.py +++ b/src/arbitrade/storage/opportunities.py @@ -13,14 +13,16 @@ _LOG = structlog.get_logger(__name__) class AsyncOpportunityWriter: def __init__(self, repository: OpportunityRepository, max_queue_size: int = 50_000) -> None: self._repository = repository - self._queue: asyncio.Queue[OpportunityEvent] = asyncio.Queue(maxsize=max_queue_size) + self._queue: asyncio.Queue[OpportunityEvent] = asyncio.Queue( + maxsize=max_queue_size) self._task: asyncio.Task[None] | None = None self._stop = asyncio.Event() async def start(self) -> None: if self._task is None or self._task.done(): self._stop.clear() - self._task = asyncio.create_task(self._run(), name="opportunity-writer") + self._task = asyncio.create_task( + self._run(), name="opportunity-writer") async def stop(self) -> None: self._stop.set() @@ -38,7 +40,7 @@ class AsyncOpportunityWriter: continue try: - self._repository.insert( + await self._repository.insert( OpportunityRecord( detected_at=event.detected_at, cycle=event.cycle, diff --git a/src/arbitrade/storage/pg_store.py b/src/arbitrade/storage/pg_store.py new file mode 100644 index 0000000..58c45f7 --- /dev/null +++ b/src/arbitrade/storage/pg_store.py @@ -0,0 +1,134 @@ +"""PostgreSQL store — async connection pool wrapper around asyncpg.""" + +from __future__ import annotations + +from pathlib import Path + +import asyncpg +import structlog + +from arbitrade.config.settings import Settings + +_LOG = structlog.get_logger(__name__) + +SCHEMA_VERSION = 1 + + +class PgStore: + """Async PostgreSQL connection pool for the arbitrade bot. + + Wraps an ``asyncpg.Pool`` with schema migration support. + """ + + def __init__(self, settings: Settings) -> None: + self._dsn: str | None = None + self._pool: asyncpg.Pool | None = None + self._settings = settings + + # ── lifecycle ──────────────────────────────────────────────── + + async def start(self) -> None: + """Create the connection pool.""" + s = self._settings + self._pool = await asyncpg.create_pool( + host=s.pg_host, + port=s.pg_port, + database=s.pg_database, + user=s.pg_user, + password=s.pg_password, + min_size=s.pg_min_connections, + max_size=s.pg_max_connections, + ) + _LOG.info( + "pg_pool_created", + host=s.pg_host, + database=s.pg_database, + min_size=s.pg_min_connections, + max_size=s.pg_max_connections, + ) + + async def stop(self) -> None: + """Close the connection pool.""" + if self._pool is not None: + await self._pool.close() + self._pool = None + _LOG.info("pg_pool_closed") + + @property + def pool(self) -> asyncpg.Pool: + """Return the underlying connection pool. + + Raises ``RuntimeError`` if ``start()`` has not been called yet. + """ + if self._pool is None: + raise RuntimeError("PgStore not started — call start() first") + return self._pool + + # ── schema migration ───────────────────────────────────────── + + async def migrate(self) -> None: + """Apply the PostgreSQL schema. + + Reads ``schema_pg.sql`` from the same package directory and + executes it, then records the migration version. + """ + schema_path = Path(__file__).with_name("schema_pg.sql") + schema_sql = schema_path.read_text(encoding="utf-8") + + async with self.pool.acquire() as conn: + # Apply the full schema (CREATE TABLE IF NOT EXISTS …) + await conn.execute(schema_sql) + + # Record the current schema version + await conn.execute( + """ + INSERT INTO schema_migrations (version, applied_at) + VALUES ($1, CURRENT_TIMESTAMP) + ON CONFLICT (version) DO UPDATE SET applied_at = CURRENT_TIMESTAMP + """, + SCHEMA_VERSION, + ) + + _LOG.info("pg_schema_migrated", version=SCHEMA_VERSION) + + # ── helpers ────────────────────────────────────────────────── + + async def table_exists(self, table_name: str) -> bool: + """Check if a table exists in the current schema.""" + async with self.pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT COUNT(*) AS cnt + FROM information_schema.tables + WHERE table_schema = 'public' AND table_name = $1 + """, + table_name, + ) + return bool(row and row["cnt"] > 0) + + async def get_table_columns(self, table_name: str) -> set[str]: + """Return the set of column names for *table_name*.""" + async with self.pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT column_name + FROM information_schema.columns + WHERE table_schema = 'public' AND table_name = $1 + """, + table_name, + ) + return {str(r["column_name"]) for r in rows} + + async def ensure_column(self, table_name: str, column_def: str) -> None: + """Add a column to *table_name* if it does not already exist. + + ``column_def`` should be something like ``"my_col VARCHAR"``. + """ + existing = await self.get_table_columns(table_name) + col_name = column_def.split()[0] + if col_name not in existing: + async with self.pool.acquire() as conn: + await conn.execute( + f"ALTER TABLE {table_name} ADD COLUMN {column_def}" + ) + _LOG.info("pg_column_added", table=table_name, column=col_name) diff --git a/src/arbitrade/storage/repositories.py b/src/arbitrade/storage/repositories.py index a7240ec..9ba472f 100644 --- a/src/arbitrade/storage/repositories.py +++ b/src/arbitrade/storage/repositories.py @@ -12,7 +12,7 @@ from arbitrade.config.service import ( ConfigSection, ConfigSetting, ) -from arbitrade.storage.db import DuckDBStore +from arbitrade.storage.pg_store import PgStore @dataclass(slots=True) @@ -94,33 +94,31 @@ class RuntimeStateRecord: class MarketSnapshotRepository: - def __init__(self, store: DuckDBStore) -> None: + def __init__(self, store: PgStore) -> None: self._store = store - def insert(self, record: MarketSnapshotRecord) -> None: - with self._store.connect() as conn: - conn.execute( + async def insert(self, record: MarketSnapshotRecord) -> None: + async with self._store.pool.acquire() as conn: + await conn.execute( """ INSERT INTO market_snapshots (snapshot_at, symbol, source, payload, latency_ms) - VALUES (?, ?, ?, ?, ?) + VALUES ($1, $2, $3, $4, $5) """, - [ - record.snapshot_at, - record.symbol, - record.source, - orjson.dumps(record.payload).decode("utf-8"), - record.latency_ms, - ], + record.snapshot_at, + record.symbol, + record.source, + orjson.dumps(record.payload).decode("utf-8"), + record.latency_ms, ) class OpportunityRepository: - def __init__(self, store: DuckDBStore) -> None: + def __init__(self, store: PgStore) -> None: self._store = store - def insert(self, record: OpportunityRecord) -> None: - with self._store.connect() as conn: - conn.execute( + async def insert(self, record: OpportunityRecord) -> None: + async with self._store.pool.acquire() as conn: + await conn.execute( """ INSERT INTO opportunities ( detected_at, @@ -130,26 +128,24 @@ class OpportunityRepository: est_profit, executed ) - VALUES (?, ?, ?, ?, ?, ?) + VALUES ($1, $2, $3, $4, $5, $6) """, - [ - record.detected_at, - record.cycle, - record.gross_pct, - record.net_pct, - record.est_profit, - record.executed, - ], + record.detected_at, + record.cycle, + record.gross_pct, + record.net_pct, + record.est_profit, + record.executed, ) class TradeRepository: - def __init__(self, store: DuckDBStore) -> None: + def __init__(self, store: PgStore) -> None: self._store = store - def insert(self, record: TradeRecord) -> None: - with self._store.connect() as conn: - conn.execute( + async def insert(self, record: TradeRecord) -> None: + async with self._store.pool.acquire() as conn: + await conn.execute( """ INSERT INTO trades ( trade_ref, @@ -162,29 +158,27 @@ class TradeRepository: cycle, leg_count ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) """, - [ - record.trade_ref, - record.started_at, - record.finished_at, - record.status, - record.realized_pnl, - record.estimated_pnl, - record.capital_used, - record.cycle, - record.leg_count, - ], + record.trade_ref, + record.started_at, + record.finished_at, + record.status, + record.realized_pnl, + record.estimated_pnl, + record.capital_used, + record.cycle, + record.leg_count, ) class OrderRepository: - def __init__(self, store: DuckDBStore) -> None: + def __init__(self, store: PgStore) -> None: self._store = store - def insert(self, record: OrderRecord) -> None: - with self._store.connect() as conn: - conn.execute( + async def insert(self, record: OrderRecord) -> None: + async with self._store.pool.acquire() as conn: + await conn.execute( """ INSERT INTO orders ( trade_ref, @@ -200,32 +194,30 @@ class OrderRepository: raw_response, recorded_at ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) """, - [ - record.trade_ref, - record.order_ref, - record.leg_index, - record.pair, - record.side, - record.volume, - record.user_ref, - record.status, - record.filled_volume, - record.avg_price, - orjson.dumps(record.raw_response).decode("utf-8"), - record.recorded_at, - ], + record.trade_ref, + record.order_ref, + record.leg_index, + record.pair, + record.side, + record.volume, + record.user_ref, + record.status, + record.filled_volume, + record.avg_price, + orjson.dumps(record.raw_response).decode("utf-8"), + record.recorded_at, ) class PnLRepository: - def __init__(self, store: DuckDBStore) -> None: + def __init__(self, store: PgStore) -> None: self._store = store - def insert(self, record: PnLRecord) -> None: - with self._store.connect() as conn: - conn.execute( + async def insert(self, record: PnLRecord) -> None: + async with self._store.pool.acquire() as conn: + await conn.execute( """ INSERT INTO pnl_events ( trade_ref, @@ -234,25 +226,23 @@ class PnLRepository: pnl_usd, source ) - VALUES (?, ?, ?, ?, ?) + VALUES ($1, $2, $3, $4, $5) """, - [ - record.trade_ref, - record.recorded_at, - record.kind, - record.pnl_usd, - record.source, - ], + record.trade_ref, + record.recorded_at, + record.kind, + record.pnl_usd, + record.source, ) class AuditRepository: - def __init__(self, store: DuckDBStore) -> None: + def __init__(self, store: PgStore) -> None: self._store = store - def insert(self, record: AuditRecord) -> None: - with self._store.connect() as conn: - conn.execute( + async def insert(self, record: AuditRecord) -> None: + async with self._store.pool.acquire() as conn: + await conn.execute( """ INSERT INTO audit_events ( occurred_at, @@ -262,38 +252,36 @@ class AuditRepository: payload, correlation_id ) - VALUES (?, ?, ?, ?, ?, ?) + VALUES ($1, $2, $3, $4, $5, $6) """, - [ - record.occurred_at, - record.actor, - record.event_type, - record.decision, - ( - None - if record.payload is None - else orjson.dumps(record.payload).decode("utf-8") - ), - record.correlation_id, - ], + record.occurred_at, + record.actor, + record.event_type, + record.decision, + ( + None + if record.payload is None + else orjson.dumps(record.payload).decode("utf-8") + ), + record.correlation_id, ) - def list_recent(self, *, limit: int = 25) -> list[AuditRecord]: - with self._store.connect() as conn: - rows = conn.execute( + async def list_recent(self, *, limit: int = 25) -> list[AuditRecord]: + async with self._store.pool.acquire() as conn: + rows = await conn.fetch( """ SELECT occurred_at, actor, event_type, decision, payload, correlation_id FROM audit_events ORDER BY occurred_at DESC - LIMIT ? + LIMIT $1 """, - [limit], - ).fetchall() + limit, + ) records: list[AuditRecord] = [] for row in rows: payload: dict[str, Any] | None = None - raw_payload = row[4] + raw_payload = row["payload"] if isinstance(raw_payload, str) and raw_payload: decoded = orjson.loads(raw_payload) if isinstance(decoded, dict): @@ -301,12 +289,13 @@ class AuditRepository: records.append( AuditRecord( - occurred_at=row[0], - actor=str(row[1]), - event_type=str(row[2]), - decision=str(row[3]), + occurred_at=row["occurred_at"], + actor=str(row["actor"]), + event_type=str(row["event_type"]), + decision=str(row["decision"]), payload=payload, - correlation_id=str(row[5]) if row[5] is not None else None, + correlation_id=str( + row["correlation_id"]) if row["correlation_id"] is not None else None, ) ) @@ -314,12 +303,12 @@ class AuditRepository: class RuntimeStateRepository: - def __init__(self, store: DuckDBStore) -> None: + def __init__(self, store: PgStore) -> None: self._store = store - def insert(self, record: RuntimeStateRecord) -> None: - with self._store.connect() as conn: - conn.execute( + async def insert(self, record: RuntimeStateRecord) -> None: + async with self._store.pool.acquire() as conn: + await conn.execute( """ INSERT INTO runtime_state_snapshots ( snapshot_at, @@ -330,26 +319,24 @@ class RuntimeStateRepository: last_known_balances, note ) - VALUES (?, ?, ?, ?, ?, ?, ?) + VALUES ($1, $2, $3, $4, $5, $6, $7) """, - [ - record.snapshot_at, - record.is_running, - record.kill_switch_active, - record.kill_switch_reason, - record.open_trade_count, - ( - None - if record.last_known_balances is None - else orjson.dumps(record.last_known_balances).decode("utf-8") - ), - record.note, - ], + record.snapshot_at, + record.is_running, + record.kill_switch_active, + record.kill_switch_reason, + record.open_trade_count, + ( + None + if record.last_known_balances is None + else orjson.dumps(record.last_known_balances).decode("utf-8") + ), + record.note, ) - def latest(self) -> RuntimeStateRecord | None: - with self._store.connect() as conn: - row = conn.execute(""" + async def latest(self) -> RuntimeStateRecord | None: + async with self._store.pool.acquire() as conn: + row = await conn.fetchrow(""" SELECT snapshot_at, is_running, @@ -361,491 +348,464 @@ class RuntimeStateRepository: FROM runtime_state_snapshots ORDER BY snapshot_at DESC LIMIT 1 - """).fetchone() + """) if row is None: return None balances: dict[str, Any] | None = None - raw_balances = row[5] + raw_balances = row["last_known_balances"] if isinstance(raw_balances, str) and raw_balances: decoded = orjson.loads(raw_balances) if isinstance(decoded, dict): balances = {str(key): decoded[key] for key in decoded} return RuntimeStateRecord( - snapshot_at=row[0], - is_running=bool(row[1]), - kill_switch_active=bool(row[2]), - kill_switch_reason=str(row[3]) if row[3] is not None else None, - open_trade_count=int(row[4]), + snapshot_at=row["snapshot_at"], + is_running=bool(row["is_running"]), + kill_switch_active=bool(row["kill_switch_active"]), + kill_switch_reason=str( + row["kill_switch_reason"]) if row["kill_switch_reason"] is not None else None, + open_trade_count=int(row["open_trade_count"]), last_known_balances=balances, - note=str(row[6]) if row[6] is not None else None, + note=str(row["note"]) if row["note"] is not None else None, ) # Configuration repository classes class ConfigSectionRepository: - def __init__(self, store: DuckDBStore) -> None: + def __init__(self, store: PgStore) -> None: self._store = store - def create_section(self, section: ConfigSection) -> ConfigSection: + async def create_section(self, section: ConfigSection) -> ConfigSection: """Create a new configuration section.""" - with self._store.connect() as conn: - cursor = conn.execute( + async with self._store.pool.acquire() as conn: + row = await conn.fetchrow( """ INSERT INTO config_sections (name, description) - VALUES (?, ?) + VALUES ($1, $2) RETURNING id, name, description, updated_at """, - (section.name, section.description), + section.name, section.description, ) - row = cursor.fetchone() if row: - return ConfigSection(id=row[0], name=row[1], description=row[2], updated_at=row[3]) + return ConfigSection(id=row["id"], name=row["name"], description=row["description"], updated_at=row["updated_at"]) raise ValueError("Failed to create section") - def get_section(self, name: str) -> ConfigSection | None: + async def get_section(self, name: str) -> ConfigSection | None: """Get a configuration section by name.""" - with self._store.connect() as conn: - cursor = conn.execute( + async with self._store.pool.acquire() as conn: + row = await conn.fetchrow( """ SELECT id, name, description, updated_at FROM config_sections - WHERE name = ? + WHERE name = $1 """, - (name,), + name, ) - row = cursor.fetchone() if row: - return ConfigSection(id=row[0], name=row[1], description=row[2], updated_at=row[3]) + return ConfigSection(id=row["id"], name=row["name"], description=row["description"], updated_at=row["updated_at"]) return None - def _build_section_from_row(self, row: tuple[Any, ...]) -> ConfigSection: - return ConfigSection(id=row[0], name=row[1], description=row[2], updated_at=row[3]) - - def list_sections(self) -> list[ConfigSection]: + async def list_sections(self) -> list[ConfigSection]: """List all configuration sections.""" - with self._store.connect() as conn: - cursor = conn.execute(""" + async with self._store.pool.acquire() as conn: + rows = await conn.fetch(""" SELECT id, name, description, updated_at FROM config_sections ORDER BY name """) - c = cursor.fetchall() - r = [] - for row in c: - s = self._build_section_from_row(row) - r.append(s) - - return r + return [ + ConfigSection( + id=r["id"], name=r["name"], description=r["description"], updated_at=r["updated_at"]) + for r in rows + ] class ConfigSettingRepository: - def __init__(self, store: DuckDBStore) -> None: + def __init__(self, store: PgStore) -> None: self._store = store - def create_setting(self, setting: ConfigSetting) -> ConfigSetting: + async def create_setting(self, setting: ConfigSetting) -> ConfigSetting: """Create a new configuration setting.""" - with self._store.connect() as conn: - cursor = conn.execute( + async with self._store.pool.acquire() as conn: + row = await conn.fetchrow( """ INSERT INTO config_settings (key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_by) - VALUES (?, ?, ?, ?, ?, ?, ?) + VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by """, - ( - setting.key, - setting.section, - setting.value_json, - setting.value_type, - setting.is_secret, - setting.is_runtime_reloadable, - setting.updated_by, - ), + setting.key, + setting.section, + setting.value_json, + setting.value_type, + setting.is_secret, + setting.is_runtime_reloadable, + setting.updated_by, ) - row = cursor.fetchone() if row: return ConfigSetting( - key=row[0], - section=row[1], - value_json=row[2], - value_type=row[3], - is_secret=bool(row[4]), - is_runtime_reloadable=bool(row[5]), - updated_at=row[6], - updated_by=row[7], + key=row["key"], + section=row["section"], + value_json=row["value_json"], + value_type=row["value_type"], + is_secret=bool(row["is_secret"]), + is_runtime_reloadable=bool(row["is_runtime_reloadable"]), + updated_at=row["updated_at"], + updated_by=row["updated_by"], ) raise ValueError("Failed to create setting") - def get_setting(self, key: str) -> ConfigSetting | None: + async def get_setting(self, key: str) -> ConfigSetting | None: """Get a configuration setting by key.""" - with self._store.connect() as conn: - cursor = conn.execute( + async with self._store.pool.acquire() as conn: + row = await conn.fetchrow( """ SELECT key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by FROM config_settings - WHERE key = ? + WHERE key = $1 """, - (key,), + key, ) - row = cursor.fetchone() if row: return ConfigSetting( - key=row[0], - section=row[1], - value_json=row[2], - value_type=row[3], - is_secret=bool(row[4]), - is_runtime_reloadable=bool(row[5]), - updated_at=row[6], - updated_by=row[7], + key=row["key"], + section=row["section"], + value_json=row["value_json"], + value_type=row["value_type"], + is_secret=bool(row["is_secret"]), + is_runtime_reloadable=bool(row["is_runtime_reloadable"]), + updated_at=row["updated_at"], + updated_by=row["updated_by"], ) return None - def update_setting(self, key: str, setting: ConfigSetting) -> ConfigSetting: + async def update_setting(self, key: str, setting: ConfigSetting) -> ConfigSetting: """Update an existing configuration setting.""" - with self._store.connect() as conn: - cursor = conn.execute( + async with self._store.pool.acquire() as conn: + row = await conn.fetchrow( """ UPDATE config_settings - SET section = ?, value_json = ?, value_type = ?, is_secret = ?, is_runtime_reloadable = ?, updated_by = ? - WHERE key = ? + SET section = $1, value_json = $2, value_type = $3, is_secret = $4, is_runtime_reloadable = $5, updated_by = $6 + WHERE key = $7 RETURNING key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by """, - ( - setting.section, - setting.value_json, - setting.value_type, - setting.is_secret, - setting.is_runtime_reloadable, - setting.updated_by, - key, - ), + setting.section, + setting.value_json, + setting.value_type, + setting.is_secret, + setting.is_runtime_reloadable, + setting.updated_by, + key, ) - row = cursor.fetchone() if row: return ConfigSetting( - key=row[0], - section=row[1], - value_json=row[2], - value_type=row[3], - is_secret=bool(row[4]), - is_runtime_reloadable=bool(row[5]), - updated_at=row[6], - updated_by=row[7], + key=row["key"], + section=row["section"], + value_json=row["value_json"], + value_type=row["value_type"], + is_secret=bool(row["is_secret"]), + is_runtime_reloadable=bool(row["is_runtime_reloadable"]), + updated_at=row["updated_at"], + updated_by=row["updated_by"], ) raise ValueError("Failed to update setting") - def delete_setting(self, key: str) -> bool: + async def delete_setting(self, key: str) -> bool: """Delete a configuration setting.""" - with self._store.connect() as conn: - cursor = conn.execute( + async with self._store.pool.acquire() as conn: + result = await conn.execute( """ DELETE FROM config_settings - WHERE key = ? + WHERE key = $1 """, - (key,), + key, ) - return cursor.rowcount > 0 + return result != "DELETE 0" - def list_settings(self, section: str | None = None) -> list[ConfigSetting]: + async def list_settings(self, section: str | None = None) -> list[ConfigSetting]: """List all configuration settings, optionally filtered by section.""" - with self._store.connect() as conn: + async with self._store.pool.acquire() as conn: if section: - cursor = conn.execute( + rows = await conn.fetch( """ SELECT key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by FROM config_settings - WHERE section = ? + WHERE section = $1 ORDER BY key """, - (section,), + section, ) else: - cursor = conn.execute(""" + rows = await conn.fetch(""" SELECT key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by FROM config_settings ORDER BY key """) return [ ConfigSetting( - key=row[0], - section=row[1], - value_json=row[2], - value_type=row[3], - is_secret=bool(row[4]), - is_runtime_reloadable=bool(row[5]), - updated_at=row[6], - updated_by=row[7], + key=r["key"], + section=r["section"], + value_json=r["value_json"], + value_type=r["value_type"], + is_secret=bool(r["is_secret"]), + is_runtime_reloadable=bool(r["is_runtime_reloadable"]), + updated_at=r["updated_at"], + updated_by=r["updated_by"], ) - for row in cursor.fetchall() + for r in rows ] - def get_latest_updated_at(self) -> datetime | None: + async def get_latest_updated_at(self) -> datetime | None: """Get the latest updated_at timestamp from config_settings table.""" - with self._store.connect() as conn: - cursor = conn.execute(""" + async with self._store.pool.acquire() as conn: + row = await conn.fetchrow(""" SELECT MAX(updated_at) as latest_updated_at FROM config_settings """) - row = cursor.fetchone() - if row and row[0]: - # Convert string timestamp to datetime - return datetime.fromisoformat(row[0].replace("Z", "+00:00")) + if row and row["latest_updated_at"]: + ts = row["latest_updated_at"] + if isinstance(ts, str): + return datetime.fromisoformat(ts.replace("Z", "+00:00")) + return ts return None class ConfigPairingRepository: - def __init__(self, store: DuckDBStore) -> None: + def __init__(self, store: PgStore) -> None: self._store = store - def create_pairing(self, pairing: ConfigPairing) -> ConfigPairing: + async def create_pairing(self, pairing: ConfigPairing) -> ConfigPairing: """Create a new currency pairing.""" - with self._store.connect() as conn: - cursor = conn.execute( + async with self._store.pool.acquire() as conn: + row = await conn.fetchrow( """ INSERT INTO config_pairings (base_asset, quote_asset, enabled, source) - VALUES (?, ?, ?, ?) + VALUES ($1, $2, $3, $4) RETURNING id, base_asset, quote_asset, enabled, source, created_at, updated_at """, - ( - pairing.base_asset, - pairing.quote_asset, - pairing.enabled, - pairing.source, - ), + pairing.base_asset, + pairing.quote_asset, + pairing.enabled, + pairing.source, ) - row = cursor.fetchone() if row: return ConfigPairing( - id=row[0], - base_asset=row[1], - quote_asset=row[2], - enabled=bool(row[3]), - source=row[4], - created_at=row[5], - updated_at=row[6], + id=row["id"], + base_asset=row["base_asset"], + quote_asset=row["quote_asset"], + enabled=bool(row["enabled"]), + source=row["source"], + created_at=row["created_at"], + updated_at=row["updated_at"], ) raise ValueError("Failed to create pairing") - def get_pairing(self, base_asset: str, quote_asset: str) -> ConfigPairing | None: + async def get_pairing(self, base_asset: str, quote_asset: str) -> ConfigPairing | None: """Get a currency pairing by base and quote assets.""" - with self._store.connect() as conn: - cursor = conn.execute( + async with self._store.pool.acquire() as conn: + row = await conn.fetchrow( """ SELECT id, base_asset, quote_asset, enabled, source, created_at, updated_at FROM config_pairings - WHERE base_asset = ? AND quote_asset = ? + WHERE base_asset = $1 AND quote_asset = $2 """, - (base_asset, quote_asset), + base_asset, quote_asset, ) - row = cursor.fetchone() if row: return ConfigPairing( - id=row[0], - base_asset=row[1], - quote_asset=row[2], - enabled=bool(row[3]), - source=row[4], - created_at=row[5], - updated_at=row[6], + id=row["id"], + base_asset=row["base_asset"], + quote_asset=row["quote_asset"], + enabled=bool(row["enabled"]), + source=row["source"], + created_at=row["created_at"], + updated_at=row["updated_at"], ) return None - def update_pairing( + async def update_pairing( self, base_asset: str, quote_asset: str, pairing: ConfigPairing ) -> ConfigPairing: """Update an existing currency pairing.""" - with self._store.connect() as conn: - cursor = conn.execute( + async with self._store.pool.acquire() as conn: + row = await conn.fetchrow( """ UPDATE config_pairings - SET enabled = ?, source = ? - WHERE base_asset = ? AND quote_asset = ? + SET enabled = $1, source = $2 + WHERE base_asset = $3 AND quote_asset = $4 RETURNING id, base_asset, quote_asset, enabled, source, created_at, updated_at """, - ( - pairing.enabled, - pairing.source, - base_asset, - quote_asset, - ), + pairing.enabled, + pairing.source, + base_asset, + quote_asset, ) - row = cursor.fetchone() if row: return ConfigPairing( - id=row[0], - base_asset=row[1], - quote_asset=row[2], - enabled=bool(row[3]), - source=row[4], - created_at=row[5], - updated_at=row[6], + id=row["id"], + base_asset=row["base_asset"], + quote_asset=row["quote_asset"], + enabled=bool(row["enabled"]), + source=row["source"], + created_at=row["created_at"], + updated_at=row["updated_at"], ) raise ValueError("Failed to update pairing") - def delete_pairing(self, base_asset: str, quote_asset: str) -> bool: + async def delete_pairing(self, base_asset: str, quote_asset: str) -> bool: """Delete a currency pairing.""" - with self._store.connect() as conn: - cursor = conn.execute( + async with self._store.pool.acquire() as conn: + result = await conn.execute( """ DELETE FROM config_pairings - WHERE base_asset = ? AND quote_asset = ? + WHERE base_asset = $1 AND quote_asset = $2 """, - (base_asset, quote_asset), + base_asset, quote_asset, ) - return cursor.rowcount > 0 + return result != "DELETE 0" - def upsert_pairing(self, pairing: ConfigPairing) -> ConfigPairing: + async def upsert_pairing(self, pairing: ConfigPairing) -> ConfigPairing: """Insert or update a currency pairing (upsert on base_asset, quote_asset).""" - with self._store.connect() as conn: - cursor = conn.execute( + async with self._store.pool.acquire() as conn: + row = await conn.fetchrow( """ INSERT INTO config_pairings (base_asset, quote_asset, enabled, source) - VALUES (?, ?, ?, ?) + VALUES ($1, $2, $3, $4) ON CONFLICT(base_asset, quote_asset) DO UPDATE SET enabled = EXCLUDED.enabled, source = EXCLUDED.source, - updated_at = current_timestamp + updated_at = CURRENT_TIMESTAMP RETURNING id, base_asset, quote_asset, enabled, source, created_at, updated_at """, - ( - pairing.base_asset, - pairing.quote_asset, - pairing.enabled, - pairing.source, - ), + pairing.base_asset, + pairing.quote_asset, + pairing.enabled, + pairing.source, ) - row = cursor.fetchone() if row: return ConfigPairing( - id=row[0], - base_asset=row[1], - quote_asset=row[2], - enabled=bool(row[3]), - source=row[4], - created_at=row[5], - updated_at=row[6], + id=row["id"], + base_asset=row["base_asset"], + quote_asset=row["quote_asset"], + enabled=bool(row["enabled"]), + source=row["source"], + created_at=row["created_at"], + updated_at=row["updated_at"], ) raise ValueError("Failed to upsert pairing") - def list_pairings(self, enabled_only: bool = False) -> list[ConfigPairing]: + async def list_pairings(self, enabled_only: bool = False) -> list[ConfigPairing]: """List all currency pairings. If enabled_only=True, only enabled pairings.""" - with self._store.connect() as conn: + async with self._store.pool.acquire() as conn: query = """ SELECT id, base_asset, quote_asset, enabled, source, created_at, updated_at FROM config_pairings """ - params: list[object] = [] if enabled_only: query += " WHERE enabled = TRUE" query += " ORDER BY base_asset, quote_asset" - cursor = conn.execute(query, params) + rows = await conn.fetch(query) return [ ConfigPairing( - id=row[0], - base_asset=row[1], - quote_asset=row[2], - enabled=bool(row[3]), - source=row[4], - created_at=row[5], - updated_at=row[6], + id=r["id"], + base_asset=r["base_asset"], + quote_asset=r["quote_asset"], + enabled=bool(r["enabled"]), + source=r["source"], + created_at=r["created_at"], + updated_at=r["updated_at"], ) - for row in cursor.fetchall() + for r in rows ] class ConfigBacktestingDefaultsRepository: - def __init__(self, store: DuckDBStore) -> None: + def __init__(self, store: PgStore) -> None: self._store = store - def create_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults: + async def create_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults: """Create new backtesting defaults.""" - with self._store.connect() as conn: + async with self._store.pool.acquire() as conn: balances_json = ( orjson.dumps(defaults.starting_balances).decode("utf-8") if defaults.starting_balances else None ) - cursor = conn.execute( + row = await conn.fetchrow( """ INSERT INTO config_backtesting_defaults (starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms) - VALUES (?, ?, ?, ?, ?) + VALUES ($1, $2, $3, $4, $5) RETURNING id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms """, - ( - balances_json, - defaults.trade_capital, - defaults.min_profit_threshold, - defaults.slippage_bps, - defaults.execution_latency_ms, - ), + balances_json, + defaults.trade_capital, + defaults.min_profit_threshold, + defaults.slippage_bps, + defaults.execution_latency_ms, ) - row = cursor.fetchone() if row: return ConfigBacktestingDefaults( - starting_balances=orjson.loads(row[1]) if row[1] else None, - trade_capital=row[2], - min_profit_threshold=row[3], - slippage_bps=row[4], - execution_latency_ms=row[5], + starting_balances=orjson.loads( + row["starting_balances"]) if row["starting_balances"] else None, + trade_capital=row["trade_capital"], + min_profit_threshold=row["min_profit_threshold"], + slippage_bps=row["slippage_bps"], + execution_latency_ms=row["execution_latency_ms"], ) raise ValueError("Failed to create backtesting defaults") - def get_defaults(self) -> ConfigBacktestingDefaults | None: + async def get_defaults(self) -> ConfigBacktestingDefaults | None: """Get the current backtesting defaults.""" - with self._store.connect() as conn: - cursor = conn.execute(""" + async with self._store.pool.acquire() as conn: + row = await conn.fetchrow(""" SELECT id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms FROM config_backtesting_defaults ORDER BY id DESC LIMIT 1 """) - row = cursor.fetchone() if row: return ConfigBacktestingDefaults( - starting_balances=orjson.loads(row[1]) if row[1] else None, - trade_capital=row[2], - min_profit_threshold=row[3], - slippage_bps=row[4], - execution_latency_ms=row[5], + starting_balances=orjson.loads( + row["starting_balances"]) if row["starting_balances"] else None, + trade_capital=row["trade_capital"], + min_profit_threshold=row["min_profit_threshold"], + slippage_bps=row["slippage_bps"], + execution_latency_ms=row["execution_latency_ms"], ) return None - def update_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults: + async def update_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults: """Update the backtesting defaults.""" - with self._store.connect() as conn: + async with self._store.pool.acquire() as conn: starting_balances_json = ( orjson.dumps(defaults.starting_balances).decode("utf-8") if defaults.starting_balances else None ) - cursor = conn.execute( + row = await conn.fetchrow( """ UPDATE config_backtesting_defaults - SET starting_balances = ?, trade_capital = ?, min_profit_threshold = ?, slippage_bps = ?, execution_latency_ms = ? + SET starting_balances = $1, trade_capital = $2, min_profit_threshold = $3, slippage_bps = $4, execution_latency_ms = $5 WHERE id = ( SELECT id FROM config_backtesting_defaults ORDER BY id DESC LIMIT 1 ) RETURNING id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms """, - ( - starting_balances_json, - defaults.trade_capital, - defaults.min_profit_threshold, - defaults.slippage_bps, - defaults.execution_latency_ms, - ), + starting_balances_json, + defaults.trade_capital, + defaults.min_profit_threshold, + defaults.slippage_bps, + defaults.execution_latency_ms, ) - row = cursor.fetchone() if row: return ConfigBacktestingDefaults( - starting_balances=orjson.loads(row[1]) if row[1] else None, - trade_capital=row[2], - min_profit_threshold=row[3], - slippage_bps=row[4], - execution_latency_ms=row[5], + starting_balances=orjson.loads( + row["starting_balances"]) if row["starting_balances"] else None, + trade_capital=row["trade_capital"], + min_profit_threshold=row["min_profit_threshold"], + slippage_bps=row["slippage_bps"], + execution_latency_ms=row["execution_latency_ms"], ) raise ValueError("Failed to update backtesting defaults") @@ -862,11 +822,11 @@ class KrakenAccountSnapshot: class KrakenAccountSnapshotRepository: - def __init__(self, store: DuckDBStore) -> None: + def __init__(self, store: PgStore) -> None: self._store = store - def insert_snapshot(self, snapshot: KrakenAccountSnapshot) -> None: - with self._store.connect() as conn: + async def insert_snapshot(self, snapshot: KrakenAccountSnapshot) -> None: + async with self._store.pool.acquire() as conn: trade_balance_json = ( orjson.dumps(snapshot.trade_balance_raw).decode("utf-8") if snapshot.trade_balance_raw @@ -877,43 +837,43 @@ class KrakenAccountSnapshotRepository: if snapshot.fee_schedule_raw else None ) - conn.execute( + await conn.execute( """ INSERT INTO kraken_account_snapshots (snapshot_at, fee_tier, maker_fee, taker_fee, thirty_day_volume, trade_balance_raw, fee_schedule_raw) - VALUES (?, ?, ?, ?, ?, ?, ?) + VALUES ($1, $2, $3, $4, $5, $6, $7) """, - ( - snapshot.snapshot_at, - snapshot.fee_tier, - snapshot.maker_fee, - snapshot.taker_fee, - snapshot.thirty_day_volume, - trade_balance_json, - fee_schedule_json, - ), + snapshot.snapshot_at, + snapshot.fee_tier, + snapshot.maker_fee, + snapshot.taker_fee, + snapshot.thirty_day_volume, + trade_balance_json, + fee_schedule_json, ) - def latest_snapshot(self) -> KrakenAccountSnapshot | None: - with self._store.connect() as conn: - row = conn.execute(""" + async def latest_snapshot(self) -> KrakenAccountSnapshot | None: + async with self._store.pool.acquire() as conn: + row = await conn.fetchrow(""" SELECT snapshot_at, fee_tier, maker_fee, taker_fee, thirty_day_volume, trade_balance_raw, fee_schedule_raw FROM kraken_account_snapshots ORDER BY snapshot_at DESC LIMIT 1 - """).fetchone() + """) if row is None: return None return KrakenAccountSnapshot( - snapshot_at=row[0], - fee_tier=row[1], - maker_fee=row[2], - taker_fee=row[3], - thirty_day_volume=row[4], - trade_balance_raw=orjson.loads(row[5]) if row[5] else None, - fee_schedule_raw=orjson.loads(row[6]) if row[6] else None, + snapshot_at=row["snapshot_at"], + fee_tier=row["fee_tier"], + maker_fee=row["maker_fee"], + taker_fee=row["taker_fee"], + thirty_day_volume=row["thirty_day_volume"], + trade_balance_raw=orjson.loads( + row["trade_balance_raw"]) if row["trade_balance_raw"] else None, + fee_schedule_raw=orjson.loads( + row["fee_schedule_raw"]) if row["fee_schedule_raw"] else None, ) @@ -931,109 +891,109 @@ class BacktestJobRecord: class BacktestJobRepository: - def __init__(self, store: DuckDBStore) -> None: + def __init__(self, store: PgStore) -> None: self._store = store - def create_job( + async def create_job( self, events_path: str, config: dict[str, Any] | None = None ) -> BacktestJobRecord: - with self._store.connect() as conn: + async with self._store.pool.acquire() as conn: if config is not None: job_config_json = orjson.dumps(config).decode("utf-8") else: raise ValueError("Config is required.") - row = conn.execute( + row = await conn.fetchrow( """ INSERT INTO backtest_jobs (events_path, config) - VALUES (?, ?) + VALUES ($1, $2) RETURNING id, status, events_path, config, created_at """, - (events_path, job_config_json), - ).fetchone() + events_path, job_config_json, + ) if row is None: raise ValueError("Failed to create backtest job") return BacktestJobRecord( - id=str(row[0]), - status=str(row[1]), - events_path=str(row[2]), - config=orjson.loads(row[3]) if row[3] else None, - created_at=row[4], + id=str(row["id"]), + status=str(row["status"]), + events_path=str(row["events_path"]), + config=orjson.loads(row["config"]) if row["config"] else None, + created_at=row["created_at"], ) - def update_status(self, job_id: str, status: str, error: str | None = None) -> None: - with self._store.connect() as conn: + async def update_status(self, job_id: str, status: str, error: str | None = None) -> None: + async with self._store.pool.acquire() as conn: if status == "running": - conn.execute( - "UPDATE backtest_jobs SET status = ?, started_at = current_timestamp WHERE id = ?", - (status, job_id), + await conn.execute( + "UPDATE backtest_jobs SET status = $1, started_at = CURRENT_TIMESTAMP WHERE id = $2", + status, job_id, ) elif status in ("completed", "failed"): - conn.execute( - "UPDATE backtest_jobs SET status = ?, finished_at = current_timestamp, error = ? WHERE id = ?", - (status, error, job_id), + await conn.execute( + "UPDATE backtest_jobs SET status = $1, finished_at = CURRENT_TIMESTAMP, error = $2 WHERE id = $3", + status, error, job_id, ) else: - conn.execute( - "UPDATE backtest_jobs SET status = ?, error = ? WHERE id = ?", - (status, error, job_id), + await conn.execute( + "UPDATE backtest_jobs SET status = $1, error = $2 WHERE id = $3", + status, error, job_id, ) - def store_report(self, job_id: str, report: dict[str, Any]) -> None: - with self._store.connect() as conn: - conn.execute( - "UPDATE backtest_jobs SET report = ? WHERE id = ?", - (orjson.dumps(report).decode("utf-8"), job_id), + async def store_report(self, job_id: str, report: dict[str, Any]) -> None: + async with self._store.pool.acquire() as conn: + await conn.execute( + "UPDATE backtest_jobs SET report = $1 WHERE id = $2", + orjson.dumps(report).decode("utf-8"), job_id, ) - def get_job(self, job_id: str) -> BacktestJobRecord | None: - with self._store.connect() as conn: - row = conn.execute( + async def get_job(self, job_id: str) -> BacktestJobRecord | None: + async with self._store.pool.acquire() as conn: + row = await conn.fetchrow( """SELECT id, status, events_path, config, report, error, created_at, started_at, finished_at - FROM backtest_jobs WHERE id = ?""", - (job_id,), - ).fetchone() + FROM backtest_jobs WHERE id = $1""", + job_id, + ) if row is None: return None return BacktestJobRecord( - id=str(row[0]), - status=str(row[1]), - events_path=str(row[2]), - config=orjson.loads(row[3]) if row[3] else None, - report=orjson.loads(row[4]) if row[4] else None, - error=str(row[5]) if row[5] else None, - created_at=row[6], - started_at=row[7], - finished_at=row[8], + id=str(row["id"]), + status=str(row["status"]), + events_path=str(row["events_path"]), + config=orjson.loads(row["config"]) if row["config"] else None, + report=orjson.loads(row["report"]) if row["report"] else None, + error=str(row["error"]) if row["error"] else None, + created_at=row["created_at"], + started_at=row["started_at"], + finished_at=row["finished_at"], ) - def list_jobs(self, limit: int = 20) -> list[BacktestJobRecord]: - with self._store.connect() as conn: - rows = conn.execute( + async def list_jobs(self, limit: int = 20) -> list[BacktestJobRecord]: + async with self._store.pool.acquire() as conn: + rows = await conn.fetch( """SELECT id, status, events_path, config, report, error, created_at, started_at, finished_at - FROM backtest_jobs ORDER BY created_at DESC LIMIT ?""", - (limit,), - ).fetchall() + FROM backtest_jobs ORDER BY created_at DESC LIMIT $1""", + limit, + ) return [ BacktestJobRecord( - id=str(r[0]), - status=str(r[1]), - events_path=str(r[2]), - config=orjson.loads(r[3]) if r[3] else None, - report=orjson.loads(r[4]) if r[4] else None, - error=str(r[5]) if r[5] else None, - created_at=r[6], - started_at=r[7], - finished_at=r[8], + id=str(r["id"]), + status=str(r["status"]), + events_path=str(r["events_path"]), + config=orjson.loads(r["config"]) if r["config"] else None, + report=orjson.loads(r["report"]) if r["report"] else None, + error=str(r["error"]) if r["error"] else None, + created_at=r["created_at"], + started_at=r["started_at"], + finished_at=r["finished_at"], ) for r in rows ] - def delete_job(self, job_id: str) -> bool: - with self._store.connect() as conn: - result = conn.execute( - "DELETE FROM backtest_jobs WHERE id = ?", - (job_id,), + async def delete_job(self, job_id: str) -> bool: + async with self._store.pool.acquire() as conn: + result = await conn.execute( + "DELETE FROM backtest_jobs WHERE id = $1", + job_id, ) - return result.rowcount > 0 + return result != "DELETE 0" diff --git a/src/arbitrade/storage/schema_pg.sql b/src/arbitrade/storage/schema_pg.sql new file mode 100644 index 0000000..926577f --- /dev/null +++ b/src/arbitrade/storage/schema_pg.sql @@ -0,0 +1,167 @@ +-- PostgreSQL schema for arbitrade bot +-- Requires pgcrypto extension for gen_random_uuid() + +CREATE EXTENSION IF NOT EXISTS pgcrypto; + +-- ======================================== +-- Schema version tracking +-- ======================================== +CREATE TABLE IF NOT EXISTS schema_migrations ( + version INTEGER PRIMARY KEY, + applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); + +-- ======================================== +-- Configuration +-- ======================================== +CREATE TABLE IF NOT EXISTS config_sections ( + id SERIAL PRIMARY KEY, + name VARCHAR UNIQUE NOT NULL, + description TEXT, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); + +CREATE TABLE IF NOT EXISTS config_settings ( + key VARCHAR PRIMARY KEY, + section VARCHAR NOT NULL, + value_json TEXT NOT NULL, + value_type VARCHAR NOT NULL, + is_secret BOOLEAN DEFAULT FALSE, + is_runtime_reloadable BOOLEAN DEFAULT FALSE, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_by VARCHAR +); + +CREATE TABLE IF NOT EXISTS config_pairings ( + id SERIAL PRIMARY KEY, + base_asset VARCHAR NOT NULL, + quote_asset VARCHAR NOT NULL, + enabled BOOLEAN DEFAULT TRUE, + source VARCHAR NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(base_asset, quote_asset) +); + +CREATE TABLE IF NOT EXISTS config_backtesting_defaults ( + id SERIAL PRIMARY KEY, + starting_balances JSONB, + trade_capital DOUBLE PRECISION, + min_profit_threshold DOUBLE PRECISION, + slippage_bps INTEGER, + execution_latency_ms INTEGER, + fee_source VARCHAR DEFAULT 'api' +); + +-- ======================================== +-- Detection & Execution +-- ======================================== +CREATE TABLE IF NOT EXISTS opportunities ( + id UUID DEFAULT gen_random_uuid() PRIMARY KEY, + detected_at TIMESTAMP NOT NULL, + cycle VARCHAR NOT NULL, + gross_pct DOUBLE PRECISION, + net_pct DOUBLE PRECISION, + est_profit DOUBLE PRECISION, + executed BOOLEAN DEFAULT FALSE +); + +CREATE TABLE IF NOT EXISTS trades ( + id UUID DEFAULT gen_random_uuid() PRIMARY KEY, + trade_ref VARCHAR NOT NULL, + started_at TIMESTAMP NOT NULL, + finished_at TIMESTAMP, + status VARCHAR NOT NULL, + realized_pnl DOUBLE PRECISION, + estimated_pnl DOUBLE PRECISION, + capital_used DOUBLE PRECISION, + cycle VARCHAR, + leg_count INTEGER +); + +CREATE TABLE IF NOT EXISTS orders ( + id UUID DEFAULT gen_random_uuid() PRIMARY KEY, + trade_ref VARCHAR NOT NULL, + order_ref VARCHAR NOT NULL, + leg_index INTEGER NOT NULL, + pair VARCHAR NOT NULL, + side VARCHAR NOT NULL, + volume DOUBLE PRECISION NOT NULL, + user_ref INTEGER, + status VARCHAR, + filled_volume DOUBLE PRECISION, + avg_price DOUBLE PRECISION, + raw_response JSONB, + recorded_at TIMESTAMP NOT NULL +); + +CREATE TABLE IF NOT EXISTS pnl_events ( + id UUID DEFAULT gen_random_uuid() PRIMARY KEY, + trade_ref VARCHAR NOT NULL, + recorded_at TIMESTAMP NOT NULL, + kind VARCHAR NOT NULL, + pnl_usd DOUBLE PRECISION NOT NULL, + source VARCHAR NOT NULL +); + +-- ======================================== +-- Snapshots & Monitoring +-- ======================================== +CREATE TABLE IF NOT EXISTS portfolio_snapshots ( + snapshot_at TIMESTAMP NOT NULL, + balances JSONB, + total_value_usd DOUBLE PRECISION +); + +CREATE TABLE IF NOT EXISTS market_snapshots ( + snapshot_at TIMESTAMP NOT NULL, + symbol VARCHAR NOT NULL, + source VARCHAR NOT NULL, + payload JSONB NOT NULL, + latency_ms DOUBLE PRECISION +); + +CREATE TABLE IF NOT EXISTS audit_events ( + id UUID DEFAULT gen_random_uuid() PRIMARY KEY, + occurred_at TIMESTAMP NOT NULL, + actor VARCHAR NOT NULL, + event_type VARCHAR NOT NULL, + decision VARCHAR NOT NULL, + payload JSONB, + correlation_id VARCHAR +); + +CREATE TABLE IF NOT EXISTS runtime_state_snapshots ( + snapshot_at TIMESTAMP NOT NULL, + is_running BOOLEAN NOT NULL, + kill_switch_active BOOLEAN NOT NULL, + kill_switch_reason VARCHAR, + open_trade_count INTEGER NOT NULL, + last_known_balances JSONB, + note VARCHAR +); + +CREATE TABLE IF NOT EXISTS kraken_account_snapshots ( + snapshot_at TIMESTAMP NOT NULL, + fee_tier VARCHAR, + maker_fee DOUBLE PRECISION, + taker_fee DOUBLE PRECISION, + thirty_day_volume DOUBLE PRECISION, + trade_balance_raw JSONB, + fee_schedule_raw JSONB +); + +-- ======================================== +-- Backtesting +-- ======================================== +CREATE TABLE IF NOT EXISTS backtest_jobs ( + id UUID DEFAULT gen_random_uuid() PRIMARY KEY, + status VARCHAR NOT NULL DEFAULT 'pending', + events_path VARCHAR NOT NULL, + config JSONB, + report JSONB, + error VARCHAR, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + started_at TIMESTAMP, + finished_at TIMESTAMP +); \ No newline at end of file diff --git a/src/arbitrade/web/templates/partials/charts.html b/src/arbitrade/web/templates/partials/charts.html index 91c51df..e1ed201 100644 --- a/src/arbitrade/web/templates/partials/charts.html +++ b/src/arbitrade/web/templates/partials/charts.html @@ -7,7 +7,7 @@
Opportunity Trend
-
Recent opportunities from DuckDB. Updated {{ generated_at }}
+
Recent opportunities from PostgreSQL. Updated {{ generated_at }}