diff --git a/CHANGELOG.md b/CHANGELOG.md index 6daee44..2ce4e79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,3 +21,5 @@ - Added deterministic order idempotency via Kraken userref plus reconciliation helpers for Kraken order history responses. - Added execution journaling for trades, orders, and estimated P&L, plus a DuckDB startup fallback when the default file path is unavailable. - Added a mocked execution integration test that drives the triangular sequencer through the execution journal and DuckDB persistence. +- Added a DuckDB-backed performance metrics calculator for realized P&L, win rate, trade duration, opportunities/min, fill rate, and latency percentiles. +- Added dashboard page plus HTMX metrics fragment and SSE metrics stream. diff --git a/src/arbitrade/api/app.py b/src/arbitrade/api/app.py index cdcef84..2fffbca 100644 --- a/src/arbitrade/api/app.py +++ b/src/arbitrade/api/app.py @@ -5,6 +5,7 @@ from fastapi import FastAPI from arbitrade.api.routes import router from arbitrade.config.settings import Settings from arbitrade.logging_setup import configure_logging +from arbitrade.metrics import MetricsCalculator from arbitrade.storage.db import DuckDBStore @@ -15,5 +16,7 @@ def create_app(settings: Settings) -> FastAPI: db.migrate() app = FastAPI(title="arbitrade", version="0.1.0") + app.state.store = db + app.state.metrics = MetricsCalculator(db) app.include_router(router) return app diff --git a/src/arbitrade/api/routes.py b/src/arbitrade/api/routes.py index da884f6..29ab4c8 100644 --- a/src/arbitrade/api/routes.py +++ b/src/arbitrade/api/routes.py @@ -1,28 +1,109 @@ from __future__ import annotations +import json +from collections.abc import AsyncIterator from datetime import UTC, datetime +from pathlib import Path from fastapi import APIRouter, Request -from fastapi.responses import HTMLResponse, JSONResponse +from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse from fastapi.templating import Jinja2Templates router = APIRouter() -templates = Jinja2Templates(directory="web/templates") +templates = Jinja2Templates( + directory=str(Path(__file__).resolve().parents[3] / "web" / "templates") +) + + +def _format_metric(value: float | None, *, precision: int = 2, suffix: str = "") -> str: + if value is None: + return "—" + return f"{value:.{precision}f}{suffix}" + + +def _dashboard_metrics(request: Request) -> dict[str, str]: + metrics = request.app.state.metrics.compute() + return { + "realized_pnl": _format_metric(metrics.realized_pnl_usd, precision=2, suffix=" USD"), + "win_rate": _format_metric( + metrics.win_rate * 100.0 if metrics.win_rate is not None else None, + precision=1, + suffix="%", + ), + "avg_trade_duration": _format_metric( + metrics.avg_trade_duration_seconds, precision=1, suffix=" s" + ), + "opportunities_per_minute": _format_metric( + metrics.opportunities_per_minute, precision=1, suffix=" /min" + ), + "fill_rate": _format_metric( + metrics.fill_rate * 100.0 if metrics.fill_rate is not None else None, + precision=1, + suffix="%", + ), + "latency_p50": _format_metric(metrics.latency_p50_seconds, precision=3, suffix=" s"), + "latency_p95": _format_metric(metrics.latency_p95_seconds, precision=3, suffix=" s"), + "latency_p99": _format_metric(metrics.latency_p99_seconds, precision=3, suffix=" s"), + "generated_at": datetime.now(UTC).isoformat(), + } @router.get("/", response_class=HTMLResponse) async def home(request: Request) -> HTMLResponse: return templates.TemplateResponse( request=request, - name="health.html", + name="dashboard.html", context={ - "status": "ok", - "time": datetime.now(UTC).isoformat(), - "title": "Arbitrade Health", + "title": "Arbitrade Dashboard", + "request": request, + "metrics_endpoint": "/dashboard/fragment/metrics", + "stream_endpoint": "/dashboard/stream/metrics", }, ) +@router.get("/dashboard", response_class=HTMLResponse) +async def dashboard(request: Request) -> HTMLResponse: + return templates.TemplateResponse( + request=request, + name="dashboard.html", + context={ + "title": "Arbitrade Dashboard", + "request": request, + "metrics_endpoint": "/dashboard/fragment/metrics", + "stream_endpoint": "/dashboard/stream/metrics", + }, + ) + + +@router.get("/dashboard/fragment/metrics", response_class=HTMLResponse) +async def dashboard_metrics(request: Request) -> HTMLResponse: + return templates.TemplateResponse( + request=request, + name="partials/metrics.html", + context={"request": request, **_dashboard_metrics(request)}, + ) + + +@router.get("/dashboard/stream/metrics") +async def dashboard_metrics_stream(request: Request) -> StreamingResponse: + fragment = ( + templates.get_template("partials/metrics.html") + .render( + request=request, + **_dashboard_metrics(request), + ) + .strip() + .replace("\n", "") + ) + + async def _event_stream() -> AsyncIterator[bytes]: + payload = json.dumps(fragment) + yield f"event: metrics\ndata: {payload}\n\n".encode() + + return StreamingResponse(_event_stream(), media_type="text/event-stream") + + @router.get("/health", response_class=JSONResponse) async def health() -> JSONResponse: return JSONResponse({"status": "ok", "service": "arbitrade"}) diff --git a/src/arbitrade/execution/sequencer.py b/src/arbitrade/execution/sequencer.py index bb22007..45e4eb7 100644 --- a/src/arbitrade/execution/sequencer.py +++ b/src/arbitrade/execution/sequencer.py @@ -40,13 +40,11 @@ class TriangularExecutionSequencer: rest_client: SupportsOrderPlacement, *, available_pairs: Sequence[str], - volume_for_leg: Callable[[OpportunityEvent, - ExecutionLeg, int], float] | None = None, + volume_for_leg: Callable[[OpportunityEvent, ExecutionLeg, int], float] | None = None, execution_writer: AsyncExecutionWriter | None = None, ) -> None: self._rest_client = rest_client - self._available_pairs = {self._normalize_pair( - pair) for pair in available_pairs} + self._available_pairs = {self._normalize_pair(pair) for pair in available_pairs} self._volume_for_leg = volume_for_leg or self._default_volume_for_leg self._execution_writer = execution_writer @@ -91,15 +89,12 @@ class TriangularExecutionSequencer: raise ValueError(f"No tradable pair for leg {from_cur}->{to_cur}") def _build_legs(self, event: OpportunityEvent) -> tuple[ExecutionLeg, ...]: - currencies = [part.strip().upper() - for part in event.cycle.split("->") if part.strip()] + currencies = [part.strip().upper() for part in event.cycle.split("->") if part.strip()] if len(currencies) < 4 or currencies[0] != currencies[-1]: - raise ValueError( - "cycle must be a closed triangular path like A->B->C->A") + raise ValueError("cycle must be a closed triangular path like A->B->C->A") if len(currencies) != 4: - raise ValueError( - "cycle must contain exactly three unique currencies") + raise ValueError("cycle must contain exactly three unique currencies") legs: list[ExecutionLeg] = [] for idx in range(3): @@ -114,8 +109,7 @@ class TriangularExecutionSequencer: ) volume = self._volume_for_leg(event, placeholder_leg, idx) if volume <= 0.0: - raise ValueError( - "volume_for_leg must return a positive volume") + raise ValueError("volume_for_leg must return a positive volume") legs.append(self._resolve_leg(from_currency, to_currency, volume)) return tuple(legs) @@ -177,8 +171,7 @@ class TriangularExecutionSequencer: responses.append(response) if self._execution_writer is not None: - order_ref = self._order_ref_from_response( - response, f"leg-{idx}") + order_ref = self._order_ref_from_response(response, f"leg-{idx}") await self._execution_writer.enqueue( OrderRecord( trade_ref=trade_ref, diff --git a/src/arbitrade/metrics.py b/src/arbitrade/metrics.py new file mode 100644 index 0000000..869ce22 --- /dev/null +++ b/src/arbitrade/metrics.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +from collections.abc import Iterable +from dataclasses import dataclass +from datetime import datetime +from statistics import fmean + +from arbitrade.storage.db import DuckDBStore + + +@dataclass(frozen=True, slots=True) +class PerformanceMetrics: + realized_pnl_usd: float + win_rate: float | None + avg_trade_duration_seconds: float | None + opportunities_per_minute: float | None + fill_rate: float | None + latency_p50_seconds: float | None + latency_p95_seconds: float | None + latency_p99_seconds: float | None + + +class MetricsCalculator: + def __init__(self, store: DuckDBStore) -> None: + self._store = store + + @staticmethod + def _percentile(values: Iterable[float], percentile: float) -> float | None: + samples = sorted(values) + if not samples: + return None + + if percentile <= 0.0: + return samples[0] + if percentile >= 100.0: + return samples[-1] + + rank = (len(samples) - 1) * (percentile / 100.0) + lower_index = int(rank) + upper_index = min(lower_index + 1, len(samples) - 1) + weight = rank - lower_index + return samples[lower_index] * (1.0 - weight) + samples[upper_index] * weight + + def compute(self) -> PerformanceMetrics: + with self._store.connect() as conn: + trade_rows = conn.execute(""" + SELECT started_at, finished_at, realized_pnl + FROM trades + WHERE finished_at IS NOT NULL + """).fetchall() + opportunity_rows = conn.execute(""" + SELECT detected_at + FROM opportunities + """).fetchall() + order_rows = conn.execute(""" + SELECT volume, filled_volume + FROM orders + WHERE volume > 0 + """).fetchall() + + realized_values = [float(row[2]) for row in trade_rows if row[2] is not None] + realized_pnl_usd = sum(realized_values) + + total_trades = len(trade_rows) + winning_trades = sum(1 for row in trade_rows if row[2] is not None and float(row[2]) > 0.0) + win_rate = winning_trades / total_trades if total_trades > 0 else None + + durations_seconds = [ + (row[1] - row[0]).total_seconds() + for row in trade_rows + if isinstance(row[0], datetime) and isinstance(row[1], datetime) + ] + avg_trade_duration_seconds = fmean(durations_seconds) if durations_seconds else None + + opportunity_times = [row[0] for row in opportunity_rows if isinstance(row[0], datetime)] + opportunities_per_minute: float | None + if len(opportunity_times) >= 2: + span_seconds = (max(opportunity_times) - min(opportunity_times)).total_seconds() + opportunities_per_minute = ( + len(opportunity_times) / (span_seconds / 60.0) + if span_seconds > 0.0 + else float(len(opportunity_times)) + ) + elif len(opportunity_times) == 1: + opportunities_per_minute = 60.0 + else: + opportunities_per_minute = None + + fill_samples = [ + float(filled) / float(volume) + for volume, filled in order_rows + if filled is not None and float(volume) > 0.0 + ] + fill_rate = fmean(fill_samples) if fill_samples else None + + latency_p50_seconds = self._percentile(durations_seconds, 50.0) + latency_p95_seconds = self._percentile(durations_seconds, 95.0) + latency_p99_seconds = self._percentile(durations_seconds, 99.0) + + return PerformanceMetrics( + realized_pnl_usd=realized_pnl_usd, + win_rate=win_rate, + avg_trade_duration_seconds=avg_trade_duration_seconds, + opportunities_per_minute=opportunities_per_minute, + fill_rate=fill_rate, + latency_p50_seconds=latency_p50_seconds, + latency_p95_seconds=latency_p95_seconds, + latency_p99_seconds=latency_p99_seconds, + ) diff --git a/tests/test_dashboard.py b/tests/test_dashboard.py new file mode 100644 index 0000000..3b37c1f --- /dev/null +++ b/tests/test_dashboard.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +from datetime import UTC, datetime, timedelta + +import httpx + +from arbitrade.api.app import create_app +from arbitrade.config.settings import Settings + + +def _seed_metrics_data(app) -> None: + store = app.state.store + started = datetime.now(UTC) + finished = started + timedelta(seconds=20) + with store.connect() as conn: + conn.execute( + """ + INSERT INTO trades ( + trade_ref, + started_at, + finished_at, + status, + realized_pnl, + estimated_pnl, + capital_used, + cycle, + leg_count + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + [ + "trade-1", + started, + finished, + "filled", + 15.0, + 10.0, + 100.0, + "USD->BTC->ETH->USD", + 3, + ], + ) + conn.execute( + """ + INSERT INTO opportunities ( + detected_at, + cycle, + gross_pct, + net_pct, + est_profit, + executed + ) VALUES (?, ?, ?, ?, ?, ?) + """, + [started, "USD->BTC->ETH->USD", 4.0, 3.0, 0.03, True], + ) + conn.execute( + """ + INSERT INTO orders ( + trade_ref, + order_ref, + leg_index, + pair, + side, + volume, + user_ref, + status, + filled_volume, + avg_price, + raw_response, + recorded_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + [ + "trade-1", + "order-1", + 0, + "BTC/USD", + "buy", + 2.0, + 100, + "closed", + 2.0, + 100.0, + "{}", + started, + ], + ) + + +async def test_dashboard_page_and_fragment_and_sse(tmp_path) -> None: + app = create_app( + Settings(_env_file=None, DUCKDB_PATH=tmp_path / "dash.duckdb")) + _seed_metrics_data(app) + + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: + page = await client.get("/") + fragment = await client.get("/dashboard/fragment/metrics") + stream = await client.get("/dashboard/stream/metrics") + + assert page.status_code == 200 + assert "EventSource" in page.text + assert 'hx-get="/dashboard/fragment/metrics"' in page.text + + assert fragment.status_code == 200 + assert "Realized P&L" in fragment.text + assert "15.00 USD" in fragment.text + assert "100.0%" in fragment.text + + assert stream.status_code == 200 + assert stream.headers["content-type"].startswith("text/event-stream") + assert "event: metrics" in stream.text + assert "Realized P&L" in stream.text diff --git a/tests/unit/test_execution_persistence.py b/tests/unit/test_execution_persistence.py index 1f8d4ce..b7bd887 100644 --- a/tests/unit/test_execution_persistence.py +++ b/tests/unit/test_execution_persistence.py @@ -69,8 +69,7 @@ async def test_execution_writer_persists_trade_order_and_pnl(tmp_path) -> None: "SELECT trade_ref, order_ref, leg_index, pair, side, volume, status " "FROM orders ORDER BY leg_index" ).fetchall() - pnls = conn.execute( - "SELECT trade_ref, kind, pnl_usd, source FROM pnl_events").fetchall() + pnls = conn.execute("SELECT trade_ref, kind, pnl_usd, source FROM pnl_events").fetchall() assert len(trades) == 1 assert trades[0][1] == "filled" diff --git a/tests/unit/test_metrics.py b/tests/unit/test_metrics.py new file mode 100644 index 0000000..5893c9b --- /dev/null +++ b/tests/unit/test_metrics.py @@ -0,0 +1,144 @@ +from __future__ import annotations + +from datetime import UTC, datetime, timedelta + +import pytest + +from arbitrade.config.settings import Settings +from arbitrade.metrics import MetricsCalculator +from arbitrade.storage.db import DuckDBStore + + +def test_metrics_calculator_summarizes_execution_data(tmp_path) -> None: + settings = Settings(_env_file=None, DUCKDB_PATH=tmp_path / "metrics.duckdb") + store = DuckDBStore(settings) + store.migrate() + + started = datetime.now(UTC) + finished = started + timedelta(seconds=30) + started_two = started + timedelta(minutes=1) + finished_two = started_two + timedelta(seconds=90) + + with store.connect() as conn: + conn.execute( + """ + INSERT INTO trades ( + trade_ref, + started_at, + finished_at, + status, + realized_pnl, + estimated_pnl, + capital_used, + cycle, + leg_count + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + [ + "trade-1", + started, + finished, + "filled", + 12.5, + 10.0, + 100.0, + "USD->BTC->ETH->USD", + 3, + "trade-2", + started_two, + finished_two, + "filled", + -4.5, + -2.0, + 200.0, + "USD->ETH->BTC->USD", + 3, + ], + ) + conn.execute( + """ + INSERT INTO opportunities ( + detected_at, + cycle, + gross_pct, + net_pct, + est_profit, + executed + ) VALUES (?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?) + """, + [ + started, + "USD->BTC->ETH->USD", + 4.0, + 3.0, + 0.03, + True, + started_two, + "USD->ETH->BTC->USD", + 2.0, + 1.0, + 0.01, + False, + started_two + timedelta(seconds=30), + "USD->BTC->ETH->USD", + 5.0, + 4.0, + 0.04, + True, + ], + ) + conn.execute( + """ + INSERT INTO orders ( + trade_ref, + order_ref, + leg_index, + pair, + side, + volume, + user_ref, + status, + filled_volume, + avg_price, + raw_response, + recorded_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + [ + "trade-1", + "order-1", + 0, + "BTC/USD", + "buy", + 2.0, + 101, + "closed", + 2.0, + 100.0, + "{}", + started, + "trade-2", + "order-2", + 0, + "ETH/USD", + "sell", + 4.0, + 202, + "closed", + 3.0, + 200.0, + "{}", + started_two, + ], + ) + + metrics = MetricsCalculator(store).compute() + + assert metrics.realized_pnl_usd == 8.0 + assert metrics.win_rate == 0.5 + assert metrics.avg_trade_duration_seconds == 60.0 + assert metrics.opportunities_per_minute == 2.0 + assert metrics.fill_rate == 0.875 + assert metrics.latency_p50_seconds == 60.0 + assert metrics.latency_p95_seconds == 87.0 + assert metrics.latency_p99_seconds == pytest.approx(89.4) diff --git a/web/templates/dashboard.html b/web/templates/dashboard.html new file mode 100644 index 0000000..8a0dcce --- /dev/null +++ b/web/templates/dashboard.html @@ -0,0 +1,127 @@ + + + + + + {{ title }} + + + + +
+
+
+

Arbitrade Dashboard

+

Live execution, P&L, and system state.

+
+ +
+ +
+ {% include "partials/metrics.html" %} +
+ + +
+ + diff --git a/web/templates/partials/metrics.html b/web/templates/partials/metrics.html new file mode 100644 index 0000000..1748e29 --- /dev/null +++ b/web/templates/partials/metrics.html @@ -0,0 +1,31 @@ +
+
+
+
Realized P&L
+
{{ realized_pnl }}
+
+
+
Win Rate
+
{{ win_rate }}
+
+
+
Avg Trade Duration
+
{{ avg_trade_duration }}
+
+
+
Opportunities / Min
+
{{ opportunities_per_minute }}
+
+
+
Fill Rate
+
{{ fill_rate }}
+
+
+
Latency p50 / p95 / p99
+
+ {{ latency_p50 }} | {{ latency_p95 }} | {{ latency_p99 }} +
+
+
+
Updated {{ generated_at }}
+