diff --git a/src/arbitrade/api/routes.py b/src/arbitrade/api/routes.py index 3f4dee9..37d5dcc 100644 --- a/src/arbitrade/api/routes.py +++ b/src/arbitrade/api/routes.py @@ -6,51 +6,40 @@ from collections.abc import AsyncIterator from datetime import UTC, datetime from importlib import resources from pathlib import Path -from typing import cast -from urllib.parse import parse_qs -import orjson from fastapi import APIRouter, Depends, Request, Response from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse from fastapi.templating import Jinja2Templates -from arbitrade.alerting.notifier import SupportsAlerts, SupportsAlertStatus from arbitrade.api.auth import require_dashboard_auth -from arbitrade.api.control_state import DashboardControlState from arbitrade.config.pairing_sync import sync_pairings_from_kraken -from arbitrade.config.service import ConfigPairing from arbitrade.dashboard.context import ( _backtesting_panel_context, _recent_backtest_reports, ) from arbitrade.dashboard.dashboard import ( _alert_status_snapshot, - _dashboard_backtesting_job_handler, - _dashboard_backtesting_job_export, + _dashboard_audit, _dashboard_backtesting_handler, + _dashboard_backtesting_job_export, + _dashboard_backtesting_job_handler, + _dashboard_charts, + _dashboard_config_context, + _dashboard_controls, _dashboard_ctl_cfg, + _dashboard_ctl_kill, _dashboard_ctl_start, _dashboard_ctl_stop, - _dashboard_ctl_kill, _dashboard_metrics, _dashboard_overview, - _dashboard_config_context, - _dashboard_charts, - _dashboard_controls, - _dashboard_audit, - _dashboard_response, _dashboard_pairings_response, + _dashboard_response, _pairing_repo, - _toggle_pairing + _toggle_pairing, ) -from arbitrade.detection.graph import CurrencyGraph, TriangularCycle -from arbitrade.storage.pg_store import PgStore from arbitrade.storage.repositories import ( - AuditRecord, - AuditRepository, BacktestJobRepository, ConfigPairingRepository, - KrakenAccountSnapshotRepository, LogRepository, ) @@ -60,8 +49,7 @@ public_router = APIRouter() def _resolve_templates_directory() -> str: # Support source layout, Docker runtime (/app), and installed package data. - source_layout_path = Path( - __file__).resolve().parents[3] / "web" / "templates" + source_layout_path = Path(__file__).resolve().parents[3] / "web" / "templates" if source_layout_path.is_dir(): return str(source_layout_path) @@ -70,8 +58,7 @@ def _resolve_templates_directory() -> str: return str(docker_runtime_path) try: - package_path = resources.files( - "arbitrade").joinpath("web", "templates") + package_path = resources.files("arbitrade").joinpath("web", "templates") if package_path.is_dir(): return str(package_path) except (ModuleNotFoundError, AttributeError): @@ -375,6 +362,7 @@ async def health() -> JSONResponse: # ── Pairing API ───────────────────────────────────────────────────────────── + @router.get("/dashboard/api/pairings", response_class=JSONResponse) async def dashboard_api_pairings( request: Request, diff --git a/src/arbitrade/dashboard/context.py b/src/arbitrade/dashboard/context.py index c28dddd..c3d2024 100644 --- a/src/arbitrade/dashboard/context.py +++ b/src/arbitrade/dashboard/context.py @@ -1,25 +1,9 @@ from __future__ import annotations -import json -from asyncio import Lock -from collections.abc import AsyncIterator -from datetime import UTC, datetime -from importlib import resources -from pathlib import Path -from typing import cast -from urllib.parse import parse_qs +from fastapi import Request -import orjson -from fastapi import APIRouter, Depends, Request, Response -from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse -from fastapi.templating import Jinja2Templates from arbitrade.storage.repositories import ( - AuditRecord, - AuditRepository, BacktestJobRepository, - ConfigPairingRepository, - KrakenAccountSnapshotRepository, - LogRepository, ) diff --git a/src/arbitrade/dashboard/dashboard.py b/src/arbitrade/dashboard/dashboard.py index 0d359f8..ba34db1 100644 --- a/src/arbitrade/dashboard/dashboard.py +++ b/src/arbitrade/dashboard/dashboard.py @@ -1,39 +1,31 @@ from __future__ import annotations import json -from asyncio import Lock -from collections.abc import AsyncIterator from datetime import UTC, datetime from importlib import resources from pathlib import Path from typing import cast from urllib.parse import parse_qs -from httpcore import request import orjson from fastapi import APIRouter, Depends, Request, Response -from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse +from fastapi.responses import HTMLResponse, JSONResponse from fastapi.templating import Jinja2Templates from arbitrade.alerting.notifier import SupportsAlerts, SupportsAlertStatus from arbitrade.api.auth import require_dashboard_auth from arbitrade.api.control_state import DashboardControlState -from arbitrade.api.routes import ( +from arbitrade.config.service import ConfigPairing +from arbitrade.dashboard.context import ( _backtesting_panel_context, ) -from arbitrade.config.pairing_sync import sync_pairings_from_kraken -from arbitrade.config.service import ConfigPairing - -from arbitrade.api.control_state import DashboardControlState from arbitrade.storage.pg_store import PgStore -from arbitrade.detection.graph import CurrencyGraph, TriangularCycle from arbitrade.storage.repositories import ( AuditRecord, AuditRepository, BacktestJobRepository, ConfigPairingRepository, KrakenAccountSnapshotRepository, - LogRepository, ) router = APIRouter(dependencies=[Depends(require_dashboard_auth)]) @@ -42,8 +34,7 @@ public_router = APIRouter() def _resolve_templates_directory() -> str: # Support source layout, Docker runtime (/app), and installed package data. - source_layout_path = Path( - __file__).resolve().parents[3] / "web" / "templates" + source_layout_path = Path(__file__).resolve().parents[3] / "web" / "templates" if source_layout_path.is_dir(): return str(source_layout_path) @@ -52,8 +43,7 @@ def _resolve_templates_directory() -> str: return str(docker_runtime_path) try: - package_path = resources.files( - "arbitrade").joinpath("web", "templates") + package_path = resources.files("arbitrade").joinpath("web", "templates") if package_path.is_dir(): return str(package_path) except (ModuleNotFoundError, AttributeError): @@ -242,11 +232,9 @@ async def _dashboard_pairings_response( if source: pairings = [p for p in pairings if p.source.lower() == source.lower()] if base: - pairings = [p for p in pairings if p.base_asset.lower() == - base.lower()] + pairings = [p for p in pairings if p.base_asset.lower() == base.lower()] if quote: - pairings = [p for p in pairings if p.quote_asset.lower() == - quote.lower()] + pairings = [p for p in pairings if p.quote_asset.lower() == quote.lower()] # Sort reverse = order.lower() == "desc" @@ -391,8 +379,7 @@ async def _dashboard_overview(request: Request) -> dict[str, object]: LIMIT 1 """) if acct_row is not None: - fee_tier = str( - acct_row["fee_tier"]) if acct_row["fee_tier"] is not None else "—" + fee_tier = str(acct_row["fee_tier"]) if acct_row["fee_tier"] is not None else "—" maker_fee = ( f"{float(acct_row['maker_fee']):.4%}" if acct_row["maker_fee"] is not None @@ -420,8 +407,7 @@ async def _dashboard_overview(request: Request) -> dict[str, object]: try: parsed = json.loads(balances_raw) if isinstance(parsed, dict): - non_zero = {k: float(v) - for k, v in parsed.items() if float(v) > 0.0} + non_zero = {k: float(v) for k, v in parsed.items() if float(v) > 0.0} if non_zero: balances_value = "
".join( f"{v:.6g} {k}" for k, v in sorted(non_zero.items()) @@ -442,8 +428,7 @@ async def _dashboard_overview(request: Request) -> dict[str, object]: "trade_ref": str(r["trade_ref"]), "status": str(r["status"]), "started_at": ( - r["started_at"].isoformat() if isinstance( - r["started_at"], datetime) else "—" + r["started_at"].isoformat() if isinstance(r["started_at"], datetime) else "—" ), "cycle": str(r["cycle"]) if r["cycle"] is not None else "—", } @@ -457,8 +442,7 @@ async def _dashboard_overview(request: Request) -> dict[str, object]: f"{float(r['est_profit']):.2f} USD" if r["est_profit"] is not None else "—" ), "detected_at": ( - r["detected_at"].isoformat() if isinstance( - r["detected_at"], datetime) else "—" + r["detected_at"].isoformat() if isinstance(r["detected_at"], datetime) else "—" ), } for r in latest_opportunities @@ -499,10 +483,8 @@ async def _dashboard_charts(request: Request) -> dict[str, object]: labels.append(row["detected_at"].isoformat()) else: labels.append(f"opportunity-{index + 1}") - np = [float(row["net_pct"]) if row["net_pct"] - is not None else 0.0 for row in cr] - ep = [float(row["est_profit"]) if row["est_profit"] - is not None else 0.0 for row in cr] + np = [float(row["net_pct"]) if row["net_pct"] is not None else 0.0 for row in cr] + ep = [float(row["est_profit"]) if row["est_profit"] is not None else 0.0 for row in cr] cycles = [str(row["cycle"]) for row in cr] return { @@ -576,8 +558,7 @@ async def _dashboard_config_context(request: Request) -> dict[str, object]: max_consecutive_failures_value = ( str(rs.max_consecutive_failures) if rs.max_consecutive_failures is not None else "" ) - strategy_stat_arb_enabled = bool( - getattr(rs, "strategy_enable_stat_arb_experiment", False)) + strategy_stat_arb_enabled = bool(getattr(rs, "strategy_enable_stat_arb_experiment", False)) return { # Runtime @@ -698,8 +679,7 @@ def _dashboard_controls(request: Request) -> dict[str, object]: alerts_last_channel_results = [ str(item) for item in cast(list[object], alert_status.get("last_channel_results", [])) ] - strategy_stat_arb_enabled = bool( - getattr(rs, "strategy_enable_stat_arb_experiment", False)) + strategy_stat_arb_enabled = bool(getattr(rs, "strategy_enable_stat_arb_experiment", False)) return { "execution_status": "running" if ctl.is_running else "stopped", @@ -755,8 +735,7 @@ async def _dashboard_backtesting_handler(request: Request) -> HTMLResponse: try: custom_fee_rate = ( - float(defaults["custom_fee_rate"] - ) if defaults["custom_fee_rate"].strip() else None + float(defaults["custom_fee_rate"]) if defaults["custom_fee_rate"].strip() else None ) fee_rate = await _fee_rate_for_profile( defaults["fee_profile"], custom_fee_rate, request=request @@ -767,8 +746,7 @@ async def _dashboard_backtesting_handler(request: Request) -> HTMLResponse: if not symbols_str.strip(): pairing_repo = ConfigPairingRepository(request.app.state.store) enabled = await pairing_repo.list_pairings(enabled_only=True) - symbols_str = ",".join( - f"{p.base_asset}/{p.quote_asset}" for p in enabled) + symbols_str = ",".join(f"{p.base_asset}/{p.quote_asset}" for p in enabled) config_dict: dict[str, object] = { "source": defaults["source"], @@ -882,8 +860,7 @@ async def _dashboard_backtesting_job_export(request: Request, job_id: str) -> Re return Response( content=orjson.dumps(payload).decode("utf-8"), media_type="application/x-jsonlines", - headers={ - "Content-Disposition": f"attachment; filename=backtest_{job_id[:8]}.jsonl"}, + headers={"Content-Disposition": f"attachment; filename=backtest_{job_id[:8]}.jsonl"}, ) diff --git a/src/arbitrade/logging/maintenance.py b/src/arbitrade/logging/maintenance.py index 5a3185c..38f1833 100644 --- a/src/arbitrade/logging/maintenance.py +++ b/src/arbitrade/logging/maintenance.py @@ -36,8 +36,7 @@ async def run_log_archive(store: PgStore, retention_days: int = _RETENTION_DAYS) repo = LogArchiveRepository(store) count = await repo.archive_before(cutoff) if count > 0: - _LOG.info("log_archive_complete", - cutoff=cutoff.isoformat(), archived=count) + _LOG.info("log_archive_complete", cutoff=cutoff.isoformat(), archived=count) return count diff --git a/src/arbitrade/storage/repositories.py b/src/arbitrade/storage/repositories.py index d847aa2..a86b90e 100644 --- a/src/arbitrade/storage/repositories.py +++ b/src/arbitrade/storage/repositories.py @@ -242,6 +242,12 @@ class AuditRepository: async def insert(self, record: AuditRecord) -> None: async with self._store.pool.acquire() as conn: + payload = None + if record.payload is not None: + try: + payload = orjson.dumps(record.payload).decode("utf-8") + except Exception: + payload = None await conn.execute( """ INSERT INTO audit_events ( @@ -258,8 +264,7 @@ class AuditRepository: record.actor, record.event_type, record.decision, - (None if record.payload is None else orjson.dumps( - record.payload).decode("utf-8")), + payload, record.correlation_id, ) @@ -279,6 +284,9 @@ class AuditRepository: for row in rows: payload: dict[str, Any] | None = None raw_payload = row["payload"] + correlation_id = None + if row["correlation_id"] is not None: + correlation_id = str(row["correlation_id"]) if isinstance(raw_payload, str) and raw_payload: decoded = orjson.loads(raw_payload) if isinstance(decoded, dict): @@ -291,10 +299,7 @@ class AuditRepository: event_type=str(row["event_type"]), decision=str(row["decision"]), payload=payload, - correlation_id=( - str(row["correlation_id"] - ) if row["correlation_id"] is not None else None - ), + correlation_id=correlation_id, ) ) @@ -354,6 +359,9 @@ class RuntimeStateRepository: balances: dict[str, Any] | None = None raw_balances = row["last_known_balances"] + kill_switch_reason = None + if row["kill_switch_reason"] is not None: + kill_switch_reason = str(row["kill_switch_reason"]) if isinstance(raw_balances, str) and raw_balances: decoded = orjson.loads(raw_balances) if isinstance(decoded, dict): @@ -363,10 +371,7 @@ class RuntimeStateRepository: snapshot_at=row["snapshot_at"], is_running=bool(row["is_running"]), kill_switch_active=bool(row["kill_switch_active"]), - kill_switch_reason=( - str(row["kill_switch_reason"] - ) if row["kill_switch_reason"] is not None else None - ), + kill_switch_reason=kill_switch_reason, open_trade_count=int(row["open_trade_count"]), last_known_balances=balances, note=str(row["note"]) if row["note"] is not None else None, @@ -773,11 +778,12 @@ class ConfigBacktestingDefaultsRepository: defaults.execution_latency_ms, ) if row: + starting_balances = None + if row["starting_balances"] is not None: + starting_balances = orjson.loads(row["starting_balances"]) + return ConfigBacktestingDefaults( - starting_balances=( - orjson.loads(row["starting_balances"] - ) if row["starting_balances"] else None - ), + starting_balances=starting_balances, trade_capital=row["trade_capital"], min_profit_threshold=row["min_profit_threshold"], slippage_bps=row["slippage_bps"], @@ -795,11 +801,11 @@ class ConfigBacktestingDefaultsRepository: LIMIT 1 """) if row: + starting_balances = None + if row["starting_balances"] is not None: + starting_balances = orjson.loads(row["starting_balances"]) return ConfigBacktestingDefaults( - starting_balances=( - orjson.loads(row["starting_balances"] - ) if row["starting_balances"] else None - ), + starting_balances=starting_balances, trade_capital=row["trade_capital"], min_profit_threshold=row["min_profit_threshold"], slippage_bps=row["slippage_bps"], @@ -833,11 +839,11 @@ class ConfigBacktestingDefaultsRepository: defaults.execution_latency_ms, ) if row: + starting_balances = None + if row["starting_balances"] is not None: + starting_balances = orjson.loads(row["starting_balances"]) return ConfigBacktestingDefaults( - starting_balances=( - orjson.loads(row["starting_balances"] - ) if row["starting_balances"] else None - ), + starting_balances=starting_balances, trade_capital=row["trade_capital"], min_profit_threshold=row["min_profit_threshold"], slippage_bps=row["slippage_bps"], @@ -900,20 +906,20 @@ class KrakenAccountSnapshotRepository: """) if row is None: return None + trade_balance_raw = None + fee_schedule_raw = None + if row["trade_balance_raw"] is not None: + trade_balance_raw = orjson.loads(row["trade_balance_raw"]) + if row["fee_schedule_raw"] is not None: + fee_schedule_raw = orjson.loads(row["fee_schedule_raw"]) return KrakenAccountSnapshot( snapshot_at=row["snapshot_at"], fee_tier=row["fee_tier"], maker_fee=row["maker_fee"], taker_fee=row["taker_fee"], thirty_day_volume=row["thirty_day_volume"], - trade_balance_raw=( - orjson.loads(row["trade_balance_raw"] - ) if row["trade_balance_raw"] else None - ), - fee_schedule_raw=( - orjson.loads(row["fee_schedule_raw"] - ) if row["fee_schedule_raw"] else None - ), + trade_balance_raw=trade_balance_raw, + fee_schedule_raw=fee_schedule_raw, ) @@ -1073,6 +1079,12 @@ class LogRepository: async def insert(self, record: LogRecord) -> None: async with self._store.pool.acquire() as conn: + context = None + if record.context: + try: + context = orjson.dumps(record.context).decode("utf-8") + except Exception: + context = None await conn.execute( """ INSERT INTO app_logs (recorded_at, level, logger, message, context) @@ -1082,8 +1094,7 @@ class LogRepository: record.level, record.logger, record.message, - orjson.dumps(record.context).decode( - "utf-8") if record.context else None, + context, ) async def query(