diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..18ca329 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,20 @@ +# EditorConfig is awesome: https://editorconfig.org + +# top-most EditorConfig file +root = true + +# Unix-style newlines with a newline ending every file +[*] +end_of_line = lf +insert_final_newline = true + +# Matches multiple files with brace expansion notation +# Set default charset +[*.{js,py}] +charset = utf-8 + +# 4 space indentation +[*.py] +indent_style = space +indent_size = 4 +max_line_length = 120 \ No newline at end of file diff --git a/scripts/backtest_replay.py b/scripts/backtest_replay.py index 7acddfd..ba16912 100644 --- a/scripts/backtest_replay.py +++ b/scripts/backtest_replay.py @@ -19,12 +19,10 @@ def _resolve_fee_rate(fee_rate: float | None, db_path: str | None = None) -> flo if db_path is not None: try: conn = duckdb.connect(db_path) - row = conn.execute( - """ + row = conn.execute(""" SELECT maker_fee FROM kraken_account_snapshots ORDER BY snapshot_at DESC LIMIT 1 - """ - ).fetchone() + """).fetchone() conn.close() if row is not None and row[0] is not None: return float(row[0]) @@ -53,14 +51,13 @@ def _parse_balances(raw: str) -> Mapping[str, float]: def main() -> int: - parser = argparse.ArgumentParser(description="Run a deterministic replay backtest.") + parser = argparse.ArgumentParser(description="Run backtest.") parser.add_argument("--events", type=Path, required=True) parser.add_argument("--starting-balances", type=str, default="USD=1000.0") parser.add_argument("--trade-capital", type=float, default=100.0) parser.add_argument("--fee-rate", type=float, default=None) parser.add_argument("--slippage-bps", type=float, default=4.0) parser.add_argument("--execution-latency-ms", type=float, default=20.0) - parser.add_argument("--db-path", type=str, default=None, help="DuckDB path for fee lookup") args = parser.parse_args() cycles_by_pair, available_pairs = _build_graph() @@ -79,24 +76,23 @@ def main() -> int: config=config, started_at=events[0].occurred_at if events else datetime.now(UTC), ) - report = asyncio.run( - engine.run(events, starting_balances=_parse_balances(args.starting_balances)) - ) + starting_balances = _parse_balances(args.starting_balances) + r = asyncio.run(engine.run(events, starting_balances=starting_balances)) print("Backtest report:") - print(f"- processed_events: {report.processed_events}") - print(f"- opportunities_seen: {report.opportunities_seen}") - print(f"- trades_executed: {report.trades_executed}") - print(f"- win_rate: {report.win_rate if report.win_rate is not None else 'n/a'}") - print(f"- fill_rate: {report.fill_rate if report.fill_rate is not None else 'n/a'}") - print(f"- realized_pnl_usd: {report.realized_pnl_usd:.4f}") - print(f"- max_drawdown_usd: {report.max_drawdown_usd:.4f}") - print(f"- miss_reasons: {dict(report.miss_reasons)}") + print(f"- processed_events: {r.processed_events}") + print(f"- opportunities_seen: {r.opportunities_seen}") + print(f"- trades_executed: {r.trades_executed}") + print(f"- win_rate: {r.win_rate if r.win_rate is not None else 'n/a'}") + print(f"- fill_rate: {r.fill_rate if r.fill_rate is not None else 'n/a'}") + print(f"- realized_pnl_usd: {r.realized_pnl_usd:.4f}") + print(f"- max_drawdown_usd: {r.max_drawdown_usd:.4f}") + print(f"- miss_reasons: {dict(r.miss_reasons)}") print( "- execution_latency_ms: " - f"p50={report.execution_latency_p50_ms or 0.0:.4f}, " - f"p95={report.execution_latency_p95_ms or 0.0:.4f}, " - f"p99={report.execution_latency_p99_ms or 0.0:.4f}" + f"p50={r.execution_latency_p50_ms or 0.0:.4f}, " + f"p95={r.execution_latency_p95_ms or 0.0:.4f}, " + f"p99={r.execution_latency_p99_ms or 0.0:.4f}" ) return 0 diff --git a/scripts/benchmark_metrics_compute.py b/scripts/benchmark_metrics_compute.py index 92ac1e7..27606f3 100644 --- a/scripts/benchmark_metrics_compute.py +++ b/scripts/benchmark_metrics_compute.py @@ -13,14 +13,13 @@ from arbitrade.storage.db import DuckDBStore def _python_scan_compute(store: DuckDBStore) -> tuple[float, float | None, float | None]: with store.connect() as conn: - trade_rows = conn.execute( - """ + trade_rows = conn.execute(""" SELECT started_at, finished_at, realized_pnl FROM trades WHERE finished_at IS NOT NULL - """ - ).fetchall() - opportunity_rows = conn.execute("SELECT detected_at FROM opportunities").fetchall() + """).fetchall() + sql_d = "SELECT detected_at FROM opportunities" + orows = conn.execute(sql_d).fetchall() realized = sum(float(row[2]) for row in trade_rows if row[2] is not None) durations = [ @@ -30,10 +29,10 @@ def _python_scan_compute(store: DuckDBStore) -> tuple[float, float | None, float ] avg_duration = fmean(durations) if durations else None - times = [row[0] for row in opportunity_rows if isinstance(row[0], datetime)] + times = [row[0] for row in orows if isinstance(row[0], datetime)] if len(times) >= 2: - span_seconds = (max(times) - min(times)).total_seconds() - opm = len(times) / (span_seconds / 60.0) if span_seconds > 0.0 else float(len(times)) + ss = (max(times) - min(times)).total_seconds() + opm = len(times) / (ss / 60.0) if ss > 0.0 else float(len(times)) elif len(times) == 1: opm = 60.0 else: diff --git a/src/arbitrade/api/app.py b/src/arbitrade/api/app.py index 32cfe3c..b553fb7 100644 --- a/src/arbitrade/api/app.py +++ b/src/arbitrade/api/app.py @@ -4,22 +4,88 @@ import asyncio from collections.abc import AsyncIterator from contextlib import asynccontextmanager +import structlog from fastapi import FastAPI from arbitrade.alerting.notifier import build_notifier_from_settings from arbitrade.api.control_state import DashboardControlState from arbitrade.api.routes import public_router, router from arbitrade.backtesting.runner import backtest_worker +from arbitrade.config.pairing_sync import run_pairing_sync_loop from arbitrade.config.service import ConfigurationService from arbitrade.config.settings import Settings from arbitrade.exchange.fee_service import run_fee_sync_loop from arbitrade.exchange.kraken_rest import KrakenRestClient +from arbitrade.exchange.kraken_ws import KrakenWsClient from arbitrade.logging_setup import configure_logging +from arbitrade.market_data.feed import MarketDataFeed +from arbitrade.market_data.feed_builder import ( + build_detector_from_enabled_pairings, + get_enabled_pair_symbols, +) from arbitrade.metrics import MetricsCalculator from arbitrade.runtime.lifecycle import graceful_shutdown, restore_runtime_state from arbitrade.storage.db import DuckDBStore +from arbitrade.storage.market_snapshots import AsyncMarketSnapshotWriter +from arbitrade.storage.opportunities import AsyncOpportunityWriter from arbitrade.storage.repositories import AuditRepository, RuntimeStateRepository +_LOG = structlog.get_logger(__name__) + + +def _start_feed(app: FastAPI, *, kill_switch_only: bool = False) -> asyncio.Task[None] | None: + """Create and start a MarketDataFeed task from enabled pairings. + + If kill_switch_only=True, only create a kill-switch-bound stub (no detector/feed). + Returns the task or None if no enabled pairings. + """ + settings = app.state.settings + db = app.state.store + alert_notifier = getattr(app.state, "alert_notifier", None) + controls = app.state.dashboard_controls + + # Build detector from enabled pairings + detector = build_detector_from_enabled_pairings( + db, + fee_rate=0.0, # will be overridden by fee sync + max_depth_levels=controls.strategy_max_depth_levels, + min_profit_threshold=controls.strategy_profit_threshold, + ) + + symbols = get_enabled_pair_symbols(db) + if not symbols and not kill_switch_only: + _LOG.warning("no_enabled_pair_symbols_feed_not_started") + return None + + ws_client: KrakenWsClient = getattr(app.state, "ws_client", None) + if ws_client is None: + ws_client = KrakenWsClient(settings, alert_notifier=alert_notifier) + app.state.ws_client = ws_client + + ws_client.set_subscribed_symbols(symbols) + + snapshot_writer = AsyncMarketSnapshotWriter(db) + opportunity_writer = AsyncOpportunityWriter(db) + + feed = MarketDataFeed( + ws_client=ws_client, + snapshot_writer=snapshot_writer, + detector=detector, + opportunity_writer=opportunity_writer, + paper_trading_mode=settings.paper_trading_mode, + trade_capital=settings.trade_capital_usd, + max_trade_capital=settings.max_trade_capital_usd, + kill_switch=controls.kill_switch, + alert_notifier=alert_notifier, + audit_repository=getattr(app.state, "audit_repository", None), + ) + + app.state.feed = feed + task = asyncio.create_task(feed.run(), name="market_data_feed") + app.state.feed_task = task + _LOG.info("market_data_feed_started", symbols=symbols) + return task + def create_app(settings: Settings) -> FastAPI: configure_logging(settings.log_level, settings.log_json) @@ -28,6 +94,7 @@ def create_app(settings: Settings) -> FastAPI: db.migrate() kraken_client = KrakenRestClient(settings) fee_sync_stop_event = asyncio.Event() + pairing_sync_stop_event = asyncio.Event() backtest_queue: asyncio.Queue[tuple[str, str, dict[str, object] | None] | None] = ( asyncio.Queue() ) @@ -43,19 +110,49 @@ def create_app(settings: Settings) -> FastAPI: ), name="fee_sync_loop", ) + pairing_sync_task = asyncio.create_task( + run_pairing_sync_loop( + kraken_client, + db, + pairing_sync_stop_event, + ), + name="pairing_sync_loop", + ) backtest_task = asyncio.create_task( backtest_worker(backtest_queue, db), # type: ignore name="backtest_worker", ) + # Start market data feed from enabled pairings + _start_feed(app) app.state.fee_sync_task = fee_sync_task + app.state.pairing_sync_task = pairing_sync_task app.state.backtest_task = backtest_task yield fee_sync_stop_event.set() + pairing_sync_stop_event.set() + # Stop feed + feed = getattr(app.state, "feed", None) + if feed is not None: + ws_client = getattr(app.state, "ws_client", None) + if ws_client is not None: + await ws_client.stop() + ft = getattr(app.state, "feed_task", None) + if ft is not None: + ft.cancel() + try: + await ft + except asyncio.CancelledError: + pass fee_sync_task.cancel() try: await fee_sync_task except asyncio.CancelledError: pass + pairing_sync_task.cancel() + try: + await pairing_sync_task + except asyncio.CancelledError: + pass await backtest_queue.put(None) # poison pill backtest_task.cancel() try: @@ -70,12 +167,14 @@ def create_app(settings: Settings) -> FastAPI: app.state.store = db app.state.kraken_client = kraken_client app.state.fee_sync_stop_event = fee_sync_stop_event + app.state.pairing_sync_stop_event = pairing_sync_stop_event app.state.backtest_queue = backtest_queue app.state.metrics = MetricsCalculator(db) app.state.audit_repository = AuditRepository(db) app.state.runtime_state_repository = RuntimeStateRepository(db) app.state.alert_notifier = build_notifier_from_settings(settings) - app.state.configuration_service = ConfigurationService(settings, db, AuditRepository(db)) + svc = ConfigurationService(settings, db, app.state.audit_repository) + app.state.configuration_service = svc app.state.backtest_recent_reports = [] app.state.dashboard_controls = DashboardControlState( is_running=not settings.kill_switch_active, diff --git a/src/arbitrade/api/routes.py b/src/arbitrade/api/routes.py index 664064d..8fe50b1 100644 --- a/src/arbitrade/api/routes.py +++ b/src/arbitrade/api/routes.py @@ -18,11 +18,14 @@ from fastapi.templating import Jinja2Templates from arbitrade.alerting.notifier import SupportsAlerts, SupportsAlertStatus from arbitrade.api.auth import require_dashboard_auth from arbitrade.api.control_state import DashboardControlState +from arbitrade.config.pairing_sync import sync_pairings_from_kraken +from arbitrade.config.service import ConfigPairing from arbitrade.detection.graph import CurrencyGraph, TriangularCycle from arbitrade.storage.repositories import ( AuditRecord, AuditRepository, BacktestJobRepository, + ConfigPairingRepository, KrakenAccountSnapshotRepository, ) @@ -104,37 +107,29 @@ def _dashboard_overview(request: Request) -> dict[str, object]: else: open_trade_filter = "LOWER(status) NOT IN ('filled', 'closed', 'cancelled', 'canceled')" - portfolio_row = conn.execute( - """ + portfolio_row = conn.execute(""" SELECT balances, total_value_usd FROM portfolio_snapshots ORDER BY snapshot_at DESC LIMIT 1 - """ - ).fetchone() - open_trades = conn.execute( - f""" + """).fetchone() + open_trades = conn.execute(f""" SELECT {trade_ref_expr}, status, started_at, {cycle_expr} FROM trades WHERE {open_trade_filter} ORDER BY started_at DESC LIMIT 5 - """ - ).fetchall() - rpnl = conn.execute( - """ + """).fetchall() + rpnl = conn.execute(""" SELECT COALESCE(SUM(COALESCE(realized_pnl, 0)), 0) FROM trades - """ - ).fetchone() - latest_opportunities = conn.execute( - """ + """).fetchone() + latest_opportunities = conn.execute(""" SELECT cycle, net_pct, est_profit, detected_at FROM opportunities ORDER BY detected_at DESC LIMIT 5 - """ - ).fetchall() + """).fetchall() balances_value = "—" total_value = "—" @@ -164,14 +159,12 @@ def _dashboard_overview(request: Request) -> dict[str, object]: # Query equity from kraken_account_snapshots try: - equity_row = conn.execute( - """ + equity_row = conn.execute(""" SELECT trade_balance_raw FROM kraken_account_snapshots ORDER BY snapshot_at DESC LIMIT 1 - """ - ).fetchone() + """).fetchone() if equity_row is not None and equity_row[0] is not None: tb_raw = equity_row[0] if isinstance(tb_raw, str): @@ -207,14 +200,12 @@ def _dashboard_overview(request: Request) -> dict[str, object]: taker_fee = "—" thirty_day_volume = "—" try: - acct_row = conn.execute( - """ + acct_row = conn.execute(""" SELECT fee_tier, maker_fee, taker_fee, thirty_day_volume FROM kraken_account_snapshots ORDER BY snapshot_at DESC LIMIT 1 - """ - ).fetchone() + """).fetchone() if acct_row is not None: fee_tier = str(acct_row[0]) if acct_row[0] is not None else "—" maker_fee = f"{float(acct_row[1]):.4%}" if acct_row[1] is not None else "—" @@ -244,14 +235,12 @@ def _dashboard_overview(request: Request) -> dict[str, object]: def _dashboard_charts(request: Request) -> dict[str, object]: store = request.app.state.store with store.connect() as conn: - opportunity_rows = conn.execute( - """ + opportunity_rows = conn.execute(""" SELECT detected_at, cycle, net_pct, est_profit FROM opportunities ORDER BY detected_at DESC LIMIT 10 - """ - ).fetchall() + """).fetchall() cr = list(reversed(opportunity_rows)) labels = [] @@ -588,7 +577,16 @@ def _dashboard_controls(request: Request) -> dict[str, object]: def _parse_form_body(body: bytes) -> dict[str, str]: parsed = parse_qs(body.decode("utf-8"), keep_blank_values=True) - return {key: values[-1] for key, values in parsed.items() if values} + result: dict[str, str] = {} + for key, values in parsed.items(): + if not values: + continue + if len(values) == 1: + result[key] = values[0] + else: + # Multi-value fields (e.g. checkboxes) -> join with comma + result[key] = ",".join(values) + return result def _form_bool(value: str | None) -> bool: @@ -823,6 +821,20 @@ async def dashboard_backtesting_fragment(request: Request) -> HTMLResponse: ) +@router.get("/dashboard/fragment/backtesting-pairings", response_class=HTMLResponse) +async def dashboard_backtesting_pairings_fragment(request: Request) -> HTMLResponse: + """HTMX fragment: pairing checkboxes for backtest form.""" + store = request.app.state.store + repo = ConfigPairingRepository(store) + pairings = repo.list_pairings() + pairings.sort(key=lambda p: (p.base_asset, p.quote_asset)) + return templates.TemplateResponse( + request=request, + name="partials/backtesting_pairings.html", + context={"request": request, "pairings": pairings}, + ) + + @router.get("/dashboard/fragment/metrics", response_class=HTMLResponse) async def dashboard_metrics(request: Request) -> HTMLResponse: return templates.TemplateResponse( @@ -1282,3 +1294,166 @@ async def dashboard_overview_stream(request: Request) -> StreamingResponse: @public_router.get("/health", response_class=JSONResponse) async def health() -> JSONResponse: return JSONResponse({"status": "ok", "service": "arbitrade"}) + + +# ── Pairing API ───────────────────────────────────────────────────────────── + + +def _pairing_repo(request: Request) -> ConfigPairingRepository: + return ConfigPairingRepository(request.app.state.store) + + +@router.get("/dashboard/api/pairings", response_class=JSONResponse) +async def dashboard_api_pairings( + request: Request, + search: str | None = None, + enabled: str | None = None, + source: str | None = None, + base: str | None = None, + quote: str | None = None, + sort: str = "base_asset", + order: str = "asc", +) -> JSONResponse: + """List pairings with optional filters.""" + repo = _pairing_repo(request) + pairings = repo.list_pairings() + + # Apply filters + if search: + search_lower = search.lower() + pairings = [ + p + for p in pairings + if search_lower in p.base_asset.lower() or search_lower in p.quote_asset.lower() + ] + if enabled is not None: + enabled_bool = enabled.lower() == "true" + pairings = [p for p in pairings if p.enabled == enabled_bool] + if source: + pairings = [p for p in pairings if p.source.lower() == source.lower()] + if base: + pairings = [p for p in pairings if p.base_asset.lower() == base.lower()] + if quote: + pairings = [p for p in pairings if p.quote_asset.lower() == quote.lower()] + + # Sort + reverse = order.lower() == "desc" + if sort == "base_asset": + pairings.sort(key=lambda p: p.base_asset, reverse=reverse) + elif sort == "quote_asset": + pairings.sort(key=lambda p: p.quote_asset, reverse=reverse) + elif sort == "enabled": + pairings.sort(key=lambda p: p.enabled, reverse=reverse) + + return JSONResponse( + [ + { + "id": p.id, + "base_asset": p.base_asset, + "quote_asset": p.quote_asset, + "pair": f"{p.base_asset}/{p.quote_asset}", + "enabled": p.enabled, + "source": p.source, + "created_at": p.created_at.isoformat() if p.created_at else None, + "updated_at": p.updated_at.isoformat() if p.updated_at else None, + } + for p in pairings + ] + ) + + +@router.get("/dashboard/fragment/pairings", response_class=HTMLResponse) +async def dashboard_pairings_fragment( + request: Request, + search: str | None = None, + enabled: str | None = None, +) -> HTMLResponse: + """HTMX fragment: pairing table for config page.""" + repo = _pairing_repo(request) + pairings = repo.list_pairings() + + # Apply search filter + if search: + sl = search.lower() + pairings = [ + p for p in pairings if sl in p.base_asset.lower() or sl in p.quote_asset.lower() + ] + if enabled is not None and enabled.lower() != "all": + eb = enabled.lower() == "true" + pairings = [p for p in pairings if p.enabled == eb] + + pairings.sort(key=lambda p: (p.base_asset, p.quote_asset)) + return templates.TemplateResponse( + request=request, + name="partials/pairings_table.html", + context={"request": request, "pairings": pairings}, + ) + + +@router.post("/dashboard/api/pairings/toggle") +async def dashboard_api_pairings_toggle(request: Request) -> HTMLResponse: + """Toggle enabled/disabled for a pairing. Expects JSON or form body with base_asset and quote_asset.""" + ctype = request.headers.get("content-type", "") + if "application/json" in ctype: + body = await request.json() + else: + form = _parse_form_body(await request.body()) + body = form + + base_asset = str(body.get("base_asset", "")).upper() + quote_asset = str(body.get("quote_asset", "")).upper() + if not base_asset or not quote_asset: + return HTMLResponse("Missing base_asset or quote_asset", status_code=400) + + repo = _pairing_repo(request) + existing = repo.get_pairing(base_asset, quote_asset) + if existing is None: + return HTMLResponse("Pairing not found", status_code=404) + + toggled = ConfigPairing( + base_asset=existing.base_asset, + quote_asset=existing.quote_asset, + enabled=not existing.enabled, + source=existing.source, + ) + repo.update_pairing(base_asset, quote_asset, toggled) + + _record_audit( + request, + actor="dashboard_user", + event_type="dashboard.pairings.toggle", + decision="approved", + payload={ + "base_asset": base_asset, + "quote_asset": quote_asset, + "enabled": toggled.enabled, + }, + ) + + # Return refreshed fragment + pairings_repo = _pairing_repo(request) + pairings = pairings_repo.list_pairings() + pairings.sort(key=lambda p: (p.base_asset, p.quote_asset)) + return templates.TemplateResponse( + request=request, + name="partials/pairings_table.html", + context={"request": request, "pairings": pairings}, + ) + + +@router.post("/dashboard/api/pairings/sync") +async def dashboard_api_pairings_sync(request: Request) -> JSONResponse: + """Trigger a re-sync of pairings from Kraken.""" + kraken_client = request.app.state.kraken_client + store = request.app.state.store + summary = await sync_pairings_from_kraken(kraken_client, store) + + _record_audit( + request, + actor="dashboard_user", + event_type="dashboard.pairings.sync", + decision="approved", + payload=summary, # type: ignore + ) + + return JSONResponse(summary) diff --git a/src/arbitrade/config/pairing_sync.py b/src/arbitrade/config/pairing_sync.py new file mode 100644 index 0000000..3149796 --- /dev/null +++ b/src/arbitrade/config/pairing_sync.py @@ -0,0 +1,79 @@ +"""Sync available Kraken asset pairs into the config_pairings table.""" + +from __future__ import annotations + +import asyncio + +import structlog + +from arbitrade.config.service import ConfigPairing +from arbitrade.detection.graph import CurrencyGraph +from arbitrade.exchange.kraken_rest import KrakenRestClient +from arbitrade.storage.db import DuckDBStore +from arbitrade.storage.repositories import ConfigPairingRepository + +_LOG = structlog.get_logger(__name__) + + +async def sync_pairings_from_kraken( + kraken_client: KrakenRestClient, + store: DuckDBStore, +) -> dict[str, int]: + """Fetch all asset pairs from Kraken and upsert into config_pairings. + + Returns a summary dict with 'added', 'updated', 'total' counts. + """ + asset_pairs = await kraken_client.asset_pairs() + graph = CurrencyGraph.from_kraken_asset_pairs(asset_pairs) + repo = ConfigPairingRepository(store) + + added = 0 + updated = 0 + total = 0 + + # Dedupe: pair_by_direction has entries for both (base,quote) and (quote,base). + seen_symbols: set[str] = set() + for (base, quote), symbol in graph.pair_by_direction.items(): + if symbol in seen_symbols: + continue + seen_symbols.add(symbol) + existing = repo.get_pairing(base, quote) + pairing = ConfigPairing( + base_asset=base, + quote_asset=quote, + enabled=existing.enabled if existing else False, + source="kraken", + ) + try: + repo.upsert_pairing(pairing) + total += 1 + if existing: + updated += 1 + else: + added += 1 + except Exception: + _LOG.warning("sync_pairing_failed", base=base, quote=quote) + + _LOG.info( + "pairing_sync_complete", + added=added, + updated=updated, + total=total, + ) + return {"added": added, "updated": updated, "total": total} + + +async def run_pairing_sync_loop( + kraken_client: KrakenRestClient, + store: DuckDBStore, + stop_event: asyncio.Event, + interval_seconds: int = 86400, +) -> None: + """Periodically sync pairings from Kraken (default daily).""" + await sync_pairings_from_kraken(kraken_client, store) + try: + while not stop_event.is_set(): + await asyncio.wait_for(stop_event.wait(), timeout=interval_seconds) + await sync_pairings_from_kraken(kraken_client, store) + except (TimeoutError, asyncio.CancelledError): + pass diff --git a/src/arbitrade/exchange/kraken_ws.py b/src/arbitrade/exchange/kraken_ws.py index e962228..a250106 100644 --- a/src/arbitrade/exchange/kraken_ws.py +++ b/src/arbitrade/exchange/kraken_ws.py @@ -32,6 +32,7 @@ class KrakenWsClient: self._alert_notifier = alert_notifier self._has_connected_once = False self._was_disconnected = False + self._subscribed_symbols: list[str] = [] @property def is_stale(self) -> bool: @@ -44,6 +45,35 @@ class KrakenWsClient: async def stop(self) -> None: self._stop.set() + def set_subscribed_symbols(self, symbols: list[str]) -> None: + """Set the list of symbols to subscribe to on (re)connect.""" + self._subscribed_symbols = list(symbols) + + async def _subscribe(self, ws: Any) -> None: + """Send Kraken WS v2 subscribe message for book channel.""" + if not self._subscribed_symbols: + _LOG.warning("kraken_ws_no_symbols_to_subscribe") + return + depth = 10 + if hasattr(self._settings, "kraken_ws_book_depth"): + depth = self._settings.kraken_ws_book_depth + msg = orjson.dumps( + { + "method": "subscribe", + "params": { + "channel": "book", + "symbol": self._subscribed_symbols, + "depth": depth, + }, + } + ) + await ws.send(msg) + _LOG.info( + "kraken_ws_subscribed", + symbol_count=len(self._subscribed_symbols), + symbols=self._subscribed_symbols, + ) + async def connect_stream(self) -> AsyncIterator[WsMessage]: delay = 1.0 while not self._stop.is_set(): @@ -51,7 +81,8 @@ class KrakenWsClient: async with websockets.connect( self._settings.kraken_ws_url, max_size=2_000_000 ) as ws: - _LOG.info("kraken_ws_connected", url=self._settings.kraken_ws_url) + _LOG.info("kraken_ws_connected", + url=self._settings.kraken_ws_url) if self._has_connected_once and self._was_disconnected: await self._notify( category="system", @@ -63,10 +94,12 @@ class KrakenWsClient: self._has_connected_once = True self._was_disconnected = False delay = 1.0 + await self._subscribe(ws) async for raw in self._recv_loop(ws): yield raw except Exception as exc: - _LOG.warning("kraken_ws_disconnected", error=str(exc), reconnect_in=delay) + _LOG.warning("kraken_ws_disconnected", + error=str(exc), reconnect_in=delay) self._was_disconnected = True await self._notify( category="system", diff --git a/src/arbitrade/market_data/feed_builder.py b/src/arbitrade/market_data/feed_builder.py new file mode 100644 index 0000000..be09ee9 --- /dev/null +++ b/src/arbitrade/market_data/feed_builder.py @@ -0,0 +1,62 @@ +"""Build production MarketDataFeed components from enabled pairings.""" + +from __future__ import annotations + +import structlog + +from arbitrade.detection.engine import IncrementalCycleDetector +from arbitrade.detection.graph import CurrencyGraph +from arbitrade.storage.db import DuckDBStore +from arbitrade.storage.repositories import ConfigPairingRepository + +_LOG = structlog.get_logger(__name__) + + +def build_detector_from_enabled_pairings( + store: DuckDBStore, + *, + fee_rate: float = 0.0, + max_depth_levels: int = 10, + min_profit_threshold: float = 0.0005, +) -> IncrementalCycleDetector | None: + """Build an IncrementalCycleDetector using only enabled pairings from DB. + + Returns None if no enabled pairings exist. + """ + repo = ConfigPairingRepository(store) + pairings = repo.list_pairings(enabled_only=True) + if not pairings: + _LOG.warning("no_enabled_pairings_found_detector_not_created") + return None + + # Build CurrencyGraph from enabled pairings and discover cycles + graph = CurrencyGraph() + for p in pairings: + symbol = f"{p.base_asset}/{p.quote_asset}" + graph.add_pair(p.base_asset, p.quote_asset, symbol) + + cycles = graph.triangular_cycles() + if not cycles: + _LOG.warning("no_triangular_cycles_from_enabled_pairings") + return None + + cycles_by_pair = graph.index_cycles_by_pair(cycles) + _LOG.info( + "detector_built_from_enabled_pairings", + enabled_count=len(pairings), + cycle_count=len(cycles), + ) + + return IncrementalCycleDetector( + cycles_by_pair, + fee_rate=fee_rate, + max_depth_levels=max_depth_levels, + min_profit_threshold=min_profit_threshold, + ) + + +def get_enabled_pair_symbols(store: DuckDBStore) -> list[str]: + """Return list of enabled pair symbols (e.g. ['BTC/USD', 'ETH/BTC']).""" + repo = ConfigPairingRepository(store) + pairings = repo.list_pairings(enabled_only=True) + return [f"{p.base_asset}/{p.quote_asset}" for p in pairings if p.enabled] diff --git a/src/arbitrade/metrics.py b/src/arbitrade/metrics.py index 995ef56..aadaf32 100644 --- a/src/arbitrade/metrics.py +++ b/src/arbitrade/metrics.py @@ -24,8 +24,7 @@ class MetricsCalculator: def compute(self) -> PerformanceMetrics: with self._store.connect() as conn: - tm = conn.execute( - """ + tm = conn.execute(""" SELECT COALESCE(SUM(COALESCE(realized_pnl, 0)), 0) AS realized_pnl_usd, COUNT(*) AS total_trades, @@ -45,26 +44,21 @@ class MetricsCalculator: ) AS latency_p99_seconds FROM trades WHERE finished_at IS NOT NULL - """ - ).fetchone() + """).fetchone() - om = conn.execute( - """ + om = conn.execute(""" SELECT COUNT(*) AS opportunity_count, MIN(detected_at) AS first_detected_at, MAX(detected_at) AS last_detected_at FROM opportunities - """ - ).fetchone() + """).fetchone() - fm = conn.execute( - """ + fm = conn.execute(""" SELECT AVG(filled_volume / volume) AS fill_rate FROM orders WHERE volume > 0 AND filled_volume IS NOT NULL - """ - ).fetchone() + """).fetchone() r_pnl_usd = float(tm[0]) if tm and tm[0] is not None else 0.0 tt = int(tm[1]) if tm and tm[1] is not None else 0 diff --git a/src/arbitrade/runtime/lifecycle.py b/src/arbitrade/runtime/lifecycle.py index 021c277..c00a0e0 100644 --- a/src/arbitrade/runtime/lifecycle.py +++ b/src/arbitrade/runtime/lifecycle.py @@ -45,26 +45,22 @@ def _runtime_repository(app: FastAPI) -> RuntimeStateRepository | None: def _open_trade_count(store: DuckDBStore) -> int: with store.connect() as conn: - row = conn.execute( - """ + row = conn.execute(""" SELECT COUNT(*) FROM trades WHERE finished_at IS NULL - """ - ).fetchone() + """).fetchone() return int(row[0]) if row is not None else 0 def _latest_balances(store: DuckDBStore) -> dict[str, Any] | None: with store.connect() as conn: - row = conn.execute( - """ + row = conn.execute(""" SELECT balances FROM portfolio_snapshots ORDER BY snapshot_at DESC LIMIT 1 - """ - ).fetchone() + """).fetchone() if row is None or row[0] is None: return None diff --git a/src/arbitrade/storage/db.py b/src/arbitrade/storage/db.py index 0dca02e..6b79e63 100644 --- a/src/arbitrade/storage/db.py +++ b/src/arbitrade/storage/db.py @@ -219,87 +219,12 @@ class DuckDBStore: # Ensure schema_migrations table exists and get current version if not self._table_exists(conn, "schema_migrations"): - conn.execute( - """ + conn.execute(""" CREATE TABLE IF NOT EXISTS schema_migrations ( version INTEGER PRIMARY KEY, applied_at TIMESTAMP DEFAULT current_timestamp ) - """ - ) - - # Get current schema version - try: - row = conn.execute( - "SELECT version FROM schema_migrations ORDER BY version DESC LIMIT 1" - ).fetchone() - current_version = row[0] if row else 0 - except Exception: - current_version = 0 - - # Apply migrations for each version - if current_version < 1: - # Migration v1: Add missing columns to trades table - # Note: DuckDB does not support ADD COLUMN with constraints - conn.execute("ALTER TABLE trades ADD COLUMN IF NOT EXISTS trade_ref VARCHAR") - conn.execute("ALTER TABLE trades ADD COLUMN IF NOT EXISTS estimated_pnl DOUBLE") - conn.execute("ALTER TABLE trades ADD COLUMN IF NOT EXISTS capital_used DOUBLE") - conn.execute("ALTER TABLE trades ADD COLUMN IF NOT EXISTS cycle VARCHAR") - conn.execute("ALTER TABLE trades ADD COLUMN IF NOT EXISTS leg_count INTEGER") - conn.execute("INSERT OR IGNORE INTO schema_migrations (version) VALUES (1)") - _LOG.info("migration_applied", version=1) - - if current_version < 2: - # Migration v2: Ensure config_backtesting_defaults table - # config_backtesting_defaults already created by SCHEMA_SQL - conn.execute("INSERT OR IGNORE INTO schema_migrations (version) VALUES (2)") - _LOG.info("migration_applied", version=2) - - if current_version < 3: - # Migration v3: Add kraken_account_snapshots table - conn.execute( - """ - CREATE TABLE IF NOT EXISTS kraken_account_snapshots ( - snapshot_at TIMESTAMP NOT NULL, - fee_tier VARCHAR, - maker_fee DOUBLE, - taker_fee DOUBLE, - thirty_day_volume DOUBLE, - trade_balance_raw JSON, - fee_schedule_raw JSON - ) - """ - ) - conn.execute("INSERT OR IGNORE INTO schema_migrations (version) VALUES (3)") - _LOG.info("migration_applied", version=3) - - if current_version < 4: - # Migration v4: Add fee_source to backtesting defaults - conn.execute( - "ALTER TABLE config_backtesting_defaults" - " ADD COLUMN IF NOT EXISTS fee_source VARCHAR DEFAULT 'api'" - ) - conn.execute("INSERT OR IGNORE INTO schema_migrations (version) VALUES (4)") - _LOG.info("migration_applied", version=4) - - if current_version < 5: - conn.execute( - """ - CREATE TABLE IF NOT EXISTS backtest_jobs ( - id UUID DEFAULT uuid(), - status VARCHAR NOT NULL DEFAULT 'pending', - events_path VARCHAR NOT NULL, - config JSON, - report JSON, - error VARCHAR, - created_at TIMESTAMP DEFAULT current_timestamp, - started_at TIMESTAMP, - finished_at TIMESTAMP - ) - """ - ) - conn.execute("INSERT OR IGNORE INTO schema_migrations (version) VALUES (5)") - _LOG.info("migration_applied", version=5) + """) # Update version to current conn.execute( diff --git a/src/arbitrade/storage/repositories.py b/src/arbitrade/storage/repositories.py index 418f83d..b35c5b1 100644 --- a/src/arbitrade/storage/repositories.py +++ b/src/arbitrade/storage/repositories.py @@ -349,8 +349,7 @@ class RuntimeStateRepository: def latest(self) -> RuntimeStateRecord | None: with self._store.connect() as conn: - row = conn.execute( - """ + row = conn.execute(""" SELECT snapshot_at, is_running, @@ -362,8 +361,7 @@ class RuntimeStateRepository: FROM runtime_state_snapshots ORDER BY snapshot_at DESC LIMIT 1 - """ - ).fetchone() + """).fetchone() if row is None: return None @@ -426,15 +424,14 @@ class ConfigSectionRepository: def list_sections(self) -> list[ConfigSection]: """List all configuration sections.""" with self._store.connect() as conn: - cursor = conn.execute( - """ + cursor = conn.execute(""" SELECT id, name, description, updated_at FROM config_sections ORDER BY name - """ - ) + """) return [ - ConfigSection(id=row[0], name=row[1], description=row[2], updated_at=row[3]) + ConfigSection(id=row[0], name=row[1], + description=row[2], updated_at=row[3]) for row in cursor.fetchall() ] @@ -561,13 +558,11 @@ class ConfigSettingRepository: (section,), ) else: - cursor = conn.execute( - """ + cursor = conn.execute(""" SELECT key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by FROM config_settings ORDER BY key - """ - ) + """) return [ ConfigSetting( key=row[0], @@ -585,12 +580,10 @@ class ConfigSettingRepository: def get_latest_updated_at(self) -> datetime | None: """Get the latest updated_at timestamp from config_settings table.""" with self._store.connect() as conn: - cursor = conn.execute( - """ + cursor = conn.execute(""" SELECT MAX(updated_at) as latest_updated_at FROM config_settings - """ - ) + """) row = cursor.fetchone() if row and row[0]: # Convert string timestamp to datetime @@ -699,16 +692,51 @@ class ConfigPairingRepository: ) return cursor.rowcount > 0 - def list_pairings(self) -> list[ConfigPairing]: - """List all currency pairings.""" + def upsert_pairing(self, pairing: ConfigPairing) -> ConfigPairing: + """Insert or update a currency pairing (upsert on base_asset, quote_asset).""" with self._store.connect() as conn: cursor = conn.execute( """ + INSERT INTO config_pairings (base_asset, quote_asset, enabled, source) + VALUES (?, ?, ?, ?) + ON CONFLICT(base_asset, quote_asset) DO UPDATE SET + enabled = EXCLUDED.enabled, + source = EXCLUDED.source, + updated_at = current_timestamp + RETURNING id, base_asset, quote_asset, enabled, source, created_at, updated_at + """, + ( + pairing.base_asset, + pairing.quote_asset, + pairing.enabled, + pairing.source, + ), + ) + row = cursor.fetchone() + if row: + return ConfigPairing( + id=row[0], + base_asset=row[1], + quote_asset=row[2], + enabled=bool(row[3]), + source=row[4], + created_at=row[5], + updated_at=row[6], + ) + raise ValueError("Failed to upsert pairing") + + def list_pairings(self, enabled_only: bool = False) -> list[ConfigPairing]: + """List all currency pairings. If enabled_only=True, only enabled pairings.""" + with self._store.connect() as conn: + query = """ SELECT id, base_asset, quote_asset, enabled, source, created_at, updated_at FROM config_pairings - ORDER BY base_asset, quote_asset - """ - ) + """ + params: list[object] = [] + if enabled_only: + query += " WHERE enabled = TRUE" + query += " ORDER BY base_asset, quote_asset" + cursor = conn.execute(query, params) return [ ConfigPairing( id=row[0], @@ -738,7 +766,8 @@ class ConfigBacktestingDefaultsRepository: """, ( ( - orjson.dumps(defaults.starting_balances).decode("utf-8") + orjson.dumps( + defaults.starting_balances).decode("utf-8") if defaults.starting_balances else None ), @@ -762,14 +791,12 @@ class ConfigBacktestingDefaultsRepository: def get_defaults(self) -> ConfigBacktestingDefaults | None: """Get the current backtesting defaults.""" with self._store.connect() as conn: - cursor = conn.execute( - """ + cursor = conn.execute(""" SELECT id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms FROM config_backtesting_defaults ORDER BY id DESC LIMIT 1 - """ - ) + """) row = cursor.fetchone() if row: return ConfigBacktestingDefaults( @@ -795,7 +822,8 @@ class ConfigBacktestingDefaultsRepository: """, ( ( - orjson.dumps(defaults.starting_balances).decode("utf-8") + orjson.dumps( + defaults.starting_balances).decode("utf-8") if defaults.starting_balances else None ), @@ -848,7 +876,8 @@ class KrakenAccountSnapshotRepository: snapshot.taker_fee, snapshot.thirty_day_volume, ( - orjson.dumps(snapshot.trade_balance_raw).decode("utf-8") + orjson.dumps( + snapshot.trade_balance_raw).decode("utf-8") if snapshot.trade_balance_raw else None ), @@ -862,15 +891,13 @@ class KrakenAccountSnapshotRepository: def latest_snapshot(self) -> KrakenAccountSnapshot | None: with self._store.connect() as conn: - row = conn.execute( - """ + row = conn.execute(""" SELECT snapshot_at, fee_tier, maker_fee, taker_fee, thirty_day_volume, trade_balance_raw, fee_schedule_raw FROM kraken_account_snapshots ORDER BY snapshot_at DESC LIMIT 1 - """ - ).fetchone() + """).fetchone() if row is None: return None return KrakenAccountSnapshot( @@ -911,7 +938,8 @@ class BacktestJobRepository: VALUES (?, ?) RETURNING id, status, events_path, config, created_at """, - (events_path, orjson.dumps(config).decode("utf-8") if config else None), + (events_path, orjson.dumps(config).decode( + "utf-8") if config else None), ).fetchone() if row is None: raise ValueError("Failed to create backtest job") diff --git a/src/arbitrade/web/templates/partials/backtesting_pairings.html b/src/arbitrade/web/templates/partials/backtesting_pairings.html new file mode 100644 index 0000000..0512b20 --- /dev/null +++ b/src/arbitrade/web/templates/partials/backtesting_pairings.html @@ -0,0 +1,31 @@ +{% for p in pairings %} + +{% endfor %} {% if not pairings %} +No pairings available. Sync from Kraken in config page. +{% endif %} diff --git a/src/arbitrade/web/templates/partials/backtesting_panel.html b/src/arbitrade/web/templates/partials/backtesting_panel.html index 3070391..0fa79aa 100644 --- a/src/arbitrade/web/templates/partials/backtesting_panel.html +++ b/src/arbitrade/web/templates/partials/backtesting_panel.html @@ -53,13 +53,15 @@ > + +
| Base | +Quote | +Source | +Enabled | +
|---|---|---|---|
| {{ p.base_asset }} | +{{ p.quote_asset }} | +{{ p.source }} | ++ + | +
| + No pairings found. Click "Sync from Kraken" to fetch available pairs. + | +|||