feat: Implement pairing synchronization from Kraken and enhance market data feed

- Added `sync_pairings_from_kraken` function to fetch and upsert asset pairs into the config_pairings table.
- Introduced `run_pairing_sync_loop` for periodic synchronization of pairings.
- Enhanced `KrakenWsClient` to manage subscribed symbols for market data feeds.
- Created `build_detector_from_enabled_pairings` to initialize cycle detection based on enabled pairings.
- Updated FastAPI app to start market data feed and pairing synchronization tasks.
- Added new API routes for managing pairings, including listing, toggling, and syncing from Kraken.
- Improved dashboard templates to display pairing options and allow user interaction for backtesting.
- Refactored database queries to streamline fetching and updating of pairing data.
This commit is contained in:
2026-06-04 22:10:06 +02:00
parent 92b0b49535
commit 4c59a0e4cb
16 changed files with 733 additions and 198 deletions
+20
View File
@@ -0,0 +1,20 @@
# EditorConfig is awesome: https://editorconfig.org
# top-most EditorConfig file
root = true
# Unix-style newlines with a newline ending every file
[*]
end_of_line = lf
insert_final_newline = true
# Matches multiple files with brace expansion notation
# Set default charset
[*.{js,py}]
charset = utf-8
# 4 space indentation
[*.py]
indent_style = space
indent_size = 4
max_line_length = 120
+16 -20
View File
@@ -19,12 +19,10 @@ def _resolve_fee_rate(fee_rate: float | None, db_path: str | None = None) -> flo
if db_path is not None:
try:
conn = duckdb.connect(db_path)
row = conn.execute(
"""
row = conn.execute("""
SELECT maker_fee FROM kraken_account_snapshots
ORDER BY snapshot_at DESC LIMIT 1
"""
).fetchone()
""").fetchone()
conn.close()
if row is not None and row[0] is not None:
return float(row[0])
@@ -53,14 +51,13 @@ def _parse_balances(raw: str) -> Mapping[str, float]:
def main() -> int:
parser = argparse.ArgumentParser(description="Run a deterministic replay backtest.")
parser = argparse.ArgumentParser(description="Run backtest.")
parser.add_argument("--events", type=Path, required=True)
parser.add_argument("--starting-balances", type=str, default="USD=1000.0")
parser.add_argument("--trade-capital", type=float, default=100.0)
parser.add_argument("--fee-rate", type=float, default=None)
parser.add_argument("--slippage-bps", type=float, default=4.0)
parser.add_argument("--execution-latency-ms", type=float, default=20.0)
parser.add_argument("--db-path", type=str, default=None, help="DuckDB path for fee lookup")
args = parser.parse_args()
cycles_by_pair, available_pairs = _build_graph()
@@ -79,24 +76,23 @@ def main() -> int:
config=config,
started_at=events[0].occurred_at if events else datetime.now(UTC),
)
report = asyncio.run(
engine.run(events, starting_balances=_parse_balances(args.starting_balances))
)
starting_balances = _parse_balances(args.starting_balances)
r = asyncio.run(engine.run(events, starting_balances=starting_balances))
print("Backtest report:")
print(f"- processed_events: {report.processed_events}")
print(f"- opportunities_seen: {report.opportunities_seen}")
print(f"- trades_executed: {report.trades_executed}")
print(f"- win_rate: {report.win_rate if report.win_rate is not None else 'n/a'}")
print(f"- fill_rate: {report.fill_rate if report.fill_rate is not None else 'n/a'}")
print(f"- realized_pnl_usd: {report.realized_pnl_usd:.4f}")
print(f"- max_drawdown_usd: {report.max_drawdown_usd:.4f}")
print(f"- miss_reasons: {dict(report.miss_reasons)}")
print(f"- processed_events: {r.processed_events}")
print(f"- opportunities_seen: {r.opportunities_seen}")
print(f"- trades_executed: {r.trades_executed}")
print(f"- win_rate: {r.win_rate if r.win_rate is not None else 'n/a'}")
print(f"- fill_rate: {r.fill_rate if r.fill_rate is not None else 'n/a'}")
print(f"- realized_pnl_usd: {r.realized_pnl_usd:.4f}")
print(f"- max_drawdown_usd: {r.max_drawdown_usd:.4f}")
print(f"- miss_reasons: {dict(r.miss_reasons)}")
print(
"- execution_latency_ms: "
f"p50={report.execution_latency_p50_ms or 0.0:.4f}, "
f"p95={report.execution_latency_p95_ms or 0.0:.4f}, "
f"p99={report.execution_latency_p99_ms or 0.0:.4f}"
f"p50={r.execution_latency_p50_ms or 0.0:.4f}, "
f"p95={r.execution_latency_p95_ms or 0.0:.4f}, "
f"p99={r.execution_latency_p99_ms or 0.0:.4f}"
)
return 0
+7 -8
View File
@@ -13,14 +13,13 @@ from arbitrade.storage.db import DuckDBStore
def _python_scan_compute(store: DuckDBStore) -> tuple[float, float | None, float | None]:
with store.connect() as conn:
trade_rows = conn.execute(
"""
trade_rows = conn.execute("""
SELECT started_at, finished_at, realized_pnl
FROM trades
WHERE finished_at IS NOT NULL
"""
).fetchall()
opportunity_rows = conn.execute("SELECT detected_at FROM opportunities").fetchall()
""").fetchall()
sql_d = "SELECT detected_at FROM opportunities"
orows = conn.execute(sql_d).fetchall()
realized = sum(float(row[2]) for row in trade_rows if row[2] is not None)
durations = [
@@ -30,10 +29,10 @@ def _python_scan_compute(store: DuckDBStore) -> tuple[float, float | None, float
]
avg_duration = fmean(durations) if durations else None
times = [row[0] for row in opportunity_rows if isinstance(row[0], datetime)]
times = [row[0] for row in orows if isinstance(row[0], datetime)]
if len(times) >= 2:
span_seconds = (max(times) - min(times)).total_seconds()
opm = len(times) / (span_seconds / 60.0) if span_seconds > 0.0 else float(len(times))
ss = (max(times) - min(times)).total_seconds()
opm = len(times) / (ss / 60.0) if ss > 0.0 else float(len(times))
elif len(times) == 1:
opm = 60.0
else:
+100 -1
View File
@@ -4,22 +4,88 @@ import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
import structlog
from fastapi import FastAPI
from arbitrade.alerting.notifier import build_notifier_from_settings
from arbitrade.api.control_state import DashboardControlState
from arbitrade.api.routes import public_router, router
from arbitrade.backtesting.runner import backtest_worker
from arbitrade.config.pairing_sync import run_pairing_sync_loop
from arbitrade.config.service import ConfigurationService
from arbitrade.config.settings import Settings
from arbitrade.exchange.fee_service import run_fee_sync_loop
from arbitrade.exchange.kraken_rest import KrakenRestClient
from arbitrade.exchange.kraken_ws import KrakenWsClient
from arbitrade.logging_setup import configure_logging
from arbitrade.market_data.feed import MarketDataFeed
from arbitrade.market_data.feed_builder import (
build_detector_from_enabled_pairings,
get_enabled_pair_symbols,
)
from arbitrade.metrics import MetricsCalculator
from arbitrade.runtime.lifecycle import graceful_shutdown, restore_runtime_state
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.market_snapshots import AsyncMarketSnapshotWriter
from arbitrade.storage.opportunities import AsyncOpportunityWriter
from arbitrade.storage.repositories import AuditRepository, RuntimeStateRepository
_LOG = structlog.get_logger(__name__)
def _start_feed(app: FastAPI, *, kill_switch_only: bool = False) -> asyncio.Task[None] | None:
"""Create and start a MarketDataFeed task from enabled pairings.
If kill_switch_only=True, only create a kill-switch-bound stub (no detector/feed).
Returns the task or None if no enabled pairings.
"""
settings = app.state.settings
db = app.state.store
alert_notifier = getattr(app.state, "alert_notifier", None)
controls = app.state.dashboard_controls
# Build detector from enabled pairings
detector = build_detector_from_enabled_pairings(
db,
fee_rate=0.0, # will be overridden by fee sync
max_depth_levels=controls.strategy_max_depth_levels,
min_profit_threshold=controls.strategy_profit_threshold,
)
symbols = get_enabled_pair_symbols(db)
if not symbols and not kill_switch_only:
_LOG.warning("no_enabled_pair_symbols_feed_not_started")
return None
ws_client: KrakenWsClient = getattr(app.state, "ws_client", None)
if ws_client is None:
ws_client = KrakenWsClient(settings, alert_notifier=alert_notifier)
app.state.ws_client = ws_client
ws_client.set_subscribed_symbols(symbols)
snapshot_writer = AsyncMarketSnapshotWriter(db)
opportunity_writer = AsyncOpportunityWriter(db)
feed = MarketDataFeed(
ws_client=ws_client,
snapshot_writer=snapshot_writer,
detector=detector,
opportunity_writer=opportunity_writer,
paper_trading_mode=settings.paper_trading_mode,
trade_capital=settings.trade_capital_usd,
max_trade_capital=settings.max_trade_capital_usd,
kill_switch=controls.kill_switch,
alert_notifier=alert_notifier,
audit_repository=getattr(app.state, "audit_repository", None),
)
app.state.feed = feed
task = asyncio.create_task(feed.run(), name="market_data_feed")
app.state.feed_task = task
_LOG.info("market_data_feed_started", symbols=symbols)
return task
def create_app(settings: Settings) -> FastAPI:
configure_logging(settings.log_level, settings.log_json)
@@ -28,6 +94,7 @@ def create_app(settings: Settings) -> FastAPI:
db.migrate()
kraken_client = KrakenRestClient(settings)
fee_sync_stop_event = asyncio.Event()
pairing_sync_stop_event = asyncio.Event()
backtest_queue: asyncio.Queue[tuple[str, str, dict[str, object] | None] | None] = (
asyncio.Queue()
)
@@ -43,19 +110,49 @@ def create_app(settings: Settings) -> FastAPI:
),
name="fee_sync_loop",
)
pairing_sync_task = asyncio.create_task(
run_pairing_sync_loop(
kraken_client,
db,
pairing_sync_stop_event,
),
name="pairing_sync_loop",
)
backtest_task = asyncio.create_task(
backtest_worker(backtest_queue, db), # type: ignore
name="backtest_worker",
)
# Start market data feed from enabled pairings
_start_feed(app)
app.state.fee_sync_task = fee_sync_task
app.state.pairing_sync_task = pairing_sync_task
app.state.backtest_task = backtest_task
yield
fee_sync_stop_event.set()
pairing_sync_stop_event.set()
# Stop feed
feed = getattr(app.state, "feed", None)
if feed is not None:
ws_client = getattr(app.state, "ws_client", None)
if ws_client is not None:
await ws_client.stop()
ft = getattr(app.state, "feed_task", None)
if ft is not None:
ft.cancel()
try:
await ft
except asyncio.CancelledError:
pass
fee_sync_task.cancel()
try:
await fee_sync_task
except asyncio.CancelledError:
pass
pairing_sync_task.cancel()
try:
await pairing_sync_task
except asyncio.CancelledError:
pass
await backtest_queue.put(None) # poison pill
backtest_task.cancel()
try:
@@ -70,12 +167,14 @@ def create_app(settings: Settings) -> FastAPI:
app.state.store = db
app.state.kraken_client = kraken_client
app.state.fee_sync_stop_event = fee_sync_stop_event
app.state.pairing_sync_stop_event = pairing_sync_stop_event
app.state.backtest_queue = backtest_queue
app.state.metrics = MetricsCalculator(db)
app.state.audit_repository = AuditRepository(db)
app.state.runtime_state_repository = RuntimeStateRepository(db)
app.state.alert_notifier = build_notifier_from_settings(settings)
app.state.configuration_service = ConfigurationService(settings, db, AuditRepository(db))
svc = ConfigurationService(settings, db, app.state.audit_repository)
app.state.configuration_service = svc
app.state.backtest_recent_reports = []
app.state.dashboard_controls = DashboardControlState(
is_running=not settings.kill_switch_active,
+204 -29
View File
@@ -18,11 +18,14 @@ from fastapi.templating import Jinja2Templates
from arbitrade.alerting.notifier import SupportsAlerts, SupportsAlertStatus
from arbitrade.api.auth import require_dashboard_auth
from arbitrade.api.control_state import DashboardControlState
from arbitrade.config.pairing_sync import sync_pairings_from_kraken
from arbitrade.config.service import ConfigPairing
from arbitrade.detection.graph import CurrencyGraph, TriangularCycle
from arbitrade.storage.repositories import (
AuditRecord,
AuditRepository,
BacktestJobRepository,
ConfigPairingRepository,
KrakenAccountSnapshotRepository,
)
@@ -104,37 +107,29 @@ def _dashboard_overview(request: Request) -> dict[str, object]:
else:
open_trade_filter = "LOWER(status) NOT IN ('filled', 'closed', 'cancelled', 'canceled')"
portfolio_row = conn.execute(
"""
portfolio_row = conn.execute("""
SELECT balances, total_value_usd
FROM portfolio_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
"""
).fetchone()
open_trades = conn.execute(
f"""
""").fetchone()
open_trades = conn.execute(f"""
SELECT {trade_ref_expr}, status, started_at, {cycle_expr}
FROM trades
WHERE {open_trade_filter}
ORDER BY started_at DESC
LIMIT 5
"""
).fetchall()
rpnl = conn.execute(
"""
""").fetchall()
rpnl = conn.execute("""
SELECT COALESCE(SUM(COALESCE(realized_pnl, 0)), 0)
FROM trades
"""
).fetchone()
latest_opportunities = conn.execute(
"""
""").fetchone()
latest_opportunities = conn.execute("""
SELECT cycle, net_pct, est_profit, detected_at
FROM opportunities
ORDER BY detected_at DESC
LIMIT 5
"""
).fetchall()
""").fetchall()
balances_value = ""
total_value = ""
@@ -164,14 +159,12 @@ def _dashboard_overview(request: Request) -> dict[str, object]:
# Query equity from kraken_account_snapshots
try:
equity_row = conn.execute(
"""
equity_row = conn.execute("""
SELECT trade_balance_raw
FROM kraken_account_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
"""
).fetchone()
""").fetchone()
if equity_row is not None and equity_row[0] is not None:
tb_raw = equity_row[0]
if isinstance(tb_raw, str):
@@ -207,14 +200,12 @@ def _dashboard_overview(request: Request) -> dict[str, object]:
taker_fee = ""
thirty_day_volume = ""
try:
acct_row = conn.execute(
"""
acct_row = conn.execute("""
SELECT fee_tier, maker_fee, taker_fee, thirty_day_volume
FROM kraken_account_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
"""
).fetchone()
""").fetchone()
if acct_row is not None:
fee_tier = str(acct_row[0]) if acct_row[0] is not None else ""
maker_fee = f"{float(acct_row[1]):.4%}" if acct_row[1] is not None else ""
@@ -244,14 +235,12 @@ def _dashboard_overview(request: Request) -> dict[str, object]:
def _dashboard_charts(request: Request) -> dict[str, object]:
store = request.app.state.store
with store.connect() as conn:
opportunity_rows = conn.execute(
"""
opportunity_rows = conn.execute("""
SELECT detected_at, cycle, net_pct, est_profit
FROM opportunities
ORDER BY detected_at DESC
LIMIT 10
"""
).fetchall()
""").fetchall()
cr = list(reversed(opportunity_rows))
labels = []
@@ -588,7 +577,16 @@ def _dashboard_controls(request: Request) -> dict[str, object]:
def _parse_form_body(body: bytes) -> dict[str, str]:
parsed = parse_qs(body.decode("utf-8"), keep_blank_values=True)
return {key: values[-1] for key, values in parsed.items() if values}
result: dict[str, str] = {}
for key, values in parsed.items():
if not values:
continue
if len(values) == 1:
result[key] = values[0]
else:
# Multi-value fields (e.g. checkboxes) -> join with comma
result[key] = ",".join(values)
return result
def _form_bool(value: str | None) -> bool:
@@ -823,6 +821,20 @@ async def dashboard_backtesting_fragment(request: Request) -> HTMLResponse:
)
@router.get("/dashboard/fragment/backtesting-pairings", response_class=HTMLResponse)
async def dashboard_backtesting_pairings_fragment(request: Request) -> HTMLResponse:
"""HTMX fragment: pairing checkboxes for backtest form."""
store = request.app.state.store
repo = ConfigPairingRepository(store)
pairings = repo.list_pairings()
pairings.sort(key=lambda p: (p.base_asset, p.quote_asset))
return templates.TemplateResponse(
request=request,
name="partials/backtesting_pairings.html",
context={"request": request, "pairings": pairings},
)
@router.get("/dashboard/fragment/metrics", response_class=HTMLResponse)
async def dashboard_metrics(request: Request) -> HTMLResponse:
return templates.TemplateResponse(
@@ -1282,3 +1294,166 @@ async def dashboard_overview_stream(request: Request) -> StreamingResponse:
@public_router.get("/health", response_class=JSONResponse)
async def health() -> JSONResponse:
return JSONResponse({"status": "ok", "service": "arbitrade"})
# ── Pairing API ─────────────────────────────────────────────────────────────
def _pairing_repo(request: Request) -> ConfigPairingRepository:
return ConfigPairingRepository(request.app.state.store)
@router.get("/dashboard/api/pairings", response_class=JSONResponse)
async def dashboard_api_pairings(
request: Request,
search: str | None = None,
enabled: str | None = None,
source: str | None = None,
base: str | None = None,
quote: str | None = None,
sort: str = "base_asset",
order: str = "asc",
) -> JSONResponse:
"""List pairings with optional filters."""
repo = _pairing_repo(request)
pairings = repo.list_pairings()
# Apply filters
if search:
search_lower = search.lower()
pairings = [
p
for p in pairings
if search_lower in p.base_asset.lower() or search_lower in p.quote_asset.lower()
]
if enabled is not None:
enabled_bool = enabled.lower() == "true"
pairings = [p for p in pairings if p.enabled == enabled_bool]
if source:
pairings = [p for p in pairings if p.source.lower() == source.lower()]
if base:
pairings = [p for p in pairings if p.base_asset.lower() == base.lower()]
if quote:
pairings = [p for p in pairings if p.quote_asset.lower() == quote.lower()]
# Sort
reverse = order.lower() == "desc"
if sort == "base_asset":
pairings.sort(key=lambda p: p.base_asset, reverse=reverse)
elif sort == "quote_asset":
pairings.sort(key=lambda p: p.quote_asset, reverse=reverse)
elif sort == "enabled":
pairings.sort(key=lambda p: p.enabled, reverse=reverse)
return JSONResponse(
[
{
"id": p.id,
"base_asset": p.base_asset,
"quote_asset": p.quote_asset,
"pair": f"{p.base_asset}/{p.quote_asset}",
"enabled": p.enabled,
"source": p.source,
"created_at": p.created_at.isoformat() if p.created_at else None,
"updated_at": p.updated_at.isoformat() if p.updated_at else None,
}
for p in pairings
]
)
@router.get("/dashboard/fragment/pairings", response_class=HTMLResponse)
async def dashboard_pairings_fragment(
request: Request,
search: str | None = None,
enabled: str | None = None,
) -> HTMLResponse:
"""HTMX fragment: pairing table for config page."""
repo = _pairing_repo(request)
pairings = repo.list_pairings()
# Apply search filter
if search:
sl = search.lower()
pairings = [
p for p in pairings if sl in p.base_asset.lower() or sl in p.quote_asset.lower()
]
if enabled is not None and enabled.lower() != "all":
eb = enabled.lower() == "true"
pairings = [p for p in pairings if p.enabled == eb]
pairings.sort(key=lambda p: (p.base_asset, p.quote_asset))
return templates.TemplateResponse(
request=request,
name="partials/pairings_table.html",
context={"request": request, "pairings": pairings},
)
@router.post("/dashboard/api/pairings/toggle")
async def dashboard_api_pairings_toggle(request: Request) -> HTMLResponse:
"""Toggle enabled/disabled for a pairing. Expects JSON or form body with base_asset and quote_asset."""
ctype = request.headers.get("content-type", "")
if "application/json" in ctype:
body = await request.json()
else:
form = _parse_form_body(await request.body())
body = form
base_asset = str(body.get("base_asset", "")).upper()
quote_asset = str(body.get("quote_asset", "")).upper()
if not base_asset or not quote_asset:
return HTMLResponse("Missing base_asset or quote_asset", status_code=400)
repo = _pairing_repo(request)
existing = repo.get_pairing(base_asset, quote_asset)
if existing is None:
return HTMLResponse("Pairing not found", status_code=404)
toggled = ConfigPairing(
base_asset=existing.base_asset,
quote_asset=existing.quote_asset,
enabled=not existing.enabled,
source=existing.source,
)
repo.update_pairing(base_asset, quote_asset, toggled)
_record_audit(
request,
actor="dashboard_user",
event_type="dashboard.pairings.toggle",
decision="approved",
payload={
"base_asset": base_asset,
"quote_asset": quote_asset,
"enabled": toggled.enabled,
},
)
# Return refreshed fragment
pairings_repo = _pairing_repo(request)
pairings = pairings_repo.list_pairings()
pairings.sort(key=lambda p: (p.base_asset, p.quote_asset))
return templates.TemplateResponse(
request=request,
name="partials/pairings_table.html",
context={"request": request, "pairings": pairings},
)
@router.post("/dashboard/api/pairings/sync")
async def dashboard_api_pairings_sync(request: Request) -> JSONResponse:
"""Trigger a re-sync of pairings from Kraken."""
kraken_client = request.app.state.kraken_client
store = request.app.state.store
summary = await sync_pairings_from_kraken(kraken_client, store)
_record_audit(
request,
actor="dashboard_user",
event_type="dashboard.pairings.sync",
decision="approved",
payload=summary, # type: ignore
)
return JSONResponse(summary)
+79
View File
@@ -0,0 +1,79 @@
"""Sync available Kraken asset pairs into the config_pairings table."""
from __future__ import annotations
import asyncio
import structlog
from arbitrade.config.service import ConfigPairing
from arbitrade.detection.graph import CurrencyGraph
from arbitrade.exchange.kraken_rest import KrakenRestClient
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.repositories import ConfigPairingRepository
_LOG = structlog.get_logger(__name__)
async def sync_pairings_from_kraken(
kraken_client: KrakenRestClient,
store: DuckDBStore,
) -> dict[str, int]:
"""Fetch all asset pairs from Kraken and upsert into config_pairings.
Returns a summary dict with 'added', 'updated', 'total' counts.
"""
asset_pairs = await kraken_client.asset_pairs()
graph = CurrencyGraph.from_kraken_asset_pairs(asset_pairs)
repo = ConfigPairingRepository(store)
added = 0
updated = 0
total = 0
# Dedupe: pair_by_direction has entries for both (base,quote) and (quote,base).
seen_symbols: set[str] = set()
for (base, quote), symbol in graph.pair_by_direction.items():
if symbol in seen_symbols:
continue
seen_symbols.add(symbol)
existing = repo.get_pairing(base, quote)
pairing = ConfigPairing(
base_asset=base,
quote_asset=quote,
enabled=existing.enabled if existing else False,
source="kraken",
)
try:
repo.upsert_pairing(pairing)
total += 1
if existing:
updated += 1
else:
added += 1
except Exception:
_LOG.warning("sync_pairing_failed", base=base, quote=quote)
_LOG.info(
"pairing_sync_complete",
added=added,
updated=updated,
total=total,
)
return {"added": added, "updated": updated, "total": total}
async def run_pairing_sync_loop(
kraken_client: KrakenRestClient,
store: DuckDBStore,
stop_event: asyncio.Event,
interval_seconds: int = 86400,
) -> None:
"""Periodically sync pairings from Kraken (default daily)."""
await sync_pairings_from_kraken(kraken_client, store)
try:
while not stop_event.is_set():
await asyncio.wait_for(stop_event.wait(), timeout=interval_seconds)
await sync_pairings_from_kraken(kraken_client, store)
except (TimeoutError, asyncio.CancelledError):
pass
+35 -2
View File
@@ -32,6 +32,7 @@ class KrakenWsClient:
self._alert_notifier = alert_notifier
self._has_connected_once = False
self._was_disconnected = False
self._subscribed_symbols: list[str] = []
@property
def is_stale(self) -> bool:
@@ -44,6 +45,35 @@ class KrakenWsClient:
async def stop(self) -> None:
self._stop.set()
def set_subscribed_symbols(self, symbols: list[str]) -> None:
"""Set the list of symbols to subscribe to on (re)connect."""
self._subscribed_symbols = list(symbols)
async def _subscribe(self, ws: Any) -> None:
"""Send Kraken WS v2 subscribe message for book channel."""
if not self._subscribed_symbols:
_LOG.warning("kraken_ws_no_symbols_to_subscribe")
return
depth = 10
if hasattr(self._settings, "kraken_ws_book_depth"):
depth = self._settings.kraken_ws_book_depth
msg = orjson.dumps(
{
"method": "subscribe",
"params": {
"channel": "book",
"symbol": self._subscribed_symbols,
"depth": depth,
},
}
)
await ws.send(msg)
_LOG.info(
"kraken_ws_subscribed",
symbol_count=len(self._subscribed_symbols),
symbols=self._subscribed_symbols,
)
async def connect_stream(self) -> AsyncIterator[WsMessage]:
delay = 1.0
while not self._stop.is_set():
@@ -51,7 +81,8 @@ class KrakenWsClient:
async with websockets.connect(
self._settings.kraken_ws_url, max_size=2_000_000
) as ws:
_LOG.info("kraken_ws_connected", url=self._settings.kraken_ws_url)
_LOG.info("kraken_ws_connected",
url=self._settings.kraken_ws_url)
if self._has_connected_once and self._was_disconnected:
await self._notify(
category="system",
@@ -63,10 +94,12 @@ class KrakenWsClient:
self._has_connected_once = True
self._was_disconnected = False
delay = 1.0
await self._subscribe(ws)
async for raw in self._recv_loop(ws):
yield raw
except Exception as exc:
_LOG.warning("kraken_ws_disconnected", error=str(exc), reconnect_in=delay)
_LOG.warning("kraken_ws_disconnected",
error=str(exc), reconnect_in=delay)
self._was_disconnected = True
await self._notify(
category="system",
+62
View File
@@ -0,0 +1,62 @@
"""Build production MarketDataFeed components from enabled pairings."""
from __future__ import annotations
import structlog
from arbitrade.detection.engine import IncrementalCycleDetector
from arbitrade.detection.graph import CurrencyGraph
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.repositories import ConfigPairingRepository
_LOG = structlog.get_logger(__name__)
def build_detector_from_enabled_pairings(
store: DuckDBStore,
*,
fee_rate: float = 0.0,
max_depth_levels: int = 10,
min_profit_threshold: float = 0.0005,
) -> IncrementalCycleDetector | None:
"""Build an IncrementalCycleDetector using only enabled pairings from DB.
Returns None if no enabled pairings exist.
"""
repo = ConfigPairingRepository(store)
pairings = repo.list_pairings(enabled_only=True)
if not pairings:
_LOG.warning("no_enabled_pairings_found_detector_not_created")
return None
# Build CurrencyGraph from enabled pairings and discover cycles
graph = CurrencyGraph()
for p in pairings:
symbol = f"{p.base_asset}/{p.quote_asset}"
graph.add_pair(p.base_asset, p.quote_asset, symbol)
cycles = graph.triangular_cycles()
if not cycles:
_LOG.warning("no_triangular_cycles_from_enabled_pairings")
return None
cycles_by_pair = graph.index_cycles_by_pair(cycles)
_LOG.info(
"detector_built_from_enabled_pairings",
enabled_count=len(pairings),
cycle_count=len(cycles),
)
return IncrementalCycleDetector(
cycles_by_pair,
fee_rate=fee_rate,
max_depth_levels=max_depth_levels,
min_profit_threshold=min_profit_threshold,
)
def get_enabled_pair_symbols(store: DuckDBStore) -> list[str]:
"""Return list of enabled pair symbols (e.g. ['BTC/USD', 'ETH/BTC'])."""
repo = ConfigPairingRepository(store)
pairings = repo.list_pairings(enabled_only=True)
return [f"{p.base_asset}/{p.quote_asset}" for p in pairings if p.enabled]
+6 -12
View File
@@ -24,8 +24,7 @@ class MetricsCalculator:
def compute(self) -> PerformanceMetrics:
with self._store.connect() as conn:
tm = conn.execute(
"""
tm = conn.execute("""
SELECT
COALESCE(SUM(COALESCE(realized_pnl, 0)), 0) AS realized_pnl_usd,
COUNT(*) AS total_trades,
@@ -45,26 +44,21 @@ class MetricsCalculator:
) AS latency_p99_seconds
FROM trades
WHERE finished_at IS NOT NULL
"""
).fetchone()
""").fetchone()
om = conn.execute(
"""
om = conn.execute("""
SELECT
COUNT(*) AS opportunity_count,
MIN(detected_at) AS first_detected_at,
MAX(detected_at) AS last_detected_at
FROM opportunities
"""
).fetchone()
""").fetchone()
fm = conn.execute(
"""
fm = conn.execute("""
SELECT AVG(filled_volume / volume) AS fill_rate
FROM orders
WHERE volume > 0 AND filled_volume IS NOT NULL
"""
).fetchone()
""").fetchone()
r_pnl_usd = float(tm[0]) if tm and tm[0] is not None else 0.0
tt = int(tm[1]) if tm and tm[1] is not None else 0
+4 -8
View File
@@ -45,26 +45,22 @@ def _runtime_repository(app: FastAPI) -> RuntimeStateRepository | None:
def _open_trade_count(store: DuckDBStore) -> int:
with store.connect() as conn:
row = conn.execute(
"""
row = conn.execute("""
SELECT COUNT(*)
FROM trades
WHERE finished_at IS NULL
"""
).fetchone()
""").fetchone()
return int(row[0]) if row is not None else 0
def _latest_balances(store: DuckDBStore) -> dict[str, Any] | None:
with store.connect() as conn:
row = conn.execute(
"""
row = conn.execute("""
SELECT balances
FROM portfolio_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
"""
).fetchone()
""").fetchone()
if row is None or row[0] is None:
return None
+2 -77
View File
@@ -219,87 +219,12 @@ class DuckDBStore:
# Ensure schema_migrations table exists and get current version
if not self._table_exists(conn, "schema_migrations"):
conn.execute(
"""
conn.execute("""
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMP DEFAULT current_timestamp
)
"""
)
# Get current schema version
try:
row = conn.execute(
"SELECT version FROM schema_migrations ORDER BY version DESC LIMIT 1"
).fetchone()
current_version = row[0] if row else 0
except Exception:
current_version = 0
# Apply migrations for each version
if current_version < 1:
# Migration v1: Add missing columns to trades table
# Note: DuckDB does not support ADD COLUMN with constraints
conn.execute("ALTER TABLE trades ADD COLUMN IF NOT EXISTS trade_ref VARCHAR")
conn.execute("ALTER TABLE trades ADD COLUMN IF NOT EXISTS estimated_pnl DOUBLE")
conn.execute("ALTER TABLE trades ADD COLUMN IF NOT EXISTS capital_used DOUBLE")
conn.execute("ALTER TABLE trades ADD COLUMN IF NOT EXISTS cycle VARCHAR")
conn.execute("ALTER TABLE trades ADD COLUMN IF NOT EXISTS leg_count INTEGER")
conn.execute("INSERT OR IGNORE INTO schema_migrations (version) VALUES (1)")
_LOG.info("migration_applied", version=1)
if current_version < 2:
# Migration v2: Ensure config_backtesting_defaults table
# config_backtesting_defaults already created by SCHEMA_SQL
conn.execute("INSERT OR IGNORE INTO schema_migrations (version) VALUES (2)")
_LOG.info("migration_applied", version=2)
if current_version < 3:
# Migration v3: Add kraken_account_snapshots table
conn.execute(
"""
CREATE TABLE IF NOT EXISTS kraken_account_snapshots (
snapshot_at TIMESTAMP NOT NULL,
fee_tier VARCHAR,
maker_fee DOUBLE,
taker_fee DOUBLE,
thirty_day_volume DOUBLE,
trade_balance_raw JSON,
fee_schedule_raw JSON
)
"""
)
conn.execute("INSERT OR IGNORE INTO schema_migrations (version) VALUES (3)")
_LOG.info("migration_applied", version=3)
if current_version < 4:
# Migration v4: Add fee_source to backtesting defaults
conn.execute(
"ALTER TABLE config_backtesting_defaults"
" ADD COLUMN IF NOT EXISTS fee_source VARCHAR DEFAULT 'api'"
)
conn.execute("INSERT OR IGNORE INTO schema_migrations (version) VALUES (4)")
_LOG.info("migration_applied", version=4)
if current_version < 5:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS backtest_jobs (
id UUID DEFAULT uuid(),
status VARCHAR NOT NULL DEFAULT 'pending',
events_path VARCHAR NOT NULL,
config JSON,
report JSON,
error VARCHAR,
created_at TIMESTAMP DEFAULT current_timestamp,
started_at TIMESTAMP,
finished_at TIMESTAMP
)
"""
)
conn.execute("INSERT OR IGNORE INTO schema_migrations (version) VALUES (5)")
_LOG.info("migration_applied", version=5)
""")
# Update version to current
conn.execute(
+62 -34
View File
@@ -349,8 +349,7 @@ class RuntimeStateRepository:
def latest(self) -> RuntimeStateRecord | None:
with self._store.connect() as conn:
row = conn.execute(
"""
row = conn.execute("""
SELECT
snapshot_at,
is_running,
@@ -362,8 +361,7 @@ class RuntimeStateRepository:
FROM runtime_state_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
"""
).fetchone()
""").fetchone()
if row is None:
return None
@@ -426,15 +424,14 @@ class ConfigSectionRepository:
def list_sections(self) -> list[ConfigSection]:
"""List all configuration sections."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
cursor = conn.execute("""
SELECT id, name, description, updated_at
FROM config_sections
ORDER BY name
"""
)
""")
return [
ConfigSection(id=row[0], name=row[1], description=row[2], updated_at=row[3])
ConfigSection(id=row[0], name=row[1],
description=row[2], updated_at=row[3])
for row in cursor.fetchall()
]
@@ -561,13 +558,11 @@ class ConfigSettingRepository:
(section,),
)
else:
cursor = conn.execute(
"""
cursor = conn.execute("""
SELECT key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by
FROM config_settings
ORDER BY key
"""
)
""")
return [
ConfigSetting(
key=row[0],
@@ -585,12 +580,10 @@ class ConfigSettingRepository:
def get_latest_updated_at(self) -> datetime | None:
"""Get the latest updated_at timestamp from config_settings table."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
cursor = conn.execute("""
SELECT MAX(updated_at) as latest_updated_at
FROM config_settings
"""
)
""")
row = cursor.fetchone()
if row and row[0]:
# Convert string timestamp to datetime
@@ -699,16 +692,51 @@ class ConfigPairingRepository:
)
return cursor.rowcount > 0
def list_pairings(self) -> list[ConfigPairing]:
"""List all currency pairings."""
def upsert_pairing(self, pairing: ConfigPairing) -> ConfigPairing:
"""Insert or update a currency pairing (upsert on base_asset, quote_asset)."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
INSERT INTO config_pairings (base_asset, quote_asset, enabled, source)
VALUES (?, ?, ?, ?)
ON CONFLICT(base_asset, quote_asset) DO UPDATE SET
enabled = EXCLUDED.enabled,
source = EXCLUDED.source,
updated_at = current_timestamp
RETURNING id, base_asset, quote_asset, enabled, source, created_at, updated_at
""",
(
pairing.base_asset,
pairing.quote_asset,
pairing.enabled,
pairing.source,
),
)
row = cursor.fetchone()
if row:
return ConfigPairing(
id=row[0],
base_asset=row[1],
quote_asset=row[2],
enabled=bool(row[3]),
source=row[4],
created_at=row[5],
updated_at=row[6],
)
raise ValueError("Failed to upsert pairing")
def list_pairings(self, enabled_only: bool = False) -> list[ConfigPairing]:
"""List all currency pairings. If enabled_only=True, only enabled pairings."""
with self._store.connect() as conn:
query = """
SELECT id, base_asset, quote_asset, enabled, source, created_at, updated_at
FROM config_pairings
ORDER BY base_asset, quote_asset
"""
)
"""
params: list[object] = []
if enabled_only:
query += " WHERE enabled = TRUE"
query += " ORDER BY base_asset, quote_asset"
cursor = conn.execute(query, params)
return [
ConfigPairing(
id=row[0],
@@ -738,7 +766,8 @@ class ConfigBacktestingDefaultsRepository:
""",
(
(
orjson.dumps(defaults.starting_balances).decode("utf-8")
orjson.dumps(
defaults.starting_balances).decode("utf-8")
if defaults.starting_balances
else None
),
@@ -762,14 +791,12 @@ class ConfigBacktestingDefaultsRepository:
def get_defaults(self) -> ConfigBacktestingDefaults | None:
"""Get the current backtesting defaults."""
with self._store.connect() as conn:
cursor = conn.execute(
"""
cursor = conn.execute("""
SELECT id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms
FROM config_backtesting_defaults
ORDER BY id DESC
LIMIT 1
"""
)
""")
row = cursor.fetchone()
if row:
return ConfigBacktestingDefaults(
@@ -795,7 +822,8 @@ class ConfigBacktestingDefaultsRepository:
""",
(
(
orjson.dumps(defaults.starting_balances).decode("utf-8")
orjson.dumps(
defaults.starting_balances).decode("utf-8")
if defaults.starting_balances
else None
),
@@ -848,7 +876,8 @@ class KrakenAccountSnapshotRepository:
snapshot.taker_fee,
snapshot.thirty_day_volume,
(
orjson.dumps(snapshot.trade_balance_raw).decode("utf-8")
orjson.dumps(
snapshot.trade_balance_raw).decode("utf-8")
if snapshot.trade_balance_raw
else None
),
@@ -862,15 +891,13 @@ class KrakenAccountSnapshotRepository:
def latest_snapshot(self) -> KrakenAccountSnapshot | None:
with self._store.connect() as conn:
row = conn.execute(
"""
row = conn.execute("""
SELECT snapshot_at, fee_tier, maker_fee, taker_fee,
thirty_day_volume, trade_balance_raw, fee_schedule_raw
FROM kraken_account_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
"""
).fetchone()
""").fetchone()
if row is None:
return None
return KrakenAccountSnapshot(
@@ -911,7 +938,8 @@ class BacktestJobRepository:
VALUES (?, ?)
RETURNING id, status, events_path, config, created_at
""",
(events_path, orjson.dumps(config).decode("utf-8") if config else None),
(events_path, orjson.dumps(config).decode(
"utf-8") if config else None),
).fetchone()
if row is None:
raise ValueError("Failed to create backtest job")
@@ -0,0 +1,31 @@
{% for p in pairings %}
<label
style="
display: inline-flex;
align-items: center;
gap: 4px;
padding: 3px 8px;
border: 1px solid rgba(255, 255, 255, 0.12);
border-radius: 4px;
cursor: pointer;
font-size: 0.85rem;
"
>
<input
type="checkbox"
name="symbols"
value="{{ p.base_asset }}/{{ p.quote_asset }}"
{%
if
p.enabled
%}checked{%
endif
%}
/>
{{ p.base_asset }}/{{ p.quote_asset }}
</label>
{% endfor %} {% if not pairings %}
<span style="opacity: 0.5"
>No pairings available. Sync from Kraken in config page.</span
>
{% endif %}
@@ -53,13 +53,15 @@
>
<input type="hidden" name="source" value="db" />
<label class="field">
<span>Symbols (comma-separated, blank=all)</span>
<input
name="symbols"
type="text"
value="{{ symbols | default('') }}"
placeholder="BTC/USD,ETH/BTC"
/>
<span>Pairings</span>
<div
id="pairing-checkboxes"
hx-get="/dashboard/fragment/backtesting-pairings"
hx-trigger="load"
style="display: flex; flex-wrap: wrap; gap: 6px; margin-top: 4px"
>
<span style="opacity: 0.5">Loading pairings...</span>
</div>
</label>
<label class="field">
<span>Start time (ISO datetime, optional)</span>
@@ -467,6 +467,52 @@
</label>
</div>
<!-- Pairings -->
<div class="card" id="pairings-card">
<div class="label">Currency Pairings</div>
<div style="display: flex; gap: 8px; margin-bottom: 12px">
<input
id="pairing-search"
type="text"
placeholder="Search pairings..."
hx-get="/dashboard/fragment/pairings"
hx-target="#pairings-table-container"
hx-trigger="keyup changed delay:300ms"
hx-swap="innerHTML"
name="search"
style="
flex: 1;
padding: 6px 10px;
border-radius: 6px;
border: 1px solid rgba(255, 255, 255, 0.15);
background: rgba(0, 0, 0, 0.3);
color: inherit;
"
/>
<button
type="button"
class="button"
id="pairing-sync-btn"
hx-post="/dashboard/api/pairings/sync"
hx-target="#pairings-table-container"
hx-swap="innerHTML"
hx-trigger="click"
style="white-space: nowrap"
>
Sync from Kraken
</button>
</div>
<div
id="pairings-table-container"
hx-get="/dashboard/fragment/pairings"
hx-trigger="load"
>
<div style="text-align: center; padding: 20px; opacity: 0.5">
Loading pairings...
</div>
</div>
</div>
<!-- Risk -->
<div class="card">
<div class="label">Risk Limits</div>
@@ -0,0 +1,50 @@
<table
class="pairings-table"
style="width: 100%; border-collapse: collapse; font-size: 0.85rem"
>
<thead>
<tr
style="
border-bottom: 1px solid rgba(255, 255, 255, 0.1);
text-align: left;
"
>
<th style="padding: 6px 8px">Base</th>
<th style="padding: 6px 8px">Quote</th>
<th style="padding: 6px 8px">Source</th>
<th style="padding: 6px 8px; text-align: center">Enabled</th>
</tr>
</thead>
<tbody>
{% for p in pairings %}
<tr style="border-bottom: 1px solid rgba(255, 255, 255, 0.05)">
<td style="padding: 6px 8px">{{ p.base_asset }}</td>
<td style="padding: 6px 8px">{{ p.quote_asset }}</td>
<td style="padding: 6px 8px; opacity: 0.6">{{ p.source }}</td>
<td style="padding: 6px 8px; text-align: center">
<input
type="checkbox"
hx-post="/dashboard/api/pairings/toggle"
hx-vals='{"base_asset": "{{ p.base_asset }}", "quote_asset": "{{ p.quote_asset }}"}'
hx-trigger="change"
hx-target="#pairings-table-container"
hx-swap="innerHTML"
hx-include="#pairing-search"
{%
if
p.enabled
%}checked{%
endif
%}
/>
</td>
</tr>
{% endfor %} {% if not pairings %}
<tr>
<td colspan="4" style="padding: 20px; text-align: center; opacity: 0.5">
No pairings found. Click "Sync from Kraken" to fetch available pairs.
</td>
</tr>
{% endif %}
</tbody>
</table>