diff --git a/src/arbitrade/api/app.py b/src/arbitrade/api/app.py index b92cc30..b12dd32 100644 --- a/src/arbitrade/api/app.py +++ b/src/arbitrade/api/app.py @@ -10,6 +10,7 @@ import asyncio from arbitrade.alerting.notifier import build_notifier_from_settings from arbitrade.api.control_state import DashboardControlState from arbitrade.api.routes import public_router, router +from arbitrade.backtesting.runner import backtest_worker from arbitrade.config.settings import Settings from arbitrade.config.service import ConfigurationService from arbitrade.exchange.fee_service import run_fee_sync_loop @@ -28,6 +29,8 @@ def create_app(settings: Settings) -> FastAPI: db.migrate() kraken_client = KrakenRestClient(settings) fee_sync_stop_event = asyncio.Event() + backtest_queue: asyncio.Queue[tuple[str, str, + dict[str, object] | None] | None] = asyncio.Queue() @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncIterator[None]: @@ -41,7 +44,12 @@ def create_app(settings: Settings) -> FastAPI: ), name="fee_sync_loop", ) + backtest_task = asyncio.create_task( + backtest_worker(backtest_queue, db), + name="backtest_worker", + ) app.state.fee_sync_task = fee_sync_task + app.state.backtest_task = backtest_task yield fee_sync_stop_event.set() fee_sync_task.cancel() @@ -49,6 +57,12 @@ def create_app(settings: Settings) -> FastAPI: await fee_sync_task except asyncio.CancelledError: pass + await backtest_queue.put(None) # poison pill + backtest_task.cancel() + try: + await backtest_task + except asyncio.CancelledError: + pass await kraken_client.close() await graceful_shutdown(app) @@ -75,11 +89,13 @@ def create_app(settings: Settings) -> FastAPI: app.state.store = db app.state.kraken_client = kraken_client app.state.fee_sync_stop_event = fee_sync_stop_event + app.state.backtest_queue = backtest_queue app.state.metrics = MetricsCalculator(db) app.state.audit_repository = AuditRepository(db) app.state.runtime_state_repository = RuntimeStateRepository(db) app.state.alert_notifier = build_notifier_from_settings(settings) - app.state.configuration_service = ConfigurationService(settings, db, AuditRepository(db)) + app.state.configuration_service = ConfigurationService( + settings, db, AuditRepository(db)) app.state.backtest_recent_reports = [] app.state.dashboard_controls = DashboardControlState( is_running=not settings.kill_switch_active, diff --git a/src/arbitrade/api/routes.py b/src/arbitrade/api/routes.py index dcc0eb3..910b9da 100644 --- a/src/arbitrade/api/routes.py +++ b/src/arbitrade/api/routes.py @@ -19,7 +19,7 @@ from arbitrade.api.auth import require_dashboard_auth from arbitrade.api.control_state import DashboardControlState from arbitrade.backtesting.replay import BacktestConfig, BacktestReplayEngine, load_replay_events from arbitrade.detection.graph import CurrencyGraph, TriangularCycle -from arbitrade.storage.repositories import AuditRecord, AuditRepository, KrakenAccountSnapshotRepository +from arbitrade.storage.repositories import AuditRecord, AuditRepository, BacktestJobRepository, KrakenAccountSnapshotRepository router = APIRouter(dependencies=[Depends(require_dashboard_auth)]) public_router = APIRouter() @@ -660,10 +660,22 @@ def _build_cycles_from_events( def _recent_backtest_reports(request: Request) -> list[dict[str, object]]: - reports = getattr(request.app.state, "backtest_recent_reports", []) - if isinstance(reports, list): - return cast(list[dict[str, object]], reports) - return [] + """Fetch recent backtest jobs from DB.""" + store = request.app.state.store + repo = BacktestJobRepository(store) + jobs = repo.list_jobs(limit=20) + return [ + { + "job_id": j.id or "", + "run_at": j.created_at.isoformat() if j.created_at else "—", + "events_path": j.events_path, + "status": j.status, + "config": j.config or {}, + "report": j.report or {}, + "error": j.error, + } + for j in jobs + ] def _backtesting_panel_context( @@ -945,6 +957,7 @@ async def dashboard_backtesting_reports(request: Request) -> JSONResponse: @router.post("/dashboard/backtesting/run", response_class=HTMLResponse) async def dashboard_backtesting_run(request: Request) -> HTMLResponse: + """Submit a backtest job to the async queue. Returns panel with job list.""" form = _parse_form_body(await request.body()) defaults = { "events_path": form.get("events_path", ""), @@ -963,96 +976,44 @@ async def dashboard_backtesting_run(request: Request) -> HTMLResponse: raise ValueError( "events_path must reference an existing JSONL file") - events = load_replay_events(events_path) - if not events: - raise ValueError("events file contains no replay events") - custom_fee_rate = ( - float(defaults["custom_fee_rate"] - ) if defaults["custom_fee_rate"].strip() else None + float(defaults["custom_fee_rate"]) + if defaults["custom_fee_rate"].strip() else None ) fee_rate = _fee_rate_for_profile( defaults["fee_profile"], custom_fee_rate, request=request) - starting_balances = _parse_balances(defaults["starting_balances"]) - trade_capital = float(defaults["trade_capital"]) - min_profit_threshold = float(defaults["min_profit_threshold"]) - slippage_bps = float(defaults["slippage_bps"]) - execution_latency_ms = float(defaults["execution_latency_ms"]) - - cycles_by_pair, available_pairs = _build_cycles_from_events( - {event.symbol.upper() for event in events} - ) - if not cycles_by_pair: - raise ValueError( - "unable to derive triangular cycles from provided events") - - config = BacktestConfig( - fee_rate=fee_rate, - min_profit_threshold=min_profit_threshold, - trade_capital=trade_capital, - slippage_bps=slippage_bps, - execution_latency_ms=execution_latency_ms, - ) - - async with _BACKTEST_RUN_LOCK: - engine = BacktestReplayEngine( - cycles_by_pair=cycles_by_pair, - available_pairs=available_pairs, - config=config, - started_at=events[0].occurred_at, - ) - report = await engine.run(events, starting_balances=starting_balances) - - report_item: dict[str, object] = { - "run_at": datetime.now(UTC).isoformat(), + config_dict = { "events_path": _display_path(events_path), - "status": "completed", - "config": { - "trade_capital": trade_capital, - "min_profit_threshold": min_profit_threshold, - "fee_profile": defaults["fee_profile"], - "fee_rate": fee_rate, - "slippage_bps": slippage_bps, - "execution_latency_ms": execution_latency_ms, - }, - "report": { - "processed_events": report.processed_events, - "opportunities_seen": report.opportunities_seen, - "trades_executed": report.trades_executed, - "win_rate": report.win_rate, - "fill_rate": report.fill_rate, - "realized_pnl_usd": report.realized_pnl_usd, - "max_drawdown_usd": report.max_drawdown_usd, - "miss_reasons": dict(report.miss_reasons), - "execution_latency_p50_ms": report.execution_latency_p50_ms, - "execution_latency_p95_ms": report.execution_latency_p95_ms, - "execution_latency_p99_ms": report.execution_latency_p99_ms, - }, + "starting_balances": defaults["starting_balances"], + "trade_capital": float(defaults["trade_capital"]), + "min_profit_threshold": float(defaults["min_profit_threshold"]), + "fee_rate": fee_rate, + "fee_profile": defaults["fee_profile"], + "slippage_bps": float(defaults["slippage_bps"]), + "execution_latency_ms": float(defaults["execution_latency_ms"]), } - reports = _recent_backtest_reports(request) - reports.insert(0, report_item) - del reports[20:] + store = request.app.state.store + repo = BacktestJobRepository(store) + job = repo.create_job(str(events_path), config_dict) + + queue = request.app.state.backtest_queue + await queue.put((job.id or "", str(events_path), config_dict)) _record_audit( request, actor="dashboard_user", - event_type="dashboard.backtesting.run", - decision="completed", - payload={ - "events_path": report_item["events_path"], - "processed_events": report.processed_events, - "trades_executed": report.trades_executed, - "realized_pnl_usd": report.realized_pnl_usd, - }, + event_type="dashboard.backtesting.submit", + decision="queued", + payload={"job_id": job.id, + "events_path": _display_path(events_path)}, ) context = _backtesting_panel_context( request, - status="completed", - message="Backtest run completed successfully.", - latest_report=report_item, + status="submitted", + message=f"Job {job.id[:8]}... queued. Refresh to see results.", defaults=defaults, ) except ValueError as exc: @@ -1070,6 +1031,49 @@ async def dashboard_backtesting_run(request: Request) -> HTMLResponse: ) +@router.post("/dashboard/backtesting/job/{job_id}/delete", response_class=HTMLResponse) +async def dashboard_backtesting_delete(request: Request, job_id: str) -> HTMLResponse: + store = request.app.state.store + repo = BacktestJobRepository(store) + repo.delete_job(job_id) + return templates.TemplateResponse( + request=request, + name="partials/backtesting_panel.html", + context={"request": request, **_backtesting_panel_context(request)}, + ) + + +@router.get("/dashboard/backtesting/job/{job_id}", response_class=HTMLResponse) +async def dashboard_backtesting_job_detail(request: Request, job_id: str) -> HTMLResponse: + store = request.app.state.store + repo = BacktestJobRepository(store) + job = repo.get_job(job_id) + if job is None: + return HTMLResponse("

