diff --git a/src/arbitrade/api/app.py b/src/arbitrade/api/app.py index bd2cbe8..b92cc30 100644 --- a/src/arbitrade/api/app.py +++ b/src/arbitrade/api/app.py @@ -5,11 +5,15 @@ from contextlib import asynccontextmanager from fastapi import FastAPI +import asyncio + from arbitrade.alerting.notifier import build_notifier_from_settings from arbitrade.api.control_state import DashboardControlState from arbitrade.api.routes import public_router, router from arbitrade.config.settings import Settings from arbitrade.config.service import ConfigurationService +from arbitrade.exchange.fee_service import run_fee_sync_loop +from arbitrade.exchange.kraken_rest import KrakenRestClient from arbitrade.logging_setup import configure_logging from arbitrade.metrics import MetricsCalculator from arbitrade.runtime.lifecycle import graceful_shutdown, restore_runtime_state @@ -22,16 +26,55 @@ def create_app(settings: Settings) -> FastAPI: db = DuckDBStore(settings) db.migrate() + kraken_client = KrakenRestClient(settings) + fee_sync_stop_event = asyncio.Event() @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncIterator[None]: await restore_runtime_state(app) + fee_sync_task = asyncio.create_task( + run_fee_sync_loop( + kraken_client, + db, + lambda: _active_pairings(app), + fee_sync_stop_event, + ), + name="fee_sync_loop", + ) + app.state.fee_sync_task = fee_sync_task yield + fee_sync_stop_event.set() + fee_sync_task.cancel() + try: + await fee_sync_task + except asyncio.CancelledError: + pass + await kraken_client.close() await graceful_shutdown(app) + def _active_pairings(app: FastAPI) -> list[str]: + ctl = app.state.dashboard_controls + cfg_svc = app.state.configuration_service + pairs: list[str] = [] + if ctl.tradable_pairs: + pairs = [p.upper() for p in ctl.tradable_pairs] + elif cfg_svc is not None: + try: + all_pairings = cfg_svc.list_pairings() + pairs = [ + f"{p.base_asset}/{p.quote_asset}" + for p in all_pairings + if getattr(p, "enabled", True) + ] + except Exception: + pass + return pairs + app = FastAPI(title="arbitrade", version="0.1.0", lifespan=lifespan) app.state.settings = settings app.state.store = db + app.state.kraken_client = kraken_client + app.state.fee_sync_stop_event = fee_sync_stop_event app.state.metrics = MetricsCalculator(db) app.state.audit_repository = AuditRepository(db) app.state.runtime_state_repository = RuntimeStateRepository(db) diff --git a/src/arbitrade/api/routes.py b/src/arbitrade/api/routes.py index 829721e..7c84708 100644 --- a/src/arbitrade/api/routes.py +++ b/src/arbitrade/api/routes.py @@ -152,6 +152,26 @@ def _dashboard_overview(request: Request) -> dict[str, object]: for row in latest_opportunities ] + # Query latest Kraken account snapshot for fee info + fee_tier = "—" + maker_fee = "—" + taker_fee = "—" + thirty_day_volume = "—" + try: + acct_row = conn.execute(""" + SELECT fee_tier, maker_fee, taker_fee, thirty_day_volume + FROM kraken_account_snapshots + ORDER BY snapshot_at DESC + LIMIT 1 + """).fetchone() + if acct_row is not None: + fee_tier = str(acct_row[0]) if acct_row[0] is not None else "—" + maker_fee = f"{float(acct_row[1]):.4%}" if acct_row[1] is not None else "—" + taker_fee = f"{float(acct_row[2]):.4%}" if acct_row[2] is not None else "—" + thirty_day_volume = f"{float(acct_row[3]):.2f}" if acct_row[3] is not None else "—" + except Exception: + pass + return { "status": "live", "generated_at": datetime.now(UTC).isoformat(), @@ -161,6 +181,10 @@ def _dashboard_overview(request: Request) -> dict[str, object]: "open_trades": open_trade_rows, "realized_pnl_total": f"{float(rpnl[0]):.2f} USD" if rpnl else "—", "opportunities": opportunity_rows, + "fee_tier": fee_tier, + "maker_fee": maker_fee, + "taker_fee": taker_fee, + "thirty_day_volume": thirty_day_volume, } diff --git a/src/arbitrade/exchange/fee_service.py b/src/arbitrade/exchange/fee_service.py new file mode 100644 index 0000000..ee48f38 --- /dev/null +++ b/src/arbitrade/exchange/fee_service.py @@ -0,0 +1,202 @@ +"""Fee service -- fetch Kraken account fee tier, sync pair fees, persist snapshots.""" + +from __future__ import annotations + +import asyncio +from datetime import datetime, timezone + +import structlog + +from arbitrade.exchange.kraken_rest import KrakenRestClient +from arbitrade.storage.db import DuckDBStore +from arbitrade.storage.repositories import ( + KrakenAccountSnapshot, + KrakenAccountSnapshotRepository, +) + +_LOG = structlog.get_logger(__name__) + +_FEE_REFRESH_INTERVAL_SECONDS = 86400 # 1 day + + +async def fetch_and_store_account_snapshot( + client: KrakenRestClient, + store: DuckDBStore, +) -> KrakenAccountSnapshot | None: + """Query TradeVolume + TradeBalance, persist as snapshot. + + Returns the snapshot or None if either call failed. + """ + repo = KrakenAccountSnapshotRepository(store) + + try: + volume_data = await client.trade_volume() + except Exception: + _LOG.exception("trade_volume_fetch_failed") + return None + + try: + balance_data = await client.trade_balance() + except Exception: + _LOG.exception("trade_balance_fetch_failed") + return None + + fee_tier = volume_data.get("fee_tier") if isinstance( + volume_data, dict) else None + fees_dict = volume_data.get("fees") if isinstance( + volume_data, dict) else None + fees_maker = volume_data.get("fees_maker") if isinstance( + volume_data, dict) else None + currency = volume_data.get("currency") + thirty_day_volume_str = volume_data.get("volume") + + maker_fee = None + taker_fee = None + fee_tier_str = str(fee_tier) if fee_tier is not None else None + + # Extract current tier's maker/taker rates from fees dict + if isinstance(fees_dict, dict) and fee_tier_str is not None: + tier_fees = fees_dict.get(fee_tier_str) + if isinstance(tier_fees, dict): + maker_val = tier_fees.get("maker") + taker_val = tier_fees.get("taker") + maker_fee = float(maker_val) if maker_val is not None else None + taker_fee = float(taker_val) if taker_val is not None else None + + # Build fee schedule as combined dict + fee_schedule: dict[str, object] = {} + if isinstance(fees_dict, dict): + fee_schedule["fees"] = fees_dict + if isinstance(fees_maker, dict): + fee_schedule["fees_maker"] = fees_maker + if currency is not None: + fee_schedule["currency"] = currency + + thirty_day_volume = ( + float(thirty_day_volume_str) if thirty_day_volume_str is not None else None + ) + + snapshot = KrakenAccountSnapshot( + snapshot_at=datetime.now(timezone.utc), + fee_tier=fee_tier_str, + maker_fee=maker_fee, + taker_fee=taker_fee, + thirty_day_volume=thirty_day_volume, + trade_balance_raw=balance_data if isinstance( + balance_data, dict) else None, + fee_schedule_raw=fee_schedule if fee_schedule else None, + ) + + repo.insert_snapshot(snapshot) + _LOG.info( + "account_snapshot_stored", + fee_tier=fee_tier_str, + maker_fee=maker_fee, + taker_fee=taker_fee, + ) + return snapshot + + +async def fetch_and_store_pair_fees( + client: KrakenRestClient, + store: DuckDBStore, + pairings: list[str], +) -> dict[str, dict[str, object]]: + """Query TradeVolume per pair, upsert maker/taker into config_pair_fees. + + Returns dict of pair -> fee info for successfully fetched pairs. + """ + results: dict[str, dict[str, object]] = {} + + for pair in pairings: + try: + volume_data = await client.trade_volume(pair=pair) + except Exception: + _LOG.exception("trade_volume_pair_failed", pair=pair) + continue + + if not isinstance(volume_data, dict): + continue + + fees_dict = volume_data.get("fees") + fee_tier = volume_data.get("fee_tier") + fee_tier_str = str(fee_tier) if fee_tier is not None else None + + maker_fee = None + taker_fee = None + if isinstance(fees_dict, dict) and fee_tier_str is not None: + tier_fees = fees_dict.get(fee_tier_str) + if isinstance(tier_fees, dict): + maker_fee = tier_fees.get("maker") + taker_fee = tier_fees.get("taker") + + if maker_fee is not None and taker_fee is not None: + base, quote = pair.split("/", 1) if "/" in pair else (pair, "") + market_type = "crypto_fiat" if quote.upper( + ) in {"USD", "EUR", "GBP", "JPY", "CAD", "CHF", "AUD"} else "crypto_crypto" + + with store.connect() as conn: + # Find pairing_id + pairing_row = conn.execute( + "SELECT id FROM config_pairings WHERE base_asset = ? AND quote_asset = ?", + (base.upper() if "/" in pair else pair, + quote.upper() if "/" in pair else ""), + ).fetchone() + + if pairing_row is not None: + pairing_id = pairing_row[0] + conn.execute( + """ + INSERT OR REPLACE INTO config_pair_fees + (pairing_id, market_type, maker_fee_rate, taker_fee_rate, updated_at, source) + VALUES (?, ?, ?, ?, current_timestamp, 'kraken_api') + """, + (pairing_id, market_type, float( + maker_fee), float(taker_fee)), + ) + + results[pair] = { + "market_type": market_type, + "maker_fee": float(maker_fee), + "taker_fee": float(taker_fee), + "fee_tier": fee_tier_str, + } + _LOG.info("pair_fee_synced", pair=pair, + maker=maker_fee, taker=taker_fee) + + return results + + +async def run_fee_sync_loop( + client: KrakenRestClient, + store: DuckDBStore, + get_active_pairings: callable[[], list[str]], + stop_event: asyncio.Event, +) -> None: + """Periodic loop: fetch account snapshot + sync pair fees every hour. + + Runs until stop_event is set. + """ + _LOG.info("fee_sync_loop_started", + interval_s=_FEE_REFRESH_INTERVAL_SECONDS) + + while not stop_event.is_set(): + try: + await fetch_and_store_account_snapshot(client, store) + pairings = get_active_pairings() + if pairings: + await fetch_and_store_pair_fees(client, store, pairings) + except Exception: + _LOG.exception("fee_sync_loop_iteration_failed") + + # Wait with stop_event check + try: + await asyncio.wait_for( + stop_event.wait(), + timeout=_FEE_REFRESH_INTERVAL_SECONDS, + ) + break # stop_event was set + except asyncio.TimeoutError: + pass # timeout elapsed, loop again + + _LOG.info("fee_sync_loop_stopped") diff --git a/src/arbitrade/exchange/kraken_rest.py b/src/arbitrade/exchange/kraken_rest.py index b4d5816..67e4fae 100644 --- a/src/arbitrade/exchange/kraken_rest.py +++ b/src/arbitrade/exchange/kraken_rest.py @@ -279,3 +279,34 @@ class KrakenRestClient: "/0/private/CancelOrder", data={"txid": order_id}, ) + + async def trade_volume(self, *, pair: str | None = None) -> dict[str, Any]: + """Query Kraken TradeVolume for fee tier, 30d volume, and fee schedule. + + Returns dict with keys: currency, volume, fees (dict of tiers), + fees_maker (dict of tier->fee mappings), fee_tier (current tier). + If pair provided, returns pair-specific fee info. + """ + data: dict[str, str] = {} + if pair is not None: + data["pair"] = pair + return await self._throttled_private_call( + "/0/private/TradeVolume", + data=data if data else None, + ) + + async def trade_balance(self, *, asset: str | None = None) -> dict[str, Any]: + """Query Kraken TradeBalance for equity, trade balance, margin info. + + Returns dict with keys: eb (equivalent balance/equity), + tb (trade balance), m (margin amount), n (unrealized net P&L), + c (cost basis), v (current valuation), e (equity). + If asset provided, returns asset-class-specific balance. + """ + data: dict[str, str] = {} + if asset is not None: + data["asset"] = asset + return await self._throttled_private_call( + "/0/private/TradeBalance", + data=data if data else None, + ) diff --git a/src/arbitrade/storage/db.py b/src/arbitrade/storage/db.py index 084a46c..f01e6f0 100644 --- a/src/arbitrade/storage/db.py +++ b/src/arbitrade/storage/db.py @@ -145,11 +145,21 @@ CREATE TABLE IF NOT EXISTS runtime_state_snapshots ( last_known_balances JSON, note VARCHAR ); + +CREATE TABLE IF NOT EXISTS kraken_account_snapshots ( + snapshot_at TIMESTAMP NOT NULL, + fee_tier VARCHAR, + maker_fee DOUBLE, + taker_fee DOUBLE, + thirty_day_volume DOUBLE, + trade_balance_raw JSON, + fee_schedule_raw JSON +); """ class DuckDBStore: - SCHEMA_VERSION = 2 + SCHEMA_VERSION = 3 def __init__(self, settings: Settings) -> None: self._db_path = Path(settings.duckdb_path) @@ -244,6 +254,26 @@ class DuckDBStore: "INSERT OR IGNORE INTO schema_migrations (version) VALUES (2)") _LOG.info("migration_applied", version=2) + if current_version < 3: + # Migration v3: Add kraken_account_snapshots table + conn.execute(""" + CREATE TABLE IF NOT EXISTS kraken_account_snapshots ( + snapshot_at TIMESTAMP NOT NULL, + fee_tier VARCHAR, + maker_fee DOUBLE, + taker_fee DOUBLE, + thirty_day_volume DOUBLE, + trade_balance_raw JSON, + fee_schedule_raw JSON + ) + """) + # Add source column to config_pair_fees + conn.execute( + "ALTER TABLE config_pair_fees ADD COLUMN IF NOT EXISTS source VARCHAR DEFAULT 'manual'") + conn.execute( + "INSERT OR IGNORE INTO schema_migrations (version) VALUES (3)") + _LOG.info("migration_applied", version=3) + # Update version to current conn.execute( f"INSERT OR REPLACE INTO schema_migrations (version, applied_at) " diff --git a/src/arbitrade/storage/repositories.py b/src/arbitrade/storage/repositories.py index d137398..fbf5330 100644 --- a/src/arbitrade/storage/repositories.py +++ b/src/arbitrade/storage/repositories.py @@ -932,3 +932,64 @@ class ConfigBacktestingDefaultsRepository: execution_latency_ms=row[5] ) raise ValueError("Failed to update backtesting defaults") + + +@dataclass(slots=True) +class KrakenAccountSnapshot: + snapshot_at: datetime + fee_tier: str | None + maker_fee: float | None + taker_fee: float | None + thirty_day_volume: float | None + trade_balance_raw: dict[str, Any] | None + fee_schedule_raw: dict[str, Any] | None + + +class KrakenAccountSnapshotRepository: + def __init__(self, store: DuckDBStore) -> None: + self._store = store + + def insert_snapshot(self, snapshot: KrakenAccountSnapshot) -> None: + with self._store.connect() as conn: + conn.execute( + """ + INSERT INTO kraken_account_snapshots + (snapshot_at, fee_tier, maker_fee, taker_fee, + thirty_day_volume, trade_balance_raw, fee_schedule_raw) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + snapshot.snapshot_at, + snapshot.fee_tier, + snapshot.maker_fee, + snapshot.taker_fee, + snapshot.thirty_day_volume, + orjson.dumps(snapshot.trade_balance_raw).decode("utf-8") + if snapshot.trade_balance_raw else None, + orjson.dumps(snapshot.fee_schedule_raw).decode("utf-8") + if snapshot.fee_schedule_raw else None, + ), + ) + + def latest_snapshot(self) -> KrakenAccountSnapshot | None: + with self._store.connect() as conn: + row = conn.execute( + """ + SELECT snapshot_at, fee_tier, maker_fee, taker_fee, + thirty_day_volume, trade_balance_raw, fee_schedule_raw + FROM kraken_account_snapshots + ORDER BY snapshot_at DESC + LIMIT 1 + """ + ).fetchone() + if row is None: + return None + return KrakenAccountSnapshot( + snapshot_at=row[0], + fee_tier=row[1], + maker_fee=row[2], + taker_fee=row[3], + thirty_day_volume=row[4], + trade_balance_raw=orjson.loads(row[5]) if row[5] else None, + fee_schedule_raw=orjson.loads(row[6]) if row[6] else None, + ) diff --git a/src/arbitrade/web/templates/partials/overview.html b/src/arbitrade/web/templates/partials/overview.html index 6787b51..6334ce3 100644 --- a/src/arbitrade/web/templates/partials/overview.html +++ b/src/arbitrade/web/templates/partials/overview.html @@ -16,6 +16,11 @@
Realized P&L
{{ realized_pnl_total }}
+
+
Fee Tier
+
{{ fee_tier }}
+
Maker {{ maker_fee }} / Taker {{ taker_fee }}
+