feat: Add performance metrics dashboard and metrics calculator

This commit is contained in:
2026-06-01 12:06:04 +02:00
parent 93f4f62d42
commit 0c232b7aee
10 changed files with 623 additions and 22 deletions
+2
View File
@@ -21,3 +21,5 @@
- Added deterministic order idempotency via Kraken userref plus reconciliation helpers for Kraken order history responses.
- Added execution journaling for trades, orders, and estimated P&L, plus a DuckDB startup fallback when the default file path is unavailable.
- Added a mocked execution integration test that drives the triangular sequencer through the execution journal and DuckDB persistence.
- Added a DuckDB-backed performance metrics calculator for realized P&L, win rate, trade duration, opportunities/min, fill rate, and latency percentiles.
- Added dashboard page plus HTMX metrics fragment and SSE metrics stream.
+3
View File
@@ -5,6 +5,7 @@ from fastapi import FastAPI
from arbitrade.api.routes import router
from arbitrade.config.settings import Settings
from arbitrade.logging_setup import configure_logging
from arbitrade.metrics import MetricsCalculator
from arbitrade.storage.db import DuckDBStore
@@ -15,5 +16,7 @@ def create_app(settings: Settings) -> FastAPI:
db.migrate()
app = FastAPI(title="arbitrade", version="0.1.0")
app.state.store = db
app.state.metrics = MetricsCalculator(db)
app.include_router(router)
return app
+87 -6
View File
@@ -1,28 +1,109 @@
from __future__ import annotations
import json
from collections.abc import AsyncIterator
from datetime import UTC, datetime
from pathlib import Path
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
router = APIRouter()
templates = Jinja2Templates(directory="web/templates")
templates = Jinja2Templates(
directory=str(Path(__file__).resolve().parents[3] / "web" / "templates")
)
def _format_metric(value: float | None, *, precision: int = 2, suffix: str = "") -> str:
if value is None:
return ""
return f"{value:.{precision}f}{suffix}"
def _dashboard_metrics(request: Request) -> dict[str, str]:
metrics = request.app.state.metrics.compute()
return {
"realized_pnl": _format_metric(metrics.realized_pnl_usd, precision=2, suffix=" USD"),
"win_rate": _format_metric(
metrics.win_rate * 100.0 if metrics.win_rate is not None else None,
precision=1,
suffix="%",
),
"avg_trade_duration": _format_metric(
metrics.avg_trade_duration_seconds, precision=1, suffix=" s"
),
"opportunities_per_minute": _format_metric(
metrics.opportunities_per_minute, precision=1, suffix=" /min"
),
"fill_rate": _format_metric(
metrics.fill_rate * 100.0 if metrics.fill_rate is not None else None,
precision=1,
suffix="%",
),
"latency_p50": _format_metric(metrics.latency_p50_seconds, precision=3, suffix=" s"),
"latency_p95": _format_metric(metrics.latency_p95_seconds, precision=3, suffix=" s"),
"latency_p99": _format_metric(metrics.latency_p99_seconds, precision=3, suffix=" s"),
"generated_at": datetime.now(UTC).isoformat(),
}
@router.get("/", response_class=HTMLResponse)
async def home(request: Request) -> HTMLResponse:
return templates.TemplateResponse(
request=request,
name="health.html",
name="dashboard.html",
context={
"status": "ok",
"time": datetime.now(UTC).isoformat(),
"title": "Arbitrade Health",
"title": "Arbitrade Dashboard",
"request": request,
"metrics_endpoint": "/dashboard/fragment/metrics",
"stream_endpoint": "/dashboard/stream/metrics",
},
)
@router.get("/dashboard", response_class=HTMLResponse)
async def dashboard(request: Request) -> HTMLResponse:
return templates.TemplateResponse(
request=request,
name="dashboard.html",
context={
"title": "Arbitrade Dashboard",
"request": request,
"metrics_endpoint": "/dashboard/fragment/metrics",
"stream_endpoint": "/dashboard/stream/metrics",
},
)
@router.get("/dashboard/fragment/metrics", response_class=HTMLResponse)
async def dashboard_metrics(request: Request) -> HTMLResponse:
return templates.TemplateResponse(
request=request,
name="partials/metrics.html",
context={"request": request, **_dashboard_metrics(request)},
)
@router.get("/dashboard/stream/metrics")
async def dashboard_metrics_stream(request: Request) -> StreamingResponse:
fragment = (
templates.get_template("partials/metrics.html")
.render(
request=request,
**_dashboard_metrics(request),
)
.strip()
.replace("\n", "")
)
async def _event_stream() -> AsyncIterator[bytes]:
payload = json.dumps(fragment)
yield f"event: metrics\ndata: {payload}\n\n".encode()
return StreamingResponse(_event_stream(), media_type="text/event-stream")
@router.get("/health", response_class=JSONResponse)
async def health() -> JSONResponse:
return JSONResponse({"status": "ok", "service": "arbitrade"})
+7 -14
View File
@@ -40,13 +40,11 @@ class TriangularExecutionSequencer:
rest_client: SupportsOrderPlacement,
*,
available_pairs: Sequence[str],
volume_for_leg: Callable[[OpportunityEvent,
ExecutionLeg, int], float] | None = None,
volume_for_leg: Callable[[OpportunityEvent, ExecutionLeg, int], float] | None = None,
execution_writer: AsyncExecutionWriter | None = None,
) -> None:
self._rest_client = rest_client
self._available_pairs = {self._normalize_pair(
pair) for pair in available_pairs}
self._available_pairs = {self._normalize_pair(pair) for pair in available_pairs}
self._volume_for_leg = volume_for_leg or self._default_volume_for_leg
self._execution_writer = execution_writer
@@ -91,15 +89,12 @@ class TriangularExecutionSequencer:
raise ValueError(f"No tradable pair for leg {from_cur}->{to_cur}")
def _build_legs(self, event: OpportunityEvent) -> tuple[ExecutionLeg, ...]:
currencies = [part.strip().upper()
for part in event.cycle.split("->") if part.strip()]
currencies = [part.strip().upper() for part in event.cycle.split("->") if part.strip()]
if len(currencies) < 4 or currencies[0] != currencies[-1]:
raise ValueError(
"cycle must be a closed triangular path like A->B->C->A")
raise ValueError("cycle must be a closed triangular path like A->B->C->A")
if len(currencies) != 4:
raise ValueError(
"cycle must contain exactly three unique currencies")
raise ValueError("cycle must contain exactly three unique currencies")
legs: list[ExecutionLeg] = []
for idx in range(3):
@@ -114,8 +109,7 @@ class TriangularExecutionSequencer:
)
volume = self._volume_for_leg(event, placeholder_leg, idx)
if volume <= 0.0:
raise ValueError(
"volume_for_leg must return a positive volume")
raise ValueError("volume_for_leg must return a positive volume")
legs.append(self._resolve_leg(from_currency, to_currency, volume))
return tuple(legs)
@@ -177,8 +171,7 @@ class TriangularExecutionSequencer:
responses.append(response)
if self._execution_writer is not None:
order_ref = self._order_ref_from_response(
response, f"leg-{idx}")
order_ref = self._order_ref_from_response(response, f"leg-{idx}")
await self._execution_writer.enqueue(
OrderRecord(
trade_ref=trade_ref,
+109
View File
@@ -0,0 +1,109 @@
from __future__ import annotations
from collections.abc import Iterable
from dataclasses import dataclass
from datetime import datetime
from statistics import fmean
from arbitrade.storage.db import DuckDBStore
@dataclass(frozen=True, slots=True)
class PerformanceMetrics:
realized_pnl_usd: float
win_rate: float | None
avg_trade_duration_seconds: float | None
opportunities_per_minute: float | None
fill_rate: float | None
latency_p50_seconds: float | None
latency_p95_seconds: float | None
latency_p99_seconds: float | None
class MetricsCalculator:
def __init__(self, store: DuckDBStore) -> None:
self._store = store
@staticmethod
def _percentile(values: Iterable[float], percentile: float) -> float | None:
samples = sorted(values)
if not samples:
return None
if percentile <= 0.0:
return samples[0]
if percentile >= 100.0:
return samples[-1]
rank = (len(samples) - 1) * (percentile / 100.0)
lower_index = int(rank)
upper_index = min(lower_index + 1, len(samples) - 1)
weight = rank - lower_index
return samples[lower_index] * (1.0 - weight) + samples[upper_index] * weight
def compute(self) -> PerformanceMetrics:
with self._store.connect() as conn:
trade_rows = conn.execute("""
SELECT started_at, finished_at, realized_pnl
FROM trades
WHERE finished_at IS NOT NULL
""").fetchall()
opportunity_rows = conn.execute("""
SELECT detected_at
FROM opportunities
""").fetchall()
order_rows = conn.execute("""
SELECT volume, filled_volume
FROM orders
WHERE volume > 0
""").fetchall()
realized_values = [float(row[2]) for row in trade_rows if row[2] is not None]
realized_pnl_usd = sum(realized_values)
total_trades = len(trade_rows)
winning_trades = sum(1 for row in trade_rows if row[2] is not None and float(row[2]) > 0.0)
win_rate = winning_trades / total_trades if total_trades > 0 else None
durations_seconds = [
(row[1] - row[0]).total_seconds()
for row in trade_rows
if isinstance(row[0], datetime) and isinstance(row[1], datetime)
]
avg_trade_duration_seconds = fmean(durations_seconds) if durations_seconds else None
opportunity_times = [row[0] for row in opportunity_rows if isinstance(row[0], datetime)]
opportunities_per_minute: float | None
if len(opportunity_times) >= 2:
span_seconds = (max(opportunity_times) - min(opportunity_times)).total_seconds()
opportunities_per_minute = (
len(opportunity_times) / (span_seconds / 60.0)
if span_seconds > 0.0
else float(len(opportunity_times))
)
elif len(opportunity_times) == 1:
opportunities_per_minute = 60.0
else:
opportunities_per_minute = None
fill_samples = [
float(filled) / float(volume)
for volume, filled in order_rows
if filled is not None and float(volume) > 0.0
]
fill_rate = fmean(fill_samples) if fill_samples else None
latency_p50_seconds = self._percentile(durations_seconds, 50.0)
latency_p95_seconds = self._percentile(durations_seconds, 95.0)
latency_p99_seconds = self._percentile(durations_seconds, 99.0)
return PerformanceMetrics(
realized_pnl_usd=realized_pnl_usd,
win_rate=win_rate,
avg_trade_duration_seconds=avg_trade_duration_seconds,
opportunities_per_minute=opportunities_per_minute,
fill_rate=fill_rate,
latency_p50_seconds=latency_p50_seconds,
latency_p95_seconds=latency_p95_seconds,
latency_p99_seconds=latency_p99_seconds,
)
+112
View File
@@ -0,0 +1,112 @@
from __future__ import annotations
from datetime import UTC, datetime, timedelta
import httpx
from arbitrade.api.app import create_app
from arbitrade.config.settings import Settings
def _seed_metrics_data(app) -> None:
store = app.state.store
started = datetime.now(UTC)
finished = started + timedelta(seconds=20)
with store.connect() as conn:
conn.execute(
"""
INSERT INTO trades (
trade_ref,
started_at,
finished_at,
status,
realized_pnl,
estimated_pnl,
capital_used,
cycle,
leg_count
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
[
"trade-1",
started,
finished,
"filled",
15.0,
10.0,
100.0,
"USD->BTC->ETH->USD",
3,
],
)
conn.execute(
"""
INSERT INTO opportunities (
detected_at,
cycle,
gross_pct,
net_pct,
est_profit,
executed
) VALUES (?, ?, ?, ?, ?, ?)
""",
[started, "USD->BTC->ETH->USD", 4.0, 3.0, 0.03, True],
)
conn.execute(
"""
INSERT INTO orders (
trade_ref,
order_ref,
leg_index,
pair,
side,
volume,
user_ref,
status,
filled_volume,
avg_price,
raw_response,
recorded_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
[
"trade-1",
"order-1",
0,
"BTC/USD",
"buy",
2.0,
100,
"closed",
2.0,
100.0,
"{}",
started,
],
)
async def test_dashboard_page_and_fragment_and_sse(tmp_path) -> None:
app = create_app(
Settings(_env_file=None, DUCKDB_PATH=tmp_path / "dash.duckdb"))
_seed_metrics_data(app)
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
page = await client.get("/")
fragment = await client.get("/dashboard/fragment/metrics")
stream = await client.get("/dashboard/stream/metrics")
assert page.status_code == 200
assert "EventSource" in page.text
assert 'hx-get="/dashboard/fragment/metrics"' in page.text
assert fragment.status_code == 200
assert "Realized P&amp;L" in fragment.text
assert "15.00 USD" in fragment.text
assert "100.0%" in fragment.text
assert stream.status_code == 200
assert stream.headers["content-type"].startswith("text/event-stream")
assert "event: metrics" in stream.text
assert "Realized P&amp;L" in stream.text
+1 -2
View File
@@ -69,8 +69,7 @@ async def test_execution_writer_persists_trade_order_and_pnl(tmp_path) -> None:
"SELECT trade_ref, order_ref, leg_index, pair, side, volume, status "
"FROM orders ORDER BY leg_index"
).fetchall()
pnls = conn.execute(
"SELECT trade_ref, kind, pnl_usd, source FROM pnl_events").fetchall()
pnls = conn.execute("SELECT trade_ref, kind, pnl_usd, source FROM pnl_events").fetchall()
assert len(trades) == 1
assert trades[0][1] == "filled"
+144
View File
@@ -0,0 +1,144 @@
from __future__ import annotations
from datetime import UTC, datetime, timedelta
import pytest
from arbitrade.config.settings import Settings
from arbitrade.metrics import MetricsCalculator
from arbitrade.storage.db import DuckDBStore
def test_metrics_calculator_summarizes_execution_data(tmp_path) -> None:
settings = Settings(_env_file=None, DUCKDB_PATH=tmp_path / "metrics.duckdb")
store = DuckDBStore(settings)
store.migrate()
started = datetime.now(UTC)
finished = started + timedelta(seconds=30)
started_two = started + timedelta(minutes=1)
finished_two = started_two + timedelta(seconds=90)
with store.connect() as conn:
conn.execute(
"""
INSERT INTO trades (
trade_ref,
started_at,
finished_at,
status,
realized_pnl,
estimated_pnl,
capital_used,
cycle,
leg_count
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
[
"trade-1",
started,
finished,
"filled",
12.5,
10.0,
100.0,
"USD->BTC->ETH->USD",
3,
"trade-2",
started_two,
finished_two,
"filled",
-4.5,
-2.0,
200.0,
"USD->ETH->BTC->USD",
3,
],
)
conn.execute(
"""
INSERT INTO opportunities (
detected_at,
cycle,
gross_pct,
net_pct,
est_profit,
executed
) VALUES (?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?)
""",
[
started,
"USD->BTC->ETH->USD",
4.0,
3.0,
0.03,
True,
started_two,
"USD->ETH->BTC->USD",
2.0,
1.0,
0.01,
False,
started_two + timedelta(seconds=30),
"USD->BTC->ETH->USD",
5.0,
4.0,
0.04,
True,
],
)
conn.execute(
"""
INSERT INTO orders (
trade_ref,
order_ref,
leg_index,
pair,
side,
volume,
user_ref,
status,
filled_volume,
avg_price,
raw_response,
recorded_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
[
"trade-1",
"order-1",
0,
"BTC/USD",
"buy",
2.0,
101,
"closed",
2.0,
100.0,
"{}",
started,
"trade-2",
"order-2",
0,
"ETH/USD",
"sell",
4.0,
202,
"closed",
3.0,
200.0,
"{}",
started_two,
],
)
metrics = MetricsCalculator(store).compute()
assert metrics.realized_pnl_usd == 8.0
assert metrics.win_rate == 0.5
assert metrics.avg_trade_duration_seconds == 60.0
assert metrics.opportunities_per_minute == 2.0
assert metrics.fill_rate == 0.875
assert metrics.latency_p50_seconds == 60.0
assert metrics.latency_p95_seconds == 87.0
assert metrics.latency_p99_seconds == pytest.approx(89.4)
+127
View File
@@ -0,0 +1,127 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>{{ title }}</title>
<script src="https://unpkg.com/htmx.org@1.9.12"></script>
<style>
body {
margin: 0;
font-family: Arial, sans-serif;
background: #0b1220;
color: #e5eefb;
}
.shell {
max-width: 1120px;
margin: 0 auto;
padding: 32px 20px 48px;
}
.hero {
display: flex;
justify-content: space-between;
gap: 24px;
align-items: end;
margin-bottom: 24px;
}
.title {
font-size: 2rem;
margin: 0 0 8px;
}
.subtitle {
margin: 0;
color: #9fb2d0;
}
.panel {
background: rgba(255, 255, 255, 0.04);
border: 1px solid rgba(255, 255, 255, 0.08);
border-radius: 16px;
padding: 20px;
}
.grid {
display: grid;
gap: 16px;
grid-template-columns: repeat(auto-fit, minmax(220px, 1fr));
}
.card {
background: rgba(255, 255, 255, 0.04);
border-radius: 14px;
padding: 16px;
border: 1px solid rgba(255, 255, 255, 0.08);
}
.label {
color: #9fb2d0;
font-size: 0.85rem;
margin-bottom: 8px;
}
.value {
font-size: 1.4rem;
font-weight: 700;
}
.meta {
margin-top: 18px;
color: #7f95b7;
font-size: 0.85rem;
}
.toolbar {
display: flex;
gap: 12px;
flex-wrap: wrap;
}
.button {
display: inline-flex;
align-items: center;
padding: 10px 14px;
border-radius: 999px;
background: #2d6cdf;
color: white;
text-decoration: none;
}
.button.secondary {
background: transparent;
border: 1px solid rgba(255, 255, 255, 0.14);
}
</style>
</head>
<body>
<main class="shell">
<section class="hero">
<div>
<h1 class="title">Arbitrade Dashboard</h1>
<p class="subtitle">Live execution, P&amp;L, and system state.</p>
</div>
<div class="toolbar">
<a
class="button"
href="{{ metrics_endpoint }}"
hx-get="{{ metrics_endpoint }}"
hx-target="#metrics-panel"
hx-swap="outerHTML"
>Refresh metrics</a
>
<a class="button secondary" href="/health">Health</a>
</div>
</section>
<section
id="metrics-shell"
hx-get="{{ metrics_endpoint }}"
hx-target="this"
hx-trigger="load, every 15s"
hx-swap="outerHTML"
>
{% include "partials/metrics.html" %}
</section>
<script>
const stream = new EventSource("{{ stream_endpoint }}");
stream.addEventListener("metrics", (event) => {
const panel = document.getElementById("metrics-panel");
if (panel) {
panel.outerHTML = JSON.parse(event.data);
}
});
</script>
</main>
</body>
</html>
+31
View File
@@ -0,0 +1,31 @@
<div id="metrics-panel" class="panel">
<div class="grid">
<article class="card">
<div class="label">Realized P&amp;L</div>
<div class="value">{{ realized_pnl }}</div>
</article>
<article class="card">
<div class="label">Win Rate</div>
<div class="value">{{ win_rate }}</div>
</article>
<article class="card">
<div class="label">Avg Trade Duration</div>
<div class="value">{{ avg_trade_duration }}</div>
</article>
<article class="card">
<div class="label">Opportunities / Min</div>
<div class="value">{{ opportunities_per_minute }}</div>
</article>
<article class="card">
<div class="label">Fill Rate</div>
<div class="value">{{ fill_rate }}</div>
</article>
<article class="card">
<div class="label">Latency p50 / p95 / p99</div>
<div class="value">
{{ latency_p50 }} | {{ latency_p95 }} | {{ latency_p99 }}
</div>
</article>
</div>
<div class="meta">Updated {{ generated_at }}</div>
</div>