feat: implement backtesting job management with database integration and UI updates

This commit is contained in:
2026-06-03 21:34:19 +02:00
parent ff71fc5feb
commit 6acd6bbbc9
6 changed files with 447 additions and 90 deletions
+17 -1
View File
@@ -10,6 +10,7 @@ import asyncio
from arbitrade.alerting.notifier import build_notifier_from_settings
from arbitrade.api.control_state import DashboardControlState
from arbitrade.api.routes import public_router, router
from arbitrade.backtesting.runner import backtest_worker
from arbitrade.config.settings import Settings
from arbitrade.config.service import ConfigurationService
from arbitrade.exchange.fee_service import run_fee_sync_loop
@@ -28,6 +29,8 @@ def create_app(settings: Settings) -> FastAPI:
db.migrate()
kraken_client = KrakenRestClient(settings)
fee_sync_stop_event = asyncio.Event()
backtest_queue: asyncio.Queue[tuple[str, str,
dict[str, object] | None] | None] = asyncio.Queue()
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
@@ -41,7 +44,12 @@ def create_app(settings: Settings) -> FastAPI:
),
name="fee_sync_loop",
)
backtest_task = asyncio.create_task(
backtest_worker(backtest_queue, db),
name="backtest_worker",
)
app.state.fee_sync_task = fee_sync_task
app.state.backtest_task = backtest_task
yield
fee_sync_stop_event.set()
fee_sync_task.cancel()
@@ -49,6 +57,12 @@ def create_app(settings: Settings) -> FastAPI:
await fee_sync_task
except asyncio.CancelledError:
pass
await backtest_queue.put(None) # poison pill
backtest_task.cancel()
try:
await backtest_task
except asyncio.CancelledError:
pass
await kraken_client.close()
await graceful_shutdown(app)
@@ -75,11 +89,13 @@ def create_app(settings: Settings) -> FastAPI:
app.state.store = db
app.state.kraken_client = kraken_client
app.state.fee_sync_stop_event = fee_sync_stop_event
app.state.backtest_queue = backtest_queue
app.state.metrics = MetricsCalculator(db)
app.state.audit_repository = AuditRepository(db)
app.state.runtime_state_repository = RuntimeStateRepository(db)
app.state.alert_notifier = build_notifier_from_settings(settings)
app.state.configuration_service = ConfigurationService(settings, db, AuditRepository(db))
app.state.configuration_service = ConfigurationService(
settings, db, AuditRepository(db))
app.state.backtest_recent_reports = []
app.state.dashboard_controls = DashboardControlState(
is_running=not settings.kill_switch_active,
+83 -79
View File
@@ -19,7 +19,7 @@ from arbitrade.api.auth import require_dashboard_auth
from arbitrade.api.control_state import DashboardControlState
from arbitrade.backtesting.replay import BacktestConfig, BacktestReplayEngine, load_replay_events
from arbitrade.detection.graph import CurrencyGraph, TriangularCycle
from arbitrade.storage.repositories import AuditRecord, AuditRepository, KrakenAccountSnapshotRepository
from arbitrade.storage.repositories import AuditRecord, AuditRepository, BacktestJobRepository, KrakenAccountSnapshotRepository
router = APIRouter(dependencies=[Depends(require_dashboard_auth)])
public_router = APIRouter()
@@ -660,10 +660,22 @@ def _build_cycles_from_events(
def _recent_backtest_reports(request: Request) -> list[dict[str, object]]:
reports = getattr(request.app.state, "backtest_recent_reports", [])
if isinstance(reports, list):
return cast(list[dict[str, object]], reports)
return []
"""Fetch recent backtest jobs from DB."""
store = request.app.state.store
repo = BacktestJobRepository(store)
jobs = repo.list_jobs(limit=20)
return [
{
"job_id": j.id or "",
"run_at": j.created_at.isoformat() if j.created_at else "",
"events_path": j.events_path,
"status": j.status,
"config": j.config or {},
"report": j.report or {},
"error": j.error,
}
for j in jobs
]
def _backtesting_panel_context(
@@ -945,6 +957,7 @@ async def dashboard_backtesting_reports(request: Request) -> JSONResponse:
@router.post("/dashboard/backtesting/run", response_class=HTMLResponse)
async def dashboard_backtesting_run(request: Request) -> HTMLResponse:
"""Submit a backtest job to the async queue. Returns panel with job list."""
form = _parse_form_body(await request.body())
defaults = {
"events_path": form.get("events_path", ""),
@@ -963,96 +976,44 @@ async def dashboard_backtesting_run(request: Request) -> HTMLResponse:
raise ValueError(
"events_path must reference an existing JSONL file")
events = load_replay_events(events_path)
if not events:
raise ValueError("events file contains no replay events")
custom_fee_rate = (
float(defaults["custom_fee_rate"]
) if defaults["custom_fee_rate"].strip() else None
float(defaults["custom_fee_rate"])
if defaults["custom_fee_rate"].strip() else None
)
fee_rate = _fee_rate_for_profile(
defaults["fee_profile"], custom_fee_rate, request=request)
starting_balances = _parse_balances(defaults["starting_balances"])
trade_capital = float(defaults["trade_capital"])
min_profit_threshold = float(defaults["min_profit_threshold"])
slippage_bps = float(defaults["slippage_bps"])
execution_latency_ms = float(defaults["execution_latency_ms"])
cycles_by_pair, available_pairs = _build_cycles_from_events(
{event.symbol.upper() for event in events}
)
if not cycles_by_pair:
raise ValueError(
"unable to derive triangular cycles from provided events")
config = BacktestConfig(
fee_rate=fee_rate,
min_profit_threshold=min_profit_threshold,
trade_capital=trade_capital,
slippage_bps=slippage_bps,
execution_latency_ms=execution_latency_ms,
)
async with _BACKTEST_RUN_LOCK:
engine = BacktestReplayEngine(
cycles_by_pair=cycles_by_pair,
available_pairs=available_pairs,
config=config,
started_at=events[0].occurred_at,
)
report = await engine.run(events, starting_balances=starting_balances)
report_item: dict[str, object] = {
"run_at": datetime.now(UTC).isoformat(),
config_dict = {
"events_path": _display_path(events_path),
"status": "completed",
"config": {
"trade_capital": trade_capital,
"min_profit_threshold": min_profit_threshold,
"fee_profile": defaults["fee_profile"],
"fee_rate": fee_rate,
"slippage_bps": slippage_bps,
"execution_latency_ms": execution_latency_ms,
},
"report": {
"processed_events": report.processed_events,
"opportunities_seen": report.opportunities_seen,
"trades_executed": report.trades_executed,
"win_rate": report.win_rate,
"fill_rate": report.fill_rate,
"realized_pnl_usd": report.realized_pnl_usd,
"max_drawdown_usd": report.max_drawdown_usd,
"miss_reasons": dict(report.miss_reasons),
"execution_latency_p50_ms": report.execution_latency_p50_ms,
"execution_latency_p95_ms": report.execution_latency_p95_ms,
"execution_latency_p99_ms": report.execution_latency_p99_ms,
},
"starting_balances": defaults["starting_balances"],
"trade_capital": float(defaults["trade_capital"]),
"min_profit_threshold": float(defaults["min_profit_threshold"]),
"fee_rate": fee_rate,
"fee_profile": defaults["fee_profile"],
"slippage_bps": float(defaults["slippage_bps"]),
"execution_latency_ms": float(defaults["execution_latency_ms"]),
}
reports = _recent_backtest_reports(request)
reports.insert(0, report_item)
del reports[20:]
store = request.app.state.store
repo = BacktestJobRepository(store)
job = repo.create_job(str(events_path), config_dict)
queue = request.app.state.backtest_queue
await queue.put((job.id or "", str(events_path), config_dict))
_record_audit(
request,
actor="dashboard_user",
event_type="dashboard.backtesting.run",
decision="completed",
payload={
"events_path": report_item["events_path"],
"processed_events": report.processed_events,
"trades_executed": report.trades_executed,
"realized_pnl_usd": report.realized_pnl_usd,
},
event_type="dashboard.backtesting.submit",
decision="queued",
payload={"job_id": job.id,
"events_path": _display_path(events_path)},
)
context = _backtesting_panel_context(
request,
status="completed",
message="Backtest run completed successfully.",
latest_report=report_item,
status="submitted",
message=f"Job {job.id[:8]}... queued. Refresh to see results.",
defaults=defaults,
)
except ValueError as exc:
@@ -1070,6 +1031,49 @@ async def dashboard_backtesting_run(request: Request) -> HTMLResponse:
)
@router.post("/dashboard/backtesting/job/{job_id}/delete", response_class=HTMLResponse)
async def dashboard_backtesting_delete(request: Request, job_id: str) -> HTMLResponse:
store = request.app.state.store
repo = BacktestJobRepository(store)
repo.delete_job(job_id)
return templates.TemplateResponse(
request=request,
name="partials/backtesting_panel.html",
context={"request": request, **_backtesting_panel_context(request)},
)
@router.get("/dashboard/backtesting/job/{job_id}", response_class=HTMLResponse)
async def dashboard_backtesting_job_detail(request: Request, job_id: str) -> HTMLResponse:
store = request.app.state.store
repo = BacktestJobRepository(store)
job = repo.get_job(job_id)
if job is None:
return HTMLResponse("<p>Job not found</p>", status_code=404)
report_html = "<div class='meta'>No report yet</div>"
if job.report:
r = job.report
report_html = (
f"<div class='panel'>"
f"<div class='label'>Job {job.id[:8]}... Report</div>"
f"<div class='meta'>Status: {job.status}</div>"
f"<div class='meta'>Events: {job.events_path}</div>"
f"<div class='meta'>Processed: {r.get('processed_events', '')}</div>"
f"<div class='meta'>Opportunities: {r.get('opportunities_seen', '')}</div>"
f"<div class='meta'>Trades: {r.get('trades_executed', '')}</div>"
f"<div class='meta'>Realized P&L: {r.get('realized_pnl_usd', '')} USD</div>"
f"<div class='meta'>Max drawdown: {r.get('max_drawdown_usd', '')} USD</div>"
f"<div class='meta'>Win rate: {r.get('win_rate', '')}</div>"
f"<div class='meta'>Fill rate: {r.get('fill_rate', '')}</div>"
f"<div class='meta'>Latency p50: {r.get('execution_latency_p50_ms', '')} ms</div>"
f"<div class='meta'>Created: {job.created_at}</div>"
f"</div>"
)
return HTMLResponse(report_html)
@router.post("/dashboard/control/start", response_class=HTMLResponse)
async def dashboard_control_start(request: Request) -> HTMLResponse:
controls = _dashboard_controls_state(request)
+143
View File
@@ -0,0 +1,143 @@
"""Async backtest job runner — picks up pending jobs from DB and executes them."""
from __future__ import annotations
import asyncio
from datetime import UTC, datetime
from pathlib import Path
import structlog
from arbitrade.backtesting.replay import BacktestConfig, BacktestReplayEngine, load_replay_events
from arbitrade.detection.graph import CurrencyGraph, TriangularCycle
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.repositories import BacktestJobRepository
_LOG = structlog.get_logger(__name__)
def _build_cycles_from_events(
symbols: set[str],
) -> tuple[dict[str, list[TriangularCycle]], list[str]]:
graph = CurrencyGraph()
for symbol in sorted(symbols):
if "/" not in symbol:
continue
base, quote = symbol.upper().split("/", 1)
graph.add_pair(base, quote, f"{base}/{quote}")
cycles = graph.triangular_cycles()
return graph.index_cycles_by_pair(cycles), sorted(symbols)
def _parse_balances(raw: str) -> dict[str, float]:
balances: dict[str, float] = {}
for entry in raw.split(","):
stripped = entry.strip()
if not stripped:
continue
if "=" not in stripped:
continue
asset, value = stripped.split("=", 1)
balances[asset.strip().upper()] = float(value)
return balances or {"USD": 1000.0}
async def run_backtest_job(
job_id: str,
events_path: str,
config_dict: dict[str, object] | None,
store: DuckDBStore,
) -> None:
"""Execute a single backtest job: load events, run engine, store report to DB."""
repo = BacktestJobRepository(store)
repo.update_status(job_id, "running")
_LOG.info("backtest_job_started", job_id=job_id, events_path=events_path)
try:
path = Path(events_path)
if not path.is_absolute():
path = Path("data") / path
path = path.resolve()
events = load_replay_events(path)
if not events:
raise ValueError(f"No events found in {path}")
config = config_dict or {}
starting_balances_raw = str(config.get(
"starting_balances", "USD=1000.0"))
starting_balances = _parse_balances(starting_balances_raw)
fee_rate = float(config.get("fee_rate", 0.0026))
trade_capital = float(config.get("trade_capital", 100.0))
min_profit_threshold = float(
config.get("min_profit_threshold", 0.0005))
slippage_bps = float(config.get("slippage_bps", 4.0))
execution_latency_ms = float(config.get("execution_latency_ms", 20.0))
cycles_by_pair, available_pairs = _build_cycles_from_events(
{e.symbol.upper() for e in events}
)
if not cycles_by_pair:
raise ValueError("No triangular cycles found in event data")
bt_config = BacktestConfig(
fee_rate=fee_rate,
min_profit_threshold=min_profit_threshold,
trade_capital=trade_capital,
slippage_bps=slippage_bps,
execution_latency_ms=execution_latency_ms,
)
engine = BacktestReplayEngine(
cycles_by_pair=cycles_by_pair,
available_pairs=available_pairs,
config=bt_config,
started_at=events[0].occurred_at,
)
report = await engine.run(events, starting_balances=starting_balances)
report_dict = {
"processed_events": report.processed_events,
"opportunities_seen": report.opportunities_seen,
"trades_executed": report.trades_executed,
"win_rate": report.win_rate,
"fill_rate": report.fill_rate,
"realized_pnl_usd": report.realized_pnl_usd,
"max_drawdown_usd": report.max_drawdown_usd,
"execution_latency_p50_ms": report.execution_latency_p50_ms,
"execution_latency_p95_ms": report.execution_latency_p95_ms,
"execution_latency_p99_ms": report.execution_latency_p99_ms,
"started_at": report.started_at.isoformat(),
"finished_at": report.finished_at.isoformat(),
}
repo.store_report(job_id, report_dict)
repo.update_status(job_id, "completed")
_LOG.info("backtest_job_completed", job_id=job_id,
pnl=report.realized_pnl_usd)
except Exception as exc:
repo.update_status(job_id, "failed", error=str(exc))
_LOG.exception("backtest_job_failed", job_id=job_id, error=str(exc))
async def backtest_worker(
queue: asyncio.Queue[tuple[str, str, dict[str, object] | None] | None],
store: DuckDBStore,
) -> None:
"""Worker coroutine: pull jobs from queue and execute them one at a time."""
_LOG.info("backtest_worker_started")
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
job_id, events_path, config = item
try:
await run_backtest_job(job_id, events_path, config, store)
except Exception:
_LOG.exception("backtest_worker_unhandled_error", job_id=job_id)
finally:
queue.task_done()
_LOG.info("backtest_worker_stopped")
+31 -1
View File
@@ -156,11 +156,23 @@ CREATE TABLE IF NOT EXISTS kraken_account_snapshots (
trade_balance_raw JSON,
fee_schedule_raw JSON
);
CREATE TABLE IF NOT EXISTS backtest_jobs (
id UUID DEFAULT uuid(),
status VARCHAR NOT NULL DEFAULT 'pending',
events_path VARCHAR NOT NULL,
config JSON,
report JSON,
error VARCHAR,
created_at TIMESTAMP DEFAULT current_timestamp,
started_at TIMESTAMP,
finished_at TIMESTAMP
);
"""
class DuckDBStore:
SCHEMA_VERSION = 4
SCHEMA_VERSION = 5
def __init__(self, settings: Settings) -> None:
self._db_path = Path(settings.duckdb_path)
@@ -283,6 +295,24 @@ class DuckDBStore:
"INSERT OR IGNORE INTO schema_migrations (version) VALUES (4)")
_LOG.info("migration_applied", version=4)
if current_version < 5:
conn.execute("""
CREATE TABLE IF NOT EXISTS backtest_jobs (
id UUID DEFAULT uuid(),
status VARCHAR NOT NULL DEFAULT 'pending',
events_path VARCHAR NOT NULL,
config JSON,
report JSON,
error VARCHAR,
created_at TIMESTAMP DEFAULT current_timestamp,
started_at TIMESTAMP,
finished_at TIMESTAMP
)
""")
conn.execute(
"INSERT OR IGNORE INTO schema_migrations (version) VALUES (5)")
_LOG.info("migration_applied", version=5)
# Update version to current
conn.execute(
f"INSERT OR REPLACE INTO schema_migrations (version, applied_at) "
+106
View File
@@ -999,3 +999,109 @@ class KrakenAccountSnapshotRepository:
trade_balance_raw=orjson.loads(row[5]) if row[5] else None,
fee_schedule_raw=orjson.loads(row[6]) if row[6] else None,
)
@dataclass(slots=True)
class BacktestJobRecord:
id: str | None = None
status: str = "pending"
events_path: str = ""
config: dict[str, Any] | None = None
report: dict[str, Any] | None = None
error: str | None = None
created_at: datetime | None = None
started_at: datetime | None = None
finished_at: datetime | None = None
class BacktestJobRepository:
def __init__(self, store: DuckDBStore) -> None:
self._store = store
def create_job(self, events_path: str, config: dict[str, Any] | None = None) -> BacktestJobRecord:
with self._store.connect() as conn:
row = conn.execute(
"""
INSERT INTO backtest_jobs (events_path, config)
VALUES (?, ?)
RETURNING id, status, events_path, config, created_at
""",
(events_path, orjson.dumps(config).decode(
"utf-8") if config else None),
).fetchone()
if row is None:
raise ValueError("Failed to create backtest job")
return BacktestJobRecord(
id=str(row[0]), status=str(row[1]), events_path=str(row[2]),
config=orjson.loads(row[3]) if row[3] else None,
created_at=row[4],
)
def update_status(self, job_id: str, status: str, error: str | None = None) -> None:
with self._store.connect() as conn:
if status == "running":
conn.execute(
"UPDATE backtest_jobs SET status = ?, started_at = current_timestamp WHERE id = ?",
(status, job_id),
)
elif status in ("completed", "failed"):
conn.execute(
"UPDATE backtest_jobs SET status = ?, finished_at = current_timestamp, error = ? WHERE id = ?",
(status, error, job_id),
)
else:
conn.execute(
"UPDATE backtest_jobs SET status = ?, error = ? WHERE id = ?",
(status, error, job_id),
)
def store_report(self, job_id: str, report: dict[str, Any]) -> None:
with self._store.connect() as conn:
conn.execute(
"UPDATE backtest_jobs SET report = ? WHERE id = ?",
(orjson.dumps(report).decode("utf-8"), job_id),
)
def get_job(self, job_id: str) -> BacktestJobRecord | None:
with self._store.connect() as conn:
row = conn.execute(
"""SELECT id, status, events_path, config, report, error,
created_at, started_at, finished_at
FROM backtest_jobs WHERE id = ?""",
(job_id,),
).fetchone()
if row is None:
return None
return BacktestJobRecord(
id=str(row[0]), status=str(row[1]), events_path=str(row[2]),
config=orjson.loads(row[3]) if row[3] else None,
report=orjson.loads(row[4]) if row[4] else None,
error=str(row[5]) if row[5] else None,
created_at=row[6], started_at=row[7], finished_at=row[8],
)
def list_jobs(self, limit: int = 20) -> list[BacktestJobRecord]:
with self._store.connect() as conn:
rows = conn.execute(
"""SELECT id, status, events_path, config, report, error,
created_at, started_at, finished_at
FROM backtest_jobs ORDER BY created_at DESC LIMIT ?""",
(limit,),
).fetchall()
return [
BacktestJobRecord(
id=str(r[0]), status=str(r[1]), events_path=str(r[2]),
config=orjson.loads(r[3]) if r[3] else None,
report=orjson.loads(r[4]) if r[4] else None,
error=str(r[5]) if r[5] else None,
created_at=r[6], started_at=r[7], finished_at=r[8],
)
for r in rows
]
def delete_job(self, job_id: str) -> bool:
with self._store.connect() as conn:
result = conn.execute(
"DELETE FROM backtest_jobs WHERE id = ?", (job_id,),
)
return result.rowcount > 0
@@ -125,20 +125,78 @@
value="{{ execution_latency_ms }}"
/>
</label>
<button type="submit" class="button">Run backtest</button>
<button type="submit" class="button">Submit Job</button>
</form>
</article>
<article class="card" style="margin-top: 16px">
<div class="label">Recent Runs</div>
{% if recent_reports %} {% for item in recent_reports %}
<div class="meta">
{{ item.run_at }} | {{ item.events_path }} | trades={{
item.report.trades_executed }} | pnl={{
'%.4f'|format(item.report.realized_pnl_usd) }} USD
<div class="label">Recent Jobs</div>
{% if recent_reports %}
<div style="overflow-x: auto">
<table style="width: 100%; border-collapse: collapse; font-size: 0.85rem">
<thead>
<tr style="border-bottom: 1px solid rgba(255, 255, 255, 0.14)">
<th style="text-align: left; padding: 8px; color: #9fb2d0">Job</th>
<th style="text-align: left; padding: 8px; color: #9fb2d0">
Status
</th>
<th style="text-align: left; padding: 8px; color: #9fb2d0">
Events
</th>
<th style="text-align: left; padding: 8px; color: #9fb2d0">
Trades
</th>
<th style="text-align: left; padding: 8px; color: #9fb2d0">P&L</th>
<th style="text-align: left; padding: 8px; color: #9fb2d0">
Created
</th>
<th style="text-align: left; padding: 8px; color: #9fb2d0"></th>
</tr>
</thead>
<tbody>
{% for item in recent_reports %}
<tr style="border-bottom: 1px solid rgba(255, 255, 255, 0.06)">
<td style="padding: 8px">
<button
class="button secondary"
style="padding: 2px 8px; font-size: 0.8rem"
hx-get="/dashboard/backtesting/job/{{ item.job_id }}"
hx-target="#job-detail-{{ loop.index }}"
hx-swap="innerHTML"
>
{{ item.job_id[:8] }}...
</button>
</td>
<td style="padding: 8px">{{ item.status }}</td>
<td style="padding: 8px; color: #7f95b7">{{ item.events_path }}</td>
<td style="padding: 8px">
{{ item.report.trades_executed if item.report else "—" }}
</td>
<td style="padding: 8px">
{{ '%.2f'|format(item.report.realized_pnl_usd) if item.report and
item.report.realized_pnl_usd else "—" }}
</td>
<td style="padding: 8px; color: #7f95b7">{{ item.run_at[:19] }}</td>
<td style="padding: 8px">
<button
class="button danger"
style="padding: 2px 8px; font-size: 0.8rem"
hx-post="/dashboard/backtesting/job/{{ item.job_id }}/delete"
hx-target="#backtesting-shell"
hx-swap="outerHTML"
onclick="return confirm('Delete job {{ item.job_id[:8] }}...?');"
>
Del
</button>
</td>
</tr>
<tr id="job-detail-{{ loop.index }}"></tr>
{% endfor %}
</tbody>
</table>
</div>
{% endfor %} {% else %}
<div class="meta">No recent reports yet.</div>
{% else %}
<div class="meta">No jobs submitted yet.</div>
{% endif %}
</article>
</div>