feat: implement fee synchronization and dashboard updates for Kraken account fees

This commit is contained in:
2026-06-03 18:59:39 +02:00
parent 5f2f968721
commit 587c9afc3b
7 changed files with 397 additions and 1 deletions
+43
View File
@@ -5,11 +5,15 @@ from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncio
from arbitrade.alerting.notifier import build_notifier_from_settings
from arbitrade.api.control_state import DashboardControlState
from arbitrade.api.routes import public_router, router
from arbitrade.config.settings import Settings
from arbitrade.config.service import ConfigurationService
from arbitrade.exchange.fee_service import run_fee_sync_loop
from arbitrade.exchange.kraken_rest import KrakenRestClient
from arbitrade.logging_setup import configure_logging
from arbitrade.metrics import MetricsCalculator
from arbitrade.runtime.lifecycle import graceful_shutdown, restore_runtime_state
@@ -22,16 +26,55 @@ def create_app(settings: Settings) -> FastAPI:
db = DuckDBStore(settings)
db.migrate()
kraken_client = KrakenRestClient(settings)
fee_sync_stop_event = asyncio.Event()
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
await restore_runtime_state(app)
fee_sync_task = asyncio.create_task(
run_fee_sync_loop(
kraken_client,
db,
lambda: _active_pairings(app),
fee_sync_stop_event,
),
name="fee_sync_loop",
)
app.state.fee_sync_task = fee_sync_task
yield
fee_sync_stop_event.set()
fee_sync_task.cancel()
try:
await fee_sync_task
except asyncio.CancelledError:
pass
await kraken_client.close()
await graceful_shutdown(app)
def _active_pairings(app: FastAPI) -> list[str]:
ctl = app.state.dashboard_controls
cfg_svc = app.state.configuration_service
pairs: list[str] = []
if ctl.tradable_pairs:
pairs = [p.upper() for p in ctl.tradable_pairs]
elif cfg_svc is not None:
try:
all_pairings = cfg_svc.list_pairings()
pairs = [
f"{p.base_asset}/{p.quote_asset}"
for p in all_pairings
if getattr(p, "enabled", True)
]
except Exception:
pass
return pairs
app = FastAPI(title="arbitrade", version="0.1.0", lifespan=lifespan)
app.state.settings = settings
app.state.store = db
app.state.kraken_client = kraken_client
app.state.fee_sync_stop_event = fee_sync_stop_event
app.state.metrics = MetricsCalculator(db)
app.state.audit_repository = AuditRepository(db)
app.state.runtime_state_repository = RuntimeStateRepository(db)
+24
View File
@@ -152,6 +152,26 @@ def _dashboard_overview(request: Request) -> dict[str, object]:
for row in latest_opportunities
]
# Query latest Kraken account snapshot for fee info
fee_tier = ""
maker_fee = ""
taker_fee = ""
thirty_day_volume = ""
try:
acct_row = conn.execute("""
SELECT fee_tier, maker_fee, taker_fee, thirty_day_volume
FROM kraken_account_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
""").fetchone()
if acct_row is not None:
fee_tier = str(acct_row[0]) if acct_row[0] is not None else ""
maker_fee = f"{float(acct_row[1]):.4%}" if acct_row[1] is not None else ""
taker_fee = f"{float(acct_row[2]):.4%}" if acct_row[2] is not None else ""
thirty_day_volume = f"{float(acct_row[3]):.2f}" if acct_row[3] is not None else ""
except Exception:
pass
return {
"status": "live",
"generated_at": datetime.now(UTC).isoformat(),
@@ -161,6 +181,10 @@ def _dashboard_overview(request: Request) -> dict[str, object]:
"open_trades": open_trade_rows,
"realized_pnl_total": f"{float(rpnl[0]):.2f} USD" if rpnl else "",
"opportunities": opportunity_rows,
"fee_tier": fee_tier,
"maker_fee": maker_fee,
"taker_fee": taker_fee,
"thirty_day_volume": thirty_day_volume,
}
+202
View File
@@ -0,0 +1,202 @@
"""Fee service -- fetch Kraken account fee tier, sync pair fees, persist snapshots."""
from __future__ import annotations
import asyncio
from datetime import datetime, timezone
import structlog
from arbitrade.exchange.kraken_rest import KrakenRestClient
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.repositories import (
KrakenAccountSnapshot,
KrakenAccountSnapshotRepository,
)
_LOG = structlog.get_logger(__name__)
_FEE_REFRESH_INTERVAL_SECONDS = 86400 # 1 day
async def fetch_and_store_account_snapshot(
client: KrakenRestClient,
store: DuckDBStore,
) -> KrakenAccountSnapshot | None:
"""Query TradeVolume + TradeBalance, persist as snapshot.
Returns the snapshot or None if either call failed.
"""
repo = KrakenAccountSnapshotRepository(store)
try:
volume_data = await client.trade_volume()
except Exception:
_LOG.exception("trade_volume_fetch_failed")
return None
try:
balance_data = await client.trade_balance()
except Exception:
_LOG.exception("trade_balance_fetch_failed")
return None
fee_tier = volume_data.get("fee_tier") if isinstance(
volume_data, dict) else None
fees_dict = volume_data.get("fees") if isinstance(
volume_data, dict) else None
fees_maker = volume_data.get("fees_maker") if isinstance(
volume_data, dict) else None
currency = volume_data.get("currency")
thirty_day_volume_str = volume_data.get("volume")
maker_fee = None
taker_fee = None
fee_tier_str = str(fee_tier) if fee_tier is not None else None
# Extract current tier's maker/taker rates from fees dict
if isinstance(fees_dict, dict) and fee_tier_str is not None:
tier_fees = fees_dict.get(fee_tier_str)
if isinstance(tier_fees, dict):
maker_val = tier_fees.get("maker")
taker_val = tier_fees.get("taker")
maker_fee = float(maker_val) if maker_val is not None else None
taker_fee = float(taker_val) if taker_val is not None else None
# Build fee schedule as combined dict
fee_schedule: dict[str, object] = {}
if isinstance(fees_dict, dict):
fee_schedule["fees"] = fees_dict
if isinstance(fees_maker, dict):
fee_schedule["fees_maker"] = fees_maker
if currency is not None:
fee_schedule["currency"] = currency
thirty_day_volume = (
float(thirty_day_volume_str) if thirty_day_volume_str is not None else None
)
snapshot = KrakenAccountSnapshot(
snapshot_at=datetime.now(timezone.utc),
fee_tier=fee_tier_str,
maker_fee=maker_fee,
taker_fee=taker_fee,
thirty_day_volume=thirty_day_volume,
trade_balance_raw=balance_data if isinstance(
balance_data, dict) else None,
fee_schedule_raw=fee_schedule if fee_schedule else None,
)
repo.insert_snapshot(snapshot)
_LOG.info(
"account_snapshot_stored",
fee_tier=fee_tier_str,
maker_fee=maker_fee,
taker_fee=taker_fee,
)
return snapshot
async def fetch_and_store_pair_fees(
client: KrakenRestClient,
store: DuckDBStore,
pairings: list[str],
) -> dict[str, dict[str, object]]:
"""Query TradeVolume per pair, upsert maker/taker into config_pair_fees.
Returns dict of pair -> fee info for successfully fetched pairs.
"""
results: dict[str, dict[str, object]] = {}
for pair in pairings:
try:
volume_data = await client.trade_volume(pair=pair)
except Exception:
_LOG.exception("trade_volume_pair_failed", pair=pair)
continue
if not isinstance(volume_data, dict):
continue
fees_dict = volume_data.get("fees")
fee_tier = volume_data.get("fee_tier")
fee_tier_str = str(fee_tier) if fee_tier is not None else None
maker_fee = None
taker_fee = None
if isinstance(fees_dict, dict) and fee_tier_str is not None:
tier_fees = fees_dict.get(fee_tier_str)
if isinstance(tier_fees, dict):
maker_fee = tier_fees.get("maker")
taker_fee = tier_fees.get("taker")
if maker_fee is not None and taker_fee is not None:
base, quote = pair.split("/", 1) if "/" in pair else (pair, "")
market_type = "crypto_fiat" if quote.upper(
) in {"USD", "EUR", "GBP", "JPY", "CAD", "CHF", "AUD"} else "crypto_crypto"
with store.connect() as conn:
# Find pairing_id
pairing_row = conn.execute(
"SELECT id FROM config_pairings WHERE base_asset = ? AND quote_asset = ?",
(base.upper() if "/" in pair else pair,
quote.upper() if "/" in pair else ""),
).fetchone()
if pairing_row is not None:
pairing_id = pairing_row[0]
conn.execute(
"""
INSERT OR REPLACE INTO config_pair_fees
(pairing_id, market_type, maker_fee_rate, taker_fee_rate, updated_at, source)
VALUES (?, ?, ?, ?, current_timestamp, 'kraken_api')
""",
(pairing_id, market_type, float(
maker_fee), float(taker_fee)),
)
results[pair] = {
"market_type": market_type,
"maker_fee": float(maker_fee),
"taker_fee": float(taker_fee),
"fee_tier": fee_tier_str,
}
_LOG.info("pair_fee_synced", pair=pair,
maker=maker_fee, taker=taker_fee)
return results
async def run_fee_sync_loop(
client: KrakenRestClient,
store: DuckDBStore,
get_active_pairings: callable[[], list[str]],
stop_event: asyncio.Event,
) -> None:
"""Periodic loop: fetch account snapshot + sync pair fees every hour.
Runs until stop_event is set.
"""
_LOG.info("fee_sync_loop_started",
interval_s=_FEE_REFRESH_INTERVAL_SECONDS)
while not stop_event.is_set():
try:
await fetch_and_store_account_snapshot(client, store)
pairings = get_active_pairings()
if pairings:
await fetch_and_store_pair_fees(client, store, pairings)
except Exception:
_LOG.exception("fee_sync_loop_iteration_failed")
# Wait with stop_event check
try:
await asyncio.wait_for(
stop_event.wait(),
timeout=_FEE_REFRESH_INTERVAL_SECONDS,
)
break # stop_event was set
except asyncio.TimeoutError:
pass # timeout elapsed, loop again
_LOG.info("fee_sync_loop_stopped")
+31
View File
@@ -279,3 +279,34 @@ class KrakenRestClient:
"/0/private/CancelOrder",
data={"txid": order_id},
)
async def trade_volume(self, *, pair: str | None = None) -> dict[str, Any]:
"""Query Kraken TradeVolume for fee tier, 30d volume, and fee schedule.
Returns dict with keys: currency, volume, fees (dict of tiers),
fees_maker (dict of tier->fee mappings), fee_tier (current tier).
If pair provided, returns pair-specific fee info.
"""
data: dict[str, str] = {}
if pair is not None:
data["pair"] = pair
return await self._throttled_private_call(
"/0/private/TradeVolume",
data=data if data else None,
)
async def trade_balance(self, *, asset: str | None = None) -> dict[str, Any]:
"""Query Kraken TradeBalance for equity, trade balance, margin info.
Returns dict with keys: eb (equivalent balance/equity),
tb (trade balance), m (margin amount), n (unrealized net P&L),
c (cost basis), v (current valuation), e (equity).
If asset provided, returns asset-class-specific balance.
"""
data: dict[str, str] = {}
if asset is not None:
data["asset"] = asset
return await self._throttled_private_call(
"/0/private/TradeBalance",
data=data if data else None,
)
+31 -1
View File
@@ -145,11 +145,21 @@ CREATE TABLE IF NOT EXISTS runtime_state_snapshots (
last_known_balances JSON,
note VARCHAR
);
CREATE TABLE IF NOT EXISTS kraken_account_snapshots (
snapshot_at TIMESTAMP NOT NULL,
fee_tier VARCHAR,
maker_fee DOUBLE,
taker_fee DOUBLE,
thirty_day_volume DOUBLE,
trade_balance_raw JSON,
fee_schedule_raw JSON
);
"""
class DuckDBStore:
SCHEMA_VERSION = 2
SCHEMA_VERSION = 3
def __init__(self, settings: Settings) -> None:
self._db_path = Path(settings.duckdb_path)
@@ -244,6 +254,26 @@ class DuckDBStore:
"INSERT OR IGNORE INTO schema_migrations (version) VALUES (2)")
_LOG.info("migration_applied", version=2)
if current_version < 3:
# Migration v3: Add kraken_account_snapshots table
conn.execute("""
CREATE TABLE IF NOT EXISTS kraken_account_snapshots (
snapshot_at TIMESTAMP NOT NULL,
fee_tier VARCHAR,
maker_fee DOUBLE,
taker_fee DOUBLE,
thirty_day_volume DOUBLE,
trade_balance_raw JSON,
fee_schedule_raw JSON
)
""")
# Add source column to config_pair_fees
conn.execute(
"ALTER TABLE config_pair_fees ADD COLUMN IF NOT EXISTS source VARCHAR DEFAULT 'manual'")
conn.execute(
"INSERT OR IGNORE INTO schema_migrations (version) VALUES (3)")
_LOG.info("migration_applied", version=3)
# Update version to current
conn.execute(
f"INSERT OR REPLACE INTO schema_migrations (version, applied_at) "
+61
View File
@@ -932,3 +932,64 @@ class ConfigBacktestingDefaultsRepository:
execution_latency_ms=row[5]
)
raise ValueError("Failed to update backtesting defaults")
@dataclass(slots=True)
class KrakenAccountSnapshot:
snapshot_at: datetime
fee_tier: str | None
maker_fee: float | None
taker_fee: float | None
thirty_day_volume: float | None
trade_balance_raw: dict[str, Any] | None
fee_schedule_raw: dict[str, Any] | None
class KrakenAccountSnapshotRepository:
def __init__(self, store: DuckDBStore) -> None:
self._store = store
def insert_snapshot(self, snapshot: KrakenAccountSnapshot) -> None:
with self._store.connect() as conn:
conn.execute(
"""
INSERT INTO kraken_account_snapshots
(snapshot_at, fee_tier, maker_fee, taker_fee,
thirty_day_volume, trade_balance_raw, fee_schedule_raw)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
snapshot.snapshot_at,
snapshot.fee_tier,
snapshot.maker_fee,
snapshot.taker_fee,
snapshot.thirty_day_volume,
orjson.dumps(snapshot.trade_balance_raw).decode("utf-8")
if snapshot.trade_balance_raw else None,
orjson.dumps(snapshot.fee_schedule_raw).decode("utf-8")
if snapshot.fee_schedule_raw else None,
),
)
def latest_snapshot(self) -> KrakenAccountSnapshot | None:
with self._store.connect() as conn:
row = conn.execute(
"""
SELECT snapshot_at, fee_tier, maker_fee, taker_fee,
thirty_day_volume, trade_balance_raw, fee_schedule_raw
FROM kraken_account_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
"""
).fetchone()
if row is None:
return None
return KrakenAccountSnapshot(
snapshot_at=row[0],
fee_tier=row[1],
maker_fee=row[2],
taker_fee=row[3],
thirty_day_volume=row[4],
trade_balance_raw=orjson.loads(row[5]) if row[5] else None,
fee_schedule_raw=orjson.loads(row[6]) if row[6] else None,
)
@@ -16,6 +16,11 @@
<div class="label">Realized P&amp;L</div>
<div class="value">{{ realized_pnl_total }}</div>
</article>
<article class="card">
<div class="label">Fee Tier</div>
<div class="value">{{ fee_tier }}</div>
<div class="meta">Maker {{ maker_fee }} / Taker {{ taker_fee }}</div>
</article>
</div>
<div