Job not found

", status_code=404) + + report_html = "
No report yet
" + if job.report: + r = job.report + report_html = ( + f"
" + f"
Job {job.id[:8]}... Report
" + f"
Status: {job.status}
" + f"
Events: {job.events_path}
" + f"
Processed: {r.get('processed_events', '—')}
" + f"
Opportunities: {r.get('opportunities_seen', '—')}
" + f"
Trades: {r.get('trades_executed', '—')}
" + f"
Realized P&L: {r.get('realized_pnl_usd', '—')} USD
" + f"
Max drawdown: {r.get('max_drawdown_usd', '—')} USD
" + f"
Win rate: {r.get('win_rate', '—')}
" + f"
Fill rate: {r.get('fill_rate', '—')}
" + f"
Latency p50: {r.get('execution_latency_p50_ms', '—')} ms
" + f"
Created: {job.created_at}
" + f"
" + ) + + return HTMLResponse(report_html) + + @router.post("/dashboard/control/start", response_class=HTMLResponse) async def dashboard_control_start(request: Request) -> HTMLResponse: controls = _dashboard_controls_state(request) diff --git a/src/arbitrade/backtesting/runner.py b/src/arbitrade/backtesting/runner.py new file mode 100644 index 0000000..260d9d2 --- /dev/null +++ b/src/arbitrade/backtesting/runner.py @@ -0,0 +1,143 @@ +"""Async backtest job runner — picks up pending jobs from DB and executes them.""" + +from __future__ import annotations + +import asyncio +from datetime import UTC, datetime +from pathlib import Path + +import structlog + +from arbitrade.backtesting.replay import BacktestConfig, BacktestReplayEngine, load_replay_events +from arbitrade.detection.graph import CurrencyGraph, TriangularCycle +from arbitrade.storage.db import DuckDBStore +from arbitrade.storage.repositories import BacktestJobRepository + +_LOG = structlog.get_logger(__name__) + + +def _build_cycles_from_events( + symbols: set[str], +) -> tuple[dict[str, list[TriangularCycle]], list[str]]: + graph = CurrencyGraph() + for symbol in sorted(symbols): + if "/" not in symbol: + continue + base, quote = symbol.upper().split("/", 1) + graph.add_pair(base, quote, f"{base}/{quote}") + cycles = graph.triangular_cycles() + return graph.index_cycles_by_pair(cycles), sorted(symbols) + + +def _parse_balances(raw: str) -> dict[str, float]: + balances: dict[str, float] = {} + for entry in raw.split(","): + stripped = entry.strip() + if not stripped: + continue + if "=" not in stripped: + continue + asset, value = stripped.split("=", 1) + balances[asset.strip().upper()] = float(value) + return balances or {"USD": 1000.0} + + +async def run_backtest_job( + job_id: str, + events_path: str, + config_dict: dict[str, object] | None, + store: DuckDBStore, +) -> None: + """Execute a single backtest job: load events, run engine, store report to DB.""" + repo = BacktestJobRepository(store) + repo.update_status(job_id, "running") + _LOG.info("backtest_job_started", job_id=job_id, events_path=events_path) + + try: + path = Path(events_path) + if not path.is_absolute(): + path = Path("data") / path + path = path.resolve() + + events = load_replay_events(path) + if not events: + raise ValueError(f"No events found in {path}") + + config = config_dict or {} + starting_balances_raw = str(config.get( + "starting_balances", "USD=1000.0")) + starting_balances = _parse_balances(starting_balances_raw) + + fee_rate = float(config.get("fee_rate", 0.0026)) + trade_capital = float(config.get("trade_capital", 100.0)) + min_profit_threshold = float( + config.get("min_profit_threshold", 0.0005)) + slippage_bps = float(config.get("slippage_bps", 4.0)) + execution_latency_ms = float(config.get("execution_latency_ms", 20.0)) + + cycles_by_pair, available_pairs = _build_cycles_from_events( + {e.symbol.upper() for e in events} + ) + if not cycles_by_pair: + raise ValueError("No triangular cycles found in event data") + + bt_config = BacktestConfig( + fee_rate=fee_rate, + min_profit_threshold=min_profit_threshold, + trade_capital=trade_capital, + slippage_bps=slippage_bps, + execution_latency_ms=execution_latency_ms, + ) + + engine = BacktestReplayEngine( + cycles_by_pair=cycles_by_pair, + available_pairs=available_pairs, + config=bt_config, + started_at=events[0].occurred_at, + ) + report = await engine.run(events, starting_balances=starting_balances) + + report_dict = { + "processed_events": report.processed_events, + "opportunities_seen": report.opportunities_seen, + "trades_executed": report.trades_executed, + "win_rate": report.win_rate, + "fill_rate": report.fill_rate, + "realized_pnl_usd": report.realized_pnl_usd, + "max_drawdown_usd": report.max_drawdown_usd, + "execution_latency_p50_ms": report.execution_latency_p50_ms, + "execution_latency_p95_ms": report.execution_latency_p95_ms, + "execution_latency_p99_ms": report.execution_latency_p99_ms, + "started_at": report.started_at.isoformat(), + "finished_at": report.finished_at.isoformat(), + } + + repo.store_report(job_id, report_dict) + repo.update_status(job_id, "completed") + _LOG.info("backtest_job_completed", job_id=job_id, + pnl=report.realized_pnl_usd) + + except Exception as exc: + repo.update_status(job_id, "failed", error=str(exc)) + _LOG.exception("backtest_job_failed", job_id=job_id, error=str(exc)) + + +async def backtest_worker( + queue: asyncio.Queue[tuple[str, str, dict[str, object] | None] | None], + store: DuckDBStore, +) -> None: + """Worker coroutine: pull jobs from queue and execute them one at a time.""" + _LOG.info("backtest_worker_started") + while True: + item = await queue.get() + if item is None: + queue.task_done() + break + job_id, events_path, config = item + try: + await run_backtest_job(job_id, events_path, config, store) + except Exception: + _LOG.exception("backtest_worker_unhandled_error", job_id=job_id) + finally: + queue.task_done() + _LOG.info("backtest_worker_stopped") diff --git a/src/arbitrade/storage/db.py b/src/arbitrade/storage/db.py index 6acfd62..1f7f8c9 100644 --- a/src/arbitrade/storage/db.py +++ b/src/arbitrade/storage/db.py @@ -156,11 +156,23 @@ CREATE TABLE IF NOT EXISTS kraken_account_snapshots ( trade_balance_raw JSON, fee_schedule_raw JSON ); + +CREATE TABLE IF NOT EXISTS backtest_jobs ( + id UUID DEFAULT uuid(), + status VARCHAR NOT NULL DEFAULT 'pending', + events_path VARCHAR NOT NULL, + config JSON, + report JSON, + error VARCHAR, + created_at TIMESTAMP DEFAULT current_timestamp, + started_at TIMESTAMP, + finished_at TIMESTAMP +); """ class DuckDBStore: - SCHEMA_VERSION = 4 + SCHEMA_VERSION = 5 def __init__(self, settings: Settings) -> None: self._db_path = Path(settings.duckdb_path) @@ -283,6 +295,24 @@ class DuckDBStore: "INSERT OR IGNORE INTO schema_migrations (version) VALUES (4)") _LOG.info("migration_applied", version=4) + if current_version < 5: + conn.execute(""" + CREATE TABLE IF NOT EXISTS backtest_jobs ( + id UUID DEFAULT uuid(), + status VARCHAR NOT NULL DEFAULT 'pending', + events_path VARCHAR NOT NULL, + config JSON, + report JSON, + error VARCHAR, + created_at TIMESTAMP DEFAULT current_timestamp, + started_at TIMESTAMP, + finished_at TIMESTAMP + ) + """) + conn.execute( + "INSERT OR IGNORE INTO schema_migrations (version) VALUES (5)") + _LOG.info("migration_applied", version=5) + # Update version to current conn.execute( f"INSERT OR REPLACE INTO schema_migrations (version, applied_at) " diff --git a/src/arbitrade/storage/repositories.py b/src/arbitrade/storage/repositories.py index 05f3ebf..769f345 100644 --- a/src/arbitrade/storage/repositories.py +++ b/src/arbitrade/storage/repositories.py @@ -999,3 +999,109 @@ class KrakenAccountSnapshotRepository: trade_balance_raw=orjson.loads(row[5]) if row[5] else None, fee_schedule_raw=orjson.loads(row[6]) if row[6] else None, ) + + +@dataclass(slots=True) +class BacktestJobRecord: + id: str | None = None + status: str = "pending" + events_path: str = "" + config: dict[str, Any] | None = None + report: dict[str, Any] | None = None + error: str | None = None + created_at: datetime | None = None + started_at: datetime | None = None + finished_at: datetime | None = None + + +class BacktestJobRepository: + def __init__(self, store: DuckDBStore) -> None: + self._store = store + + def create_job(self, events_path: str, config: dict[str, Any] | None = None) -> BacktestJobRecord: + with self._store.connect() as conn: + row = conn.execute( + """ + INSERT INTO backtest_jobs (events_path, config) + VALUES (?, ?) + RETURNING id, status, events_path, config, created_at + """, + (events_path, orjson.dumps(config).decode( + "utf-8") if config else None), + ).fetchone() + if row is None: + raise ValueError("Failed to create backtest job") + return BacktestJobRecord( + id=str(row[0]), status=str(row[1]), events_path=str(row[2]), + config=orjson.loads(row[3]) if row[3] else None, + created_at=row[4], + ) + + def update_status(self, job_id: str, status: str, error: str | None = None) -> None: + with self._store.connect() as conn: + if status == "running": + conn.execute( + "UPDATE backtest_jobs SET status = ?, started_at = current_timestamp WHERE id = ?", + (status, job_id), + ) + elif status in ("completed", "failed"): + conn.execute( + "UPDATE backtest_jobs SET status = ?, finished_at = current_timestamp, error = ? WHERE id = ?", + (status, error, job_id), + ) + else: + conn.execute( + "UPDATE backtest_jobs SET status = ?, error = ? WHERE id = ?", + (status, error, job_id), + ) + + def store_report(self, job_id: str, report: dict[str, Any]) -> None: + with self._store.connect() as conn: + conn.execute( + "UPDATE backtest_jobs SET report = ? WHERE id = ?", + (orjson.dumps(report).decode("utf-8"), job_id), + ) + + def get_job(self, job_id: str) -> BacktestJobRecord | None: + with self._store.connect() as conn: + row = conn.execute( + """SELECT id, status, events_path, config, report, error, + created_at, started_at, finished_at + FROM backtest_jobs WHERE id = ?""", + (job_id,), + ).fetchone() + if row is None: + return None + return BacktestJobRecord( + id=str(row[0]), status=str(row[1]), events_path=str(row[2]), + config=orjson.loads(row[3]) if row[3] else None, + report=orjson.loads(row[4]) if row[4] else None, + error=str(row[5]) if row[5] else None, + created_at=row[6], started_at=row[7], finished_at=row[8], + ) + + def list_jobs(self, limit: int = 20) -> list[BacktestJobRecord]: + with self._store.connect() as conn: + rows = conn.execute( + """SELECT id, status, events_path, config, report, error, + created_at, started_at, finished_at + FROM backtest_jobs ORDER BY created_at DESC LIMIT ?""", + (limit,), + ).fetchall() + return [ + BacktestJobRecord( + id=str(r[0]), status=str(r[1]), events_path=str(r[2]), + config=orjson.loads(r[3]) if r[3] else None, + report=orjson.loads(r[4]) if r[4] else None, + error=str(r[5]) if r[5] else None, + created_at=r[6], started_at=r[7], finished_at=r[8], + ) + for r in rows + ] + + def delete_job(self, job_id: str) -> bool: + with self._store.connect() as conn: + result = conn.execute( + "DELETE FROM backtest_jobs WHERE id = ?", (job_id,), + ) + return result.rowcount > 0 diff --git a/src/arbitrade/web/templates/partials/backtesting_panel.html b/src/arbitrade/web/templates/partials/backtesting_panel.html index 41320b7..bcccd8a 100644 --- a/src/arbitrade/web/templates/partials/backtesting_panel.html +++ b/src/arbitrade/web/templates/partials/backtesting_panel.html @@ -125,20 +125,78 @@ value="{{ execution_latency_ms }}" /> - +
-
Recent Runs
- {% if recent_reports %} {% for item in recent_reports %} -
- {{ item.run_at }} | {{ item.events_path }} | trades={{ - item.report.trades_executed }} | pnl={{ - '%.4f'|format(item.report.realized_pnl_usd) }} USD +
Recent Jobs
+ {% if recent_reports %} +
+ + + + + + + + + + + + + + {% for item in recent_reports %} + + + + + + + + + + + {% endfor %} + +
Job + Status + + Events + + Trades + P&L + Created +
+ + {{ item.status }}{{ item.events_path }} + {{ item.report.trades_executed if item.report else "—" }} + + {{ '%.2f'|format(item.report.realized_pnl_usd) if item.report and + item.report.realized_pnl_usd else "—" }} + {{ item.run_at[:19] }} + +
- {% endfor %} {% else %} -
No recent reports yet.
+ {% else %} +
No jobs submitted yet.
{% endif %}