diff --git a/src/arbitrade/api/routes.py b/src/arbitrade/api/routes.py index 0a95d68..3f4dee9 100644 --- a/src/arbitrade/api/routes.py +++ b/src/arbitrade/api/routes.py @@ -19,6 +19,30 @@ from arbitrade.api.auth import require_dashboard_auth from arbitrade.api.control_state import DashboardControlState from arbitrade.config.pairing_sync import sync_pairings_from_kraken from arbitrade.config.service import ConfigPairing +from arbitrade.dashboard.context import ( + _backtesting_panel_context, + _recent_backtest_reports, +) +from arbitrade.dashboard.dashboard import ( + _alert_status_snapshot, + _dashboard_backtesting_job_handler, + _dashboard_backtesting_job_export, + _dashboard_backtesting_handler, + _dashboard_ctl_cfg, + _dashboard_ctl_start, + _dashboard_ctl_stop, + _dashboard_ctl_kill, + _dashboard_metrics, + _dashboard_overview, + _dashboard_config_context, + _dashboard_charts, + _dashboard_controls, + _dashboard_audit, + _dashboard_response, + _dashboard_pairings_response, + _pairing_repo, + _toggle_pairing +) from arbitrade.detection.graph import CurrencyGraph, TriangularCycle from arbitrade.storage.pg_store import PgStore from arbitrade.storage.repositories import ( @@ -36,7 +60,8 @@ public_router = APIRouter() def _resolve_templates_directory() -> str: # Support source layout, Docker runtime (/app), and installed package data. - source_layout_path = Path(__file__).resolve().parents[3] / "web" / "templates" + source_layout_path = Path( + __file__).resolve().parents[3] / "web" / "templates" if source_layout_path.is_dir(): return str(source_layout_path) @@ -45,7 +70,8 @@ def _resolve_templates_directory() -> str: return str(docker_runtime_path) try: - package_path = resources.files("arbitrade").joinpath("web", "templates") + package_path = resources.files( + "arbitrade").joinpath("web", "templates") if package_path.is_dir(): return str(package_path) except (ModuleNotFoundError, AttributeError): @@ -59,730 +85,6 @@ _BACKTEST_ROOT = Path(__file__).resolve().parents[3] _BACKTEST_RUN_LOCK = Lock() -def _format_metric(value: float | None, *, precision: int = 2, suffix: str = "") -> str: - if value is None: - return "—" - return f"{value:.{precision}f}{suffix}" - - -async def _dashboard_metrics(request: Request) -> dict[str, str]: - metrics = await request.app.state.metrics.compute() - return { - "realized_pnl": _format_metric(metrics.realized_pnl_usd, precision=2, suffix=" USD"), - "win_rate": _format_metric( - metrics.win_rate * 100.0 if metrics.win_rate is not None else None, - precision=1, - suffix="%", - ), - "avg_trade_duration": _format_metric( - metrics.avg_trade_duration_seconds, precision=1, suffix=" s" - ), - "opportunities_per_minute": _format_metric( - metrics.opportunities_per_minute, precision=1, suffix=" /min" - ), - "fill_rate": _format_metric( - metrics.fill_rate * 100.0 if metrics.fill_rate is not None else None, - precision=1, - suffix="%", - ), - "latency_p50": _format_metric(metrics.latency_p50_seconds, precision=3, suffix=" s"), - "latency_p95": _format_metric(metrics.latency_p95_seconds, precision=3, suffix=" s"), - "latency_p99": _format_metric(metrics.latency_p99_seconds, precision=3, suffix=" s"), - "generated_at": datetime.now(UTC).isoformat(), - } - - -async def _dashboard_overview(request: Request) -> dict[str, object]: - store: PgStore = request.app.state.store - async with store.pool.acquire() as conn: - portfolio_row = await conn.fetchrow(""" - SELECT balances, total_value_usd - FROM portfolio_snapshots - ORDER BY snapshot_at DESC - LIMIT 1 - """) - open_trades = await conn.fetch(""" - SELECT trade_ref, status, started_at, cycle - FROM trades - WHERE finished_at IS NULL - ORDER BY started_at DESC - LIMIT 5 - """) - rpnl = await conn.fetchrow(""" - SELECT COALESCE(SUM(COALESCE(realized_pnl, 0)), 0) - FROM trades - """) - latest_opportunities = await conn.fetch(""" - SELECT cycle, net_pct, est_profit, detected_at - FROM opportunities - ORDER BY detected_at DESC - LIMIT 5 - """) - - # Query equity from kraken_account_snapshots - equity_value = "—" - try: - equity_row = await conn.fetchrow(""" - SELECT trade_balance_raw - FROM kraken_account_snapshots - ORDER BY snapshot_at DESC - LIMIT 1 - """) - if equity_row is not None and equity_row["trade_balance_raw"] is not None: - tb_raw = equity_row["trade_balance_raw"] - if isinstance(tb_raw, str): - tb_raw = json.loads(tb_raw) - if isinstance(tb_raw, dict): - eb = tb_raw.get("eb") - equity_value = f"{float(eb):.2f} USD" if eb is not None else "—" - except Exception: - pass - - # Query latest Kraken account snapshot for fee info - fee_tier = "—" - maker_fee = "—" - taker_fee = "—" - thirty_day_volume = "—" - try: - acct_row = await conn.fetchrow(""" - SELECT fee_tier, maker_fee, taker_fee, thirty_day_volume - FROM kraken_account_snapshots - ORDER BY snapshot_at DESC - LIMIT 1 - """) - if acct_row is not None: - fee_tier = str(acct_row["fee_tier"]) if acct_row["fee_tier"] is not None else "—" - maker_fee = ( - f"{float(acct_row['maker_fee']):.4%}" - if acct_row["maker_fee"] is not None - else "—" - ) - taker_fee = ( - f"{float(acct_row['taker_fee']):.4%}" - if acct_row["taker_fee"] is not None - else "—" - ) - thirty_day_volume = ( - f"{float(acct_row['thirty_day_volume']):.2f}" - if acct_row["thirty_day_volume"] is not None - else "—" - ) - except Exception: - pass - - balances_value = "—" - total_value = "—" - if portfolio_row is not None: - balances_raw = portfolio_row["balances"] - total_value_raw = portfolio_row["total_value_usd"] - if isinstance(balances_raw, str) and balances_raw: - try: - parsed = json.loads(balances_raw) - if isinstance(parsed, dict): - non_zero = {k: float(v) for k, v in parsed.items() if float(v) > 0.0} - if non_zero: - balances_value = "
".join( - f"{v:.6g} {k}" for k, v in sorted(non_zero.items()) - ) - else: - balances_value = "No balances" - else: - balances_value = str(balances_raw) - except (json.JSONDecodeError, ValueError, TypeError): - balances_value = str(balances_raw) - elif balances_raw is not None: - balances_value = str(balances_raw) - if total_value_raw is not None: - total_value = f"{float(total_value_raw):.2f} USD" - - open_trade_rows = [ - { - "trade_ref": str(r["trade_ref"]), - "status": str(r["status"]), - "started_at": ( - r["started_at"].isoformat() if isinstance(r["started_at"], datetime) else "—" - ), - "cycle": str(r["cycle"]) if r["cycle"] is not None else "—", - } - for r in open_trades - ] - opportunity_rows = [ - { - "cycle": str(r["cycle"]), - "net_pct": f"{float(r['net_pct']):.2f}%" if r["net_pct"] is not None else "—", - "est_profit": ( - f"{float(r['est_profit']):.2f} USD" if r["est_profit"] is not None else "—" - ), - "detected_at": ( - r["detected_at"].isoformat() if isinstance(r["detected_at"], datetime) else "—" - ), - } - for r in latest_opportunities - ] - - return { - "status": "live", - "generated_at": datetime.now(UTC).isoformat(), - "balances": balances_value, - "total_value": total_value, - "equity": equity_value, - "open_trade_count": len(open_trade_rows), - "open_trades": open_trade_rows, - "realized_pnl_total": f"{float(rpnl[0]):.2f} USD" if rpnl else "—", - "opportunities": opportunity_rows, - "fee_tier": fee_tier, - "maker_fee": maker_fee, - "taker_fee": taker_fee, - "thirty_day_volume": thirty_day_volume, - "fee_source": "API" if fee_tier != "—" else "—", - } - - -async def _dashboard_charts(request: Request) -> dict[str, object]: - store: PgStore = request.app.state.store - async with store.pool.acquire() as conn: - rows = await conn.fetch(""" - SELECT detected_at, cycle, net_pct, est_profit - FROM opportunities - ORDER BY detected_at DESC - LIMIT 10 - """) - - cr = list(reversed(rows)) - labels = [] - for index, row in enumerate(cr): - if isinstance(row["detected_at"], datetime): - labels.append(row["detected_at"].isoformat()) - else: - labels.append(f"opportunity-{index + 1}") - np = [float(row["net_pct"]) if row["net_pct"] is not None else 0.0 for row in cr] - ep = [float(row["est_profit"]) if row["est_profit"] is not None else 0.0 for row in cr] - cycles = [str(row["cycle"]) for row in cr] - - return { - "labels": labels, - "net_pct_values": np, - "est_profit_values": ep, - "cycles": cycles, - "has_chart_data": bool(cr), - "generated_at": datetime.now(UTC).isoformat(), - } - - -def _dashboard_controls_state(request: Request) -> DashboardControlState: - return cast(DashboardControlState, request.app.state.dashboard_controls) - - -def _audit_repository(request: Request) -> AuditRepository | None: - repository = getattr(request.app.state, "audit_repository", None) - return cast(AuditRepository | None, repository) - - -async def _record_audit( - request: Request, - *, - actor: str, - event_type: str, - decision: str, - payload: dict[str, object] | None = None, -) -> None: - repository = _audit_repository(request) - if repository is None: - return - correlation_id = request.headers.get("x-request-id") - if payload is not None: - ret_pl = {str(key): payload[key] for key in payload} - else: - ret_pl = None - await repository.insert( - AuditRecord( - occurred_at=datetime.now(UTC), - actor=actor, - event_type=event_type, - decision=decision, - payload=ret_pl, - correlation_id=correlation_id, - ) - ) - - -async def _dashboard_audit(request: Request, *, limit: int = 15) -> dict[str, object]: - repository = _audit_repository(request) - if repository is None: - return { - "entries": [], - "generated_at": datetime.now(UTC).isoformat(), - } - - records = await repository.list_recent(limit=limit) - entries: list[dict[str, str]] = [] - for record in records: - payload_text = "—" - if record.payload: - payload_text = json.dumps(record.payload) - entries.append( - { - "occurred_at": record.occurred_at.isoformat(), - "actor": record.actor, - "event_type": record.event_type, - "decision": record.decision, - "payload": payload_text, - "correlation_id": record.correlation_id or "—", - } - ) - - return { - "entries": entries, - "generated_at": datetime.now(UTC).isoformat(), - } - - -def _alert_notifier(request: Request) -> SupportsAlerts | None: - notifier = getattr(request.app.state, "alert_notifier", None) - return cast(SupportsAlerts | None, notifier) - - -def _alert_status_snapshot(request: Request) -> dict[str, object]: - notifier = getattr(request.app.state, "alert_notifier", None) - if isinstance(notifier, SupportsAlertStatus): - return notifier.status_snapshot() - return { - "enabled": False, - "has_channels": False, - "configured_channels": [], - "min_severity": "—", - "dedup_seconds": 0.0, - "last_result": "unavailable", - "last_attempted_at": None, - "last_success_at": None, - "last_error": None, - "last_event": None, - "last_channel_results": [], - } - - -async def _dashboard_config_context(request: Request) -> dict[str, object]: - ctl = _dashboard_controls_state(request) - rs = request.app.state.settings - max_trade_capital_usd = ( - f"{float(rs.max_trade_capital_usd):.2f} USD" - if rs.max_trade_capital_usd is not None - else "—" - ) - max_trade_capital_usd_value = ( - f"{float(rs.max_trade_capital_usd):.2f}" if rs.max_trade_capital_usd is not None else "" - ) - max_concurrent_trades = ( - str(rs.max_concurrent_trades) if rs.max_concurrent_trades is not None else "—" - ) - max_concurrent_trades_value = ( - str(rs.max_concurrent_trades) if rs.max_concurrent_trades is not None else "" - ) - max_exposure_per_asset = ( - f"{float(rs.max_exposure_per_asset_usd):.2f} USD" - if rs.max_exposure_per_asset_usd is not None - else "—" - ) - max_exposure_per_asset_value = ( - f"{float(rs.max_exposure_per_asset_usd):.2f}" - if rs.max_exposure_per_asset_usd is not None - else "" - ) - daily_loss_limit = ( - f"{float(rs.daily_loss_limit_usd):.2f} USD" if rs.daily_loss_limit_usd is not None else "—" - ) - daily_loss_limit_value = ( - f"{float(rs.daily_loss_limit_usd):.2f}" if rs.daily_loss_limit_usd is not None else "" - ) - cumulative_loss_limit = ( - f"{float(rs.cumulative_loss_limit_usd):.2f} USD" - if rs.cumulative_loss_limit_usd is not None - else "—" - ) - cumulative_loss_limit_value = ( - f"{float(rs.cumulative_loss_limit_usd):.2f}" - if rs.cumulative_loss_limit_usd is not None - else "" - ) - max_source_latency = ( - f"{float(rs.max_source_latency_ms):.1f} ms" if rs.max_source_latency_ms is not None else "—" - ) - max_source_latency_value = ( - f"{float(rs.max_source_latency_ms):.1f}" if rs.max_source_latency_ms is not None else "" - ) - max_apply_latency = ( - f"{float(rs.max_apply_latency_ms):.1f} ms" if rs.max_apply_latency_ms is not None else "—" - ) - max_apply_latency_value = ( - f"{float(rs.max_apply_latency_ms):.1f}" if rs.max_apply_latency_ms is not None else "" - ) - max_consecutive_failures = ( - str(rs.max_consecutive_failures) if rs.max_consecutive_failures is not None else "—" - ) - max_consecutive_failures_value = ( - str(rs.max_consecutive_failures) if rs.max_consecutive_failures is not None else "" - ) - strategy_stat_arb_enabled = bool(getattr(rs, "strategy_enable_stat_arb_experiment", False)) - - return { - # Runtime - "app_env": rs.app_env, - "app_host": rs.app_host, - "app_port": str(rs.app_port), - "log_level": rs.log_level, - "log_json": "checked" if rs.log_json else "", - "paper_trading_mode": "checked" if rs.paper_trading_mode else "", - "trade_capital_usd": f"{float(rs.trade_capital_usd):.2f}", - "trade_capital_usd_value": f"{float(rs.trade_capital_usd):.2f}", - "max_trade_capital_usd": max_trade_capital_usd, - "max_trade_capital_usd_value": max_trade_capital_usd_value, - "max_concurrent_trades": max_concurrent_trades, - "max_concurrent_trades_value": max_concurrent_trades_value, - "max_exposure_per_asset": max_exposure_per_asset, - "max_exposure_per_asset_value": max_exposure_per_asset_value, - "quote_balance_asset": rs.quote_balance_asset, - "min_order_size_usd": ( - f"{float(rs.min_order_size_usd):.2f}" if rs.min_order_size_usd is not None else "" - ), - "min_order_size_usd_value": ( - f"{float(rs.min_order_size_usd):.2f}" if rs.min_order_size_usd is not None else "" - ), - "tradable_pairs_value": ", ".join(ctl.tradable_pairs), - "strategy_mode": ctl.strategy_mode, - "strategy_stat_arb_enabled": strategy_stat_arb_enabled, - "strategy_profit_threshold": f"{ctl.strategy_profit_threshold:.6f}", - "strategy_max_depth_levels": str(ctl.strategy_max_depth_levels), - # Alerts - "alerts_enabled": "checked" if rs.alerts_enabled else "", - "alert_min_severity": rs.alert_min_severity, - "alert_dedup_seconds": f"{rs.alert_dedup_seconds:.0f}", - "alert_on_trade_events": "checked" if rs.alert_on_trade_events else "", - "alert_on_error_events": "checked" if rs.alert_on_error_events else "", - "alert_on_threshold_events": "checked" if rs.alert_on_threshold_events else "", - "alert_on_system_events": "checked" if rs.alert_on_system_events else "", - "telegram_alerts_enabled": "checked" if rs.telegram_alerts_enabled else "", - "telegram_bot_token": rs.telegram_bot_token or "", - "telegram_chat_id": rs.telegram_chat_id or "", - "discord_alerts_enabled": "checked" if rs.discord_alerts_enabled else "", - "discord_webhook_url": rs.discord_webhook_url or "", - "email_alerts_enabled": "checked" if rs.email_alerts_enabled else "", - "email_smtp_host": rs.email_smtp_host or "", - "email_smtp_port": str(rs.email_smtp_port), - "email_smtp_username": rs.email_smtp_username or "", - "email_smtp_password": "", - "email_alert_from": rs.email_alert_from or "", - "email_alert_to": rs.email_alert_to or "", - "email_smtp_use_tls": "checked" if rs.email_smtp_use_tls else "", - # Kraken - "kraken_rest_url": rs.kraken_rest_url, - "kraken_ws_url": rs.kraken_ws_url, - "kraken_private_rate_limit_seconds": f"{rs.kraken_private_rate_limit_seconds:.2f}", - "kraken_http_timeout_seconds": f"{rs.kraken_http_timeout_seconds:.1f}", - "kraken_retry_attempts": str(rs.kraken_retry_attempts), - "kraken_retry_base_delay_seconds": f"{rs.kraken_retry_base_delay_seconds:.2f}", - "kraken_api_key": rs.kraken_api_key or "", - "kraken_api_secret": "", - "kraken_api_key_permissions": rs.kraken_api_key_permissions, - "ws_heartbeat_timeout_seconds": f"{rs.ws_heartbeat_timeout_seconds:.1f}", - "ws_max_staleness_seconds": f"{rs.ws_max_staleness_seconds:.1f}", - # Risk - "daily_loss_limit": daily_loss_limit, - "daily_loss_limit_value": daily_loss_limit_value, - "cumulative_loss_limit": cumulative_loss_limit, - "cumulative_loss_limit_value": cumulative_loss_limit_value, - "max_source_latency": max_source_latency, - "max_source_latency_value": max_source_latency_value, - "max_apply_latency": max_apply_latency, - "max_apply_latency_value": max_apply_latency_value, - "max_consecutive_failures": max_consecutive_failures, - "max_consecutive_failures_value": max_consecutive_failures_value, - "kill_switch_active": "checked" if rs.kill_switch_active else "", - # Strategy stat-arb - "strategy_stat_arb_lookback_window": str(rs.strategy_stat_arb_lookback_window), - "strategy_stat_arb_entry_zscore": f"{rs.strategy_stat_arb_entry_zscore:.1f}", - "strategy_stat_arb_exit_zscore": f"{rs.strategy_stat_arb_exit_zscore:.1f}", - "strategy_stat_arb_max_holding_seconds": f"{rs.strategy_stat_arb_max_holding_seconds:.0f}", - # UI - "config_endpoint": "/dashboard/control/config", - } - - -def _dashboard_controls(request: Request) -> dict[str, object]: - ctl = _dashboard_controls_state(request) - rs = request.app.state.settings - alert_status = _alert_status_snapshot(request) - last_event = alert_status.get("last_event") - last_event_title = "—" - if isinstance(last_event, dict): - title_value = last_event.get("title") - if isinstance(title_value, str): - last_event_title = title_value - - cc = alert_status.get("configured_channels") - cd = "—" - if isinstance(cc, list) and cc: - cd = ", ".join(str(channel) for channel in cc) - - ddsr = alert_status.get("dedup_seconds", 0.0) - dds = float(ddsr) if isinstance(ddsr, int | float) else 0.0 - tpd = ", ".join(ctl.tradable_pairs) if ctl.tradable_pairs else "All" - max_trade_capital_usd = ( - f"{float(rs.max_trade_capital_usd):.2f} USD" - if rs.max_trade_capital_usd is not None - else "—" - ) - max_trade_capital_usd_value = ( - f"{float(rs.max_trade_capital_usd):.2f}" if rs.max_trade_capital_usd is not None else "" - ) - max_concurrent_trades = ( - str(rs.max_concurrent_trades) if rs.max_concurrent_trades is not None else "—" - ) - max_concurrent_trades_value = ( - str(rs.max_concurrent_trades) if rs.max_concurrent_trades is not None else "" - ) - alerts_last_channel_results = [ - str(item) for item in cast(list[object], alert_status.get("last_channel_results", [])) - ] - strategy_stat_arb_enabled = bool(getattr(rs, "strategy_enable_stat_arb_experiment", False)) - - return { - "execution_status": "running" if ctl.is_running else "stopped", - "kill_switch_status": "active" if ctl.kill_switch.is_active else "inactive", - "kill_switch_reason": ctl.kill_switch.reason or "—", - "paper_trading_mode": "enabled" if rs.paper_trading_mode else "disabled", - "trade_capital_usd": f"{float(rs.trade_capital_usd):.2f} USD", - "trade_capital_usd_value": f"{float(rs.trade_capital_usd):.2f}", - "max_trade_capital_usd": max_trade_capital_usd, - "max_trade_capital_usd_value": max_trade_capital_usd_value, - "max_concurrent_trades": max_concurrent_trades, - "max_concurrent_trades_value": max_concurrent_trades_value, - "alerts_enabled": "enabled" if bool(alert_status.get("enabled", False)) else "disabled", - "alerts_channels": cd, - "alerts_min_severity": str(alert_status.get("min_severity", "—")), - "alerts_dedup_seconds": f"{dds:.0f}", - "alerts_last_result": str(alert_status.get("last_result", "unavailable")), - "alerts_last_attempted_at": str(alert_status.get("last_attempted_at") or "—"), - "alerts_last_success_at": str(alert_status.get("last_success_at") or "—"), - "alerts_last_event_title": last_event_title, - "alerts_last_error": str(alert_status.get("last_error") or "—"), - "alerts_last_channel_results": alerts_last_channel_results, - "tradable_pairs_display": tpd, - "tradable_pairs_value": ", ".join(ctl.tradable_pairs), - "strategy_mode": ctl.strategy_mode, - "strategy_stat_arb_enabled": strategy_stat_arb_enabled, - "strategy_profit_threshold": f"{ctl.strategy_profit_threshold:.6f}", - "strategy_max_depth_levels": str(ctl.strategy_max_depth_levels), - "updated_at": ctl.updated_at.isoformat(), - "start_endpoint": "/dashboard/control/start", - "stop_endpoint": "/dashboard/control/stop", - "kill_switch_endpoint": "/dashboard/control/kill-switch", - "config_endpoint": "/dashboard/control/config", - "chart_endpoint": "/dashboard/fragment/charts", - } - - -def _parse_form_body(body: bytes) -> dict[str, str]: - parsed = parse_qs(body.decode("utf-8"), keep_blank_values=True) - result: dict[str, str] = {} - for key, values in parsed.items(): - if not values: - continue - if len(values) == 1: - result[key] = values[0] - else: - # Multi-value fields (e.g. checkboxes) -> join with comma - result[key] = ",".join(values) - return result - - -def _form_bool(value: str | None) -> bool: - if value is None: - return False - return value.lower() in {"1", "true", "yes", "on"} - - -def _parse_comma_separated_list(value: str | None) -> list[str]: - if value is None: - return [] - - items: list[str] = [] - for raw_item in value.split(","): - item = raw_item.strip().upper() - if item and item not in items: - items.append(item) - return items - - -def _normalize_fee_profile(profile: str) -> str: - return profile.strip().lower().replace("-", "_") - - -async def _fee_rate_for_profile( - profile: str, - custom_fee_rate: float | None, - request: Request | None = None, -) -> float: - """Resolve fee rate from profile name. - - - 'api': fetches latest maker_fee from kraken_account_snapshots (requires request) - - 'custom': uses custom_fee_rate - - legacy 'standard'/'maker_heavy'/'taker_heavy': still supported via hardcoded - fallback, logged at warning level - """ - normalized = _normalize_fee_profile(profile) - - if normalized == "api": - if request is None: - raise ValueError("api fee profile requires request context") - store: PgStore = request.app.state.store - repo = KrakenAccountSnapshotRepository(store) - latest = await repo.latest_snapshot() - if latest is not None and latest.maker_fee is not None: - return latest.maker_fee - # Fallback to standard if no snapshot yet - return 0.0026 - - if normalized == "custom": - if custom_fee_rate is None: - raise ValueError("custom fee profile requires custom_fee_rate") - if custom_fee_rate < 0.0: - raise ValueError("custom_fee_rate must be >= 0") - return custom_fee_rate - - # Legacy hardcoded profiles (kept for backward compat, but soft-deprecated) - profile_map = { - "standard": 0.0026, - "maker_heavy": 0.0016, - "taker_heavy": 0.0035, - } - if normalized in profile_map: - return profile_map[normalized] - - valid = ", ".join(sorted(list(profile_map.keys()) + ["api", "custom"])) - raise ValueError(f"fee_profile must be one of: {valid}") - - -def _parse_balances(raw: str) -> dict[str, float]: - balances: dict[str, float] = {} - for entry in raw.split(","): - stripped = entry.strip() - if not stripped: - continue - if "=" not in stripped: - raise ValueError("starting_balances must be in ASSET=value format") - asset, value = stripped.split("=", 1) - balances[asset.strip().upper()] = float(value) - if not balances: - raise ValueError("starting_balances must include at least one balance") - return balances - - -def _resolve_workspace_path(raw: str) -> Path: - candidate = Path(raw.strip()) - if not candidate.is_absolute(): - candidate = (_BACKTEST_ROOT / candidate).resolve() - else: - candidate = candidate.resolve() - return candidate - - -def _display_path(path: Path) -> str: - try: - return str(path.relative_to(_BACKTEST_ROOT)) - except ValueError: - return str(path) - - -def _build_cycles_from_events( - symbols: set[str], -) -> tuple[dict[str, list[TriangularCycle]], list[str]]: - graph = CurrencyGraph() - for symbol in sorted(symbols): - if "/" not in symbol: - continue - base, quote = symbol.upper().split("/", 1) - graph.add_pair(base, quote, f"{base}/{quote}") - cycles = graph.triangular_cycles() - return graph.index_cycles_by_pair(cycles), sorted(symbols) - - -async def _recent_backtest_reports(request: Request) -> list[dict[str, object]]: - """Fetch recent backtest jobs from DB.""" - store: PgStore = request.app.state.store - repo = BacktestJobRepository(store) - jobs = await repo.list_jobs(limit=20) - return [ - { - "job_id": j.id or "", - "run_at": j.created_at.isoformat() if j.created_at else "—", - "events_path": j.events_path, - "status": j.status, - "config": j.config or {}, - "report": j.report or {}, - "error": j.error, - } - for j in jobs - ] - - -async def _backtesting_panel_context( - request: Request, - *, - status: str = "idle", - message: str = "Configure a replay run and execute backtest.", - latest_report: dict[str, object] | None = None, - defaults: dict[str, str] | None = None, -) -> dict[str, object]: - default_values = { - "symbols": "", - "start_time": "", - "end_time": "", - "starting_balances": "USD=1000.0", - "trade_capital": "100.0", - "min_profit_threshold": "0.0005", - "fee_profile": "api", - "custom_fee_rate": "", - "slippage_bps": "4.0", - "execution_latency_ms": "20.0", - } - if defaults is not None: - default_values.update(defaults) - - reports = await _recent_backtest_reports(request) - latest = latest_report or (reports[0] if reports else None) - - return { - "status": status, - "message": message, - "flash_message": "", - "no_enabled_pairings": False, - "latest_report": latest, - "recent_reports": reports, - "run_endpoint": "/dashboard/backtesting/run", - "reports_endpoint": "/dashboard/api/backtesting/reports", - **default_values, - } - - -async def _dashboard_response( - request: Request, template_name: str = "dashboard.html" -) -> HTMLResponse: - return templates.TemplateResponse( - request=request, - name=template_name, - context={ - "title": "Arbitrade Dashboard", - "request": request, - "metrics_endpoint": "/dashboard/fragment/metrics", - "overview_endpoint": "/dashboard/fragment/overview", - "controls_endpoint": "/dashboard/fragment/controls", - "charts_endpoint": "/dashboard/fragment/charts", - "stream_endpoint": "/dashboard/stream/metrics", - "overview_stream_endpoint": "/dashboard/stream/overview", - }, - ) - - async def _health_response(request: Request) -> HTMLResponse: return templates.TemplateResponse( request=request, @@ -983,94 +285,7 @@ async def dashboard_backtesting_reports(request: Request) -> JSONResponse: @router.post("/dashboard/backtesting/run", response_class=HTMLResponse) async def dashboard_backtesting_run(request: Request) -> HTMLResponse: """Submit a backtest job to the async queue. Returns panel with job list.""" - form = _parse_form_body(await request.body()) - defaults = { - "starting_balances": form.get("starting_balances", "USD=1000.0"), - "trade_capital": form.get("trade_capital", "100.0"), - "min_profit_threshold": form.get("min_profit_threshold", "0.0005"), - "fee_profile": _normalize_fee_profile(form.get("fee_profile", "api")), - "custom_fee_rate": form.get("custom_fee_rate", ""), - "slippage_bps": form.get("slippage_bps", "4.0"), - "execution_latency_ms": form.get("execution_latency_ms", "20.0"), - "start_time": form.get("start_time", ""), - "end_time": form.get("end_time", ""), - "symbols": form.get("symbols", ""), - "source": form.get("source", "db"), - } - - try: - custom_fee_rate = ( - float(defaults["custom_fee_rate"]) if defaults["custom_fee_rate"].strip() else None - ) - fee_rate = await _fee_rate_for_profile( - defaults["fee_profile"], custom_fee_rate, request=request - ) - - # Use enabled pairings from DB when none selected - symbols_str = defaults["symbols"] - if not symbols_str.strip(): - pairing_repo = ConfigPairingRepository(request.app.state.store) - enabled = await pairing_repo.list_pairings(enabled_only=True) - symbols_str = ",".join(f"{p.base_asset}/{p.quote_asset}" for p in enabled) - - config_dict: dict[str, object] = { - "source": defaults["source"], - "starting_balances": defaults["starting_balances"], - "trade_capital": float(defaults["trade_capital"]), - "min_profit_threshold": float(defaults["min_profit_threshold"]), - "fee_rate": fee_rate, - "fee_profile": defaults["fee_profile"], - "slippage_bps": float(defaults["slippage_bps"]), - "execution_latency_ms": float(defaults["execution_latency_ms"]), - "start_time": defaults["start_time"], - "end_time": defaults["end_time"], - "symbols": symbols_str, - } - - store = request.app.state.store - repo = BacktestJobRepository(store) - events_label = symbols_str if symbols_str else "DB-sourced" - job = await repo.create_job(events_label, config_dict) - msg_job = job.id[:8] if job.id else "unknown" - - queue = request.app.state.backtest_queue - await queue.put((job.id or "", config_dict)) - - await _record_audit( - request, - actor="dashboard_user", - event_type="dashboard.backtesting.submit", - decision="queued", - payload={"job_id": job.id, "source": defaults["source"]}, - ) - - context = await _backtesting_panel_context( - request, - status="submitted", - message=f"Job {msg_job}... queued. Refresh to see results.", - defaults=defaults, - ) - context["flash_message"] = f"Backtest job {msg_job}... submitted successfully." - except ValueError as exc: - context = await _backtesting_panel_context( - request, - status="failed", - message=str(exc), - defaults=defaults, - ) - except Exception as exc: - context = await _backtesting_panel_context( - request, - status="failed", - message=f"Unexpected error: {exc}", - defaults=defaults, - ) - - return templates.TemplateResponse( - request=request, - name="partials/backtesting_panel.html", - context={"request": request, **context}, - ) + return await _dashboard_backtesting_handler(request) @router.post("/dashboard/backtesting/job/{job_id}/delete", response_class=HTMLResponse) @@ -1088,227 +303,32 @@ async def dashboard_backtesting_delete(request: Request, job_id: str) -> HTMLRes @router.get("/dashboard/backtesting/job/{job_id}", response_class=HTMLResponse) async def dashboard_backtesting_job_detail(request: Request, job_id: str) -> HTMLResponse: - store = request.app.state.store - repo = BacktestJobRepository(store) - job = await repo.get_job(job_id) - if job is None: - return HTMLResponse("

Job not found

", status_code=404) - - report_html = "
No report yet
" - if job.report: - i = job.id[:8] if job.id else "unknown" - r = job.report - report_html = ( - f"
" - f"
Job {i}... Report
" - f"
Status: {job.status}
" - f"
Events: {job.events_path}
" - f"
Processed: {r.get('processed_events', '—')}
" - f"
Opportunities: {r.get('opportunities_seen', '—')}
" - f"
Trades: {r.get('trades_executed', '—')}
" - f"
Realized P&L: {r.get('realized_pnl_usd', '—')} USD
" - f"
Max drawdown: {r.get('max_drawdown_usd', '—')} USD
" - f"
Win rate: {r.get('win_rate', '—')}
" - f"
Fill rate: {r.get('fill_rate', '—')}
" - f"
Latency p50: {r.get('execution_latency_p50_ms', '—')} ms
" - f"
Created: {job.created_at}
" - f"
" - ) - - return HTMLResponse(report_html) + return await _dashboard_backtesting_job_handler(request, job_id=job_id) @router.get("/dashboard/backtesting/job/{job_id}/export", response_class=Response) async def dashboard_backtesting_export(request: Request, job_id: str) -> Response: - store = request.app.state.store - repo = BacktestJobRepository(store) - job = await repo.get_job(job_id) - if job is None: - return Response("Job not found", status_code=404) - - payload: dict[str, object] = { - "job_id": job_id, - "status": job.status, - "events_path": job.events_path, - "created_at": job.created_at.isoformat() if job.created_at else None, - } - if job.report: - payload["report"] = job.report - if job.config: - payload["config"] = job.config - - return Response( - content=orjson.dumps(payload).decode("utf-8"), - media_type="application/x-jsonlines", - headers={"Content-Disposition": f"attachment; filename=backtest_{job_id[:8]}.jsonl"}, - ) + return await _dashboard_backtesting_job_export(request, job_id=job_id) @router.post("/dashboard/control/start", response_class=HTMLResponse) async def dashboard_control_start(request: Request) -> HTMLResponse: - controls = _dashboard_controls_state(request) - controls.is_running = True - controls.mark_updated() - notifier = _alert_notifier(request) - if notifier is not None: - await notifier.notify( - category="system", - severity="info", - title="Execution started", - message="Dashboard control started execution.", - ) - await _record_audit( - request, - actor="dashboard_user", - event_type="dashboard.control.start", - decision="approved", - payload={"execution_status": "running"}, - ) - return templates.TemplateResponse( - request=request, - name="partials/controls.html", - context={"request": request, **_dashboard_controls(request)}, - ) + return await _dashboard_ctl_start(request) @router.post("/dashboard/control/stop", response_class=HTMLResponse) async def dashboard_control_stop(request: Request) -> HTMLResponse: - controls = _dashboard_controls_state(request) - controls.is_running = False - controls.mark_updated() - notifier = _alert_notifier(request) - if notifier is not None: - await notifier.notify( - category="system", - severity="warning", - title="Execution stopped", - message="Dashboard control stopped execution.", - ) - await _record_audit( - request, - actor="dashboard_user", - event_type="dashboard.control.stop", - decision="approved", - payload={"execution_status": "stopped"}, - ) - return templates.TemplateResponse( - request=request, - name="partials/controls.html", - context={"request": request, **_dashboard_controls(request)}, - ) + return await _dashboard_ctl_stop(request) @router.post("/dashboard/control/kill-switch", response_class=HTMLResponse) async def dashboard_control_kill_switch(request: Request) -> HTMLResponse: - controls = _dashboard_controls_state(request) - form = _parse_form_body(await request.body()) - reason = form.get("reason") or "manual" - controls.kill_switch.activate(reason=reason) - controls.is_running = False - controls.mark_updated() - notifier = _alert_notifier(request) - if notifier is not None: - await notifier.notify( - category="threshold", - severity="critical", - title="Kill switch activated", - message="Kill switch triggered from dashboard control.", - details={"reason": reason}, - ) - await _record_audit( - request, - actor="dashboard_user", - event_type="dashboard.control.kill_switch", - decision="approved", - payload={"reason": reason}, - ) - return templates.TemplateResponse( - request=request, - name="partials/controls.html", - context={"request": request, **_dashboard_controls(request)}, - ) + return await _dashboard_ctl_kill(request) @router.post("/dashboard/control/config", response_class=HTMLResponse) async def dashboard_control_config(request: Request) -> HTMLResponse: - ctl = _dashboard_controls_state(request) - rs = request.app.state.settings - form = _parse_form_body(await request.body()) - - if "trade_capital_usd" in form and form["trade_capital_usd"]: - rs.trade_capital_usd = float(form["trade_capital_usd"]) - if "max_trade_capital_usd" in form: - mtcv = form["max_trade_capital_usd"].strip() - rs.max_trade_capital_usd = float(mtcv) if mtcv else None - if "max_concurrent_trades" in form: - mcv = form["max_concurrent_trades"].strip() - rs.max_concurrent_trades = int(mcv) if mcv else None - - form_pairs = form.get("tradable_pairs") - ctl.tradable_pairs = _parse_comma_separated_list(form_pairs) - if "strategy_mode" in form and form["strategy_mode"].strip(): - strategy_mode = form["strategy_mode"].strip().lower() - allowed_strategy_modes = {"incremental", "paper", "live"} - if bool(getattr(rs, "strategy_enable_stat_arb_experiment", False)): - allowed_strategy_modes.add("stat_arb_experiment") - if strategy_mode not in allowed_strategy_modes: - e = f"strategy_mode must be one of: {', '.join(sorted(allowed_strategy_modes))}" - raise ValueError(e) - ctl.strategy_mode = strategy_mode - if "strategy_profit_threshold" in form: - if form["strategy_profit_threshold"].strip(): - spt = float(form["strategy_profit_threshold"]) - ctl.strategy_profit_threshold = spt - if "strategy_max_depth_levels" in form: - if form["strategy_max_depth_levels"].strip(): - smdl = int(form["strategy_max_depth_levels"]) - ctl.strategy_max_depth_levels = smdl - - rs.paper_trading_mode = _form_bool(form.get("paper_trading_mode")) - ctl.mark_updated() - - notifier = _alert_notifier(request) - if notifier is not None: - await notifier.notify( - category="system", - severity="info", - title="Runtime config updated", - message="Dashboard control updated runtime risk and execution settings.", - details={ - "trade_capital_usd": f"{rs.trade_capital_usd}", - "max_trade_capital_usd": ( - "none" if rs.max_trade_capital_usd is None else f"{rs.max_trade_capital_usd}" - ), - "max_concurrent_trades": ( - "none" if rs.max_concurrent_trades is None else f"{rs.max_concurrent_trades}" - ), - "paper_trading_mode": "true" if rs.paper_trading_mode else "false", - }, - ) - await _record_audit( - request, - actor="dashboard_user", - event_type="dashboard.control.config", - decision="approved", - payload={ - "trade_capital_usd": rs.trade_capital_usd, - "max_trade_capital_usd": rs.max_trade_capital_usd, - "max_concurrent_trades": rs.max_concurrent_trades, - "paper_trading_mode": rs.paper_trading_mode, - "tradable_pairs": ctl.tradable_pairs, - "strategy_mode": ctl.strategy_mode, - "strategy_profit_threshold": ctl.strategy_profit_threshold, - "strategy_max_depth_levels": ctl.strategy_max_depth_levels, - }, - ) - - d_context = await _dashboard_config_context(request) - d_context["flash_message"] = "Configuration saved successfully." - return templates.TemplateResponse( - request=request, - name="partials/config.html", - context={"request": request, **d_context}, - ) + return await _dashboard_ctl_cfg(request) @router.get("/dashboard/stream/metrics") @@ -1355,11 +375,6 @@ async def health() -> JSONResponse: # ── Pairing API ───────────────────────────────────────────────────────────── - -def _pairing_repo(request: Request) -> ConfigPairingRepository: - return ConfigPairingRepository(request.app.state.store) - - @router.get("/dashboard/api/pairings", response_class=JSONResponse) async def dashboard_api_pairings( request: Request, @@ -1372,50 +387,15 @@ async def dashboard_api_pairings( order: str = "asc", ) -> JSONResponse: """List pairings with optional filters.""" - repo = _pairing_repo(request) - pairings = await repo.list_pairings() - - # Apply filters - if search: - search_lower = search.lower() - pairings = [ - p - for p in pairings - if search_lower in p.base_asset.lower() or search_lower in p.quote_asset.lower() - ] - if enabled is not None: - enabled_bool = enabled.lower() == "true" - pairings = [p for p in pairings if p.enabled == enabled_bool] - if source: - pairings = [p for p in pairings if p.source.lower() == source.lower()] - if base: - pairings = [p for p in pairings if p.base_asset.lower() == base.lower()] - if quote: - pairings = [p for p in pairings if p.quote_asset.lower() == quote.lower()] - - # Sort - reverse = order.lower() == "desc" - if sort == "base_asset": - pairings.sort(key=lambda p: p.base_asset, reverse=reverse) - elif sort == "quote_asset": - pairings.sort(key=lambda p: p.quote_asset, reverse=reverse) - elif sort == "enabled": - pairings.sort(key=lambda p: p.enabled, reverse=reverse) - - return JSONResponse( - [ - { - "id": p.id, - "base_asset": p.base_asset, - "quote_asset": p.quote_asset, - "pair": f"{p.base_asset}/{p.quote_asset}", - "enabled": p.enabled, - "source": p.source, - "created_at": p.created_at.isoformat() if p.created_at else None, - "updated_at": p.updated_at.isoformat() if p.updated_at else None, - } - for p in pairings - ] + return await _dashboard_pairings_response( + request, + search=search, + enabled=enabled, + source=source, + base=base, + quote=quote, + sort=sort, + order=order, ) @@ -1449,53 +429,7 @@ async def dashboard_pairings_fragment( @router.post("/dashboard/api/pairings/toggle") async def dashboard_api_pairings_toggle(request: Request) -> HTMLResponse: - """Toggle enabled/disabled for a pairing. Expects JSON or form body with base_asset and quote_asset.""" - ctype = request.headers.get("content-type", "") - if "application/json" in ctype: - body = await request.json() - else: - form = _parse_form_body(await request.body()) - body = form - - base_asset = str(body.get("base_asset", "")).upper() - quote_asset = str(body.get("quote_asset", "")).upper() - if not base_asset or not quote_asset: - return HTMLResponse("Missing base_asset or quote_asset", status_code=400) - - repo = _pairing_repo(request) - existing = await repo.get_pairing(base_asset, quote_asset) - if existing is None: - return HTMLResponse("Pairing not found", status_code=404) - - toggled = ConfigPairing( - base_asset=existing.base_asset, - quote_asset=existing.quote_asset, - enabled=not existing.enabled, - source=existing.source, - ) - await repo.update_pairing(base_asset, quote_asset, toggled) - - await _record_audit( - request, - actor="dashboard_user", - event_type="dashboard.pairings.toggle", - decision="approved", - payload={ - "base_asset": base_asset, - "quote_asset": quote_asset, - "enabled": toggled.enabled, - }, - ) - - # Return refreshed fragment - pairings_repo = _pairing_repo(request) - pairings = await pairings_repo.list_pairings() - pairings.sort(key=lambda p: (p.base_asset, p.quote_asset)) - return templates.TemplateResponse( - request=request, - name="partials/pairings_table.html", - context={"request": request, "pairings": pairings}, - ) + return await _toggle_pairing(request) @router.post("/dashboard/api/pairings/sync", response_class=HTMLResponse) diff --git a/src/arbitrade/dashboard/__init__.py b/src/arbitrade/dashboard/__init__.py new file mode 100644 index 0000000..2818d5a --- /dev/null +++ b/src/arbitrade/dashboard/__init__.py @@ -0,0 +1 @@ +"""Dashboard module for monitoring and controlling the arbitrage bot.""" diff --git a/src/arbitrade/dashboard/context.py b/src/arbitrade/dashboard/context.py new file mode 100644 index 0000000..c28dddd --- /dev/null +++ b/src/arbitrade/dashboard/context.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import json +from asyncio import Lock +from collections.abc import AsyncIterator +from datetime import UTC, datetime +from importlib import resources +from pathlib import Path +from typing import cast +from urllib.parse import parse_qs + +import orjson +from fastapi import APIRouter, Depends, Request, Response +from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse +from fastapi.templating import Jinja2Templates +from arbitrade.storage.repositories import ( + AuditRecord, + AuditRepository, + BacktestJobRepository, + ConfigPairingRepository, + KrakenAccountSnapshotRepository, + LogRepository, +) + + +async def _recent_backtest_reports(request: Request) -> list[dict[str, object]]: + repo = BacktestJobRepository(request.app.state.store) + jobs = await repo.list_jobs(limit=5) + reports = [] + for job in jobs: + report: dict[str, object] = { + "id": str(job.id), + "status": job.status, + } + if job.created_at is not None: + report["created_at"] = job.created_at.isoformat() + if job.finished_at is not None: + report["finished_at"] = job.finished_at.isoformat() + reports.append(report) + return reports + + +async def _backtesting_panel_context( + request: Request, + *, + status: str = "idle", + message: str = "Configure a replay run and execute backtest.", + latest_report: dict[str, object] | None = None, + defaults: dict[str, str] | None = None, +) -> dict[str, object]: + default_values = { + "symbols": "", + "start_time": "", + "end_time": "", + "starting_balances": "USD=1000.0", + "trade_capital": "100.0", + "min_profit_threshold": "0.0005", + "fee_profile": "api", + "custom_fee_rate": "", + "slippage_bps": "4.0", + "execution_latency_ms": "20.0", + } + if defaults is not None: + default_values.update(defaults) + + reports = await _recent_backtest_reports(request) + latest = latest_report or (reports[0] if reports else None) + + return { + "status": status, + "message": message, + "flash_message": "", + "no_enabled_pairings": False, + "latest_report": latest, + "recent_reports": reports, + "run_endpoint": "/dashboard/backtesting/run", + "reports_endpoint": "/dashboard/api/backtesting/reports", + **default_values, + } diff --git a/src/arbitrade/dashboard/dashboard.py b/src/arbitrade/dashboard/dashboard.py new file mode 100644 index 0000000..0d359f8 --- /dev/null +++ b/src/arbitrade/dashboard/dashboard.py @@ -0,0 +1,1181 @@ +from __future__ import annotations + +import json +from asyncio import Lock +from collections.abc import AsyncIterator +from datetime import UTC, datetime +from importlib import resources +from pathlib import Path +from typing import cast +from urllib.parse import parse_qs + +from httpcore import request +import orjson +from fastapi import APIRouter, Depends, Request, Response +from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse +from fastapi.templating import Jinja2Templates + +from arbitrade.alerting.notifier import SupportsAlerts, SupportsAlertStatus +from arbitrade.api.auth import require_dashboard_auth +from arbitrade.api.control_state import DashboardControlState +from arbitrade.api.routes import ( + _backtesting_panel_context, +) +from arbitrade.config.pairing_sync import sync_pairings_from_kraken +from arbitrade.config.service import ConfigPairing + +from arbitrade.api.control_state import DashboardControlState +from arbitrade.storage.pg_store import PgStore +from arbitrade.detection.graph import CurrencyGraph, TriangularCycle +from arbitrade.storage.repositories import ( + AuditRecord, + AuditRepository, + BacktestJobRepository, + ConfigPairingRepository, + KrakenAccountSnapshotRepository, + LogRepository, +) + +router = APIRouter(dependencies=[Depends(require_dashboard_auth)]) +public_router = APIRouter() + + +def _resolve_templates_directory() -> str: + # Support source layout, Docker runtime (/app), and installed package data. + source_layout_path = Path( + __file__).resolve().parents[3] / "web" / "templates" + if source_layout_path.is_dir(): + return str(source_layout_path) + + docker_runtime_path = Path.cwd() / "web" / "templates" + if docker_runtime_path.is_dir(): + return str(docker_runtime_path) + + try: + package_path = resources.files( + "arbitrade").joinpath("web", "templates") + if package_path.is_dir(): + return str(package_path) + except (ModuleNotFoundError, AttributeError): + pass + + return str(source_layout_path) + + +templates = Jinja2Templates(directory=_resolve_templates_directory()) + + +def _alert_notifier(request: Request) -> SupportsAlerts | None: + notifier = getattr(request.app.state, "alert_notifier", None) + return cast(SupportsAlerts | None, notifier) + + +def _alert_status_snapshot(request: Request) -> dict[str, object]: + notifier = getattr(request.app.state, "alert_notifier", None) + if isinstance(notifier, SupportsAlertStatus): + return notifier.status_snapshot() + return { + "enabled": False, + "has_channels": False, + "configured_channels": [], + "min_severity": "—", + "dedup_seconds": 0.0, + "last_result": "unavailable", + "last_attempted_at": None, + "last_success_at": None, + "last_error": None, + "last_event": None, + "last_channel_results": [], + } + + +def _pairing_repo(request: Request) -> ConfigPairingRepository: + return ConfigPairingRepository(request.app.state.store) + + +async def _fee_rate_for_profile( + profile: str, + custom_fee_rate: float | None, + request: Request, +) -> tuple[float, float]: + normalized = _normalize_fee_profile(profile) + if normalized == "custom": + if custom_fee_rate is None: + raise ValueError("Custom fee profile requires custom_fee_rate") + return custom_fee_rate, custom_fee_rate + elif normalized == "api": + repo = KrakenAccountSnapshotRepository(request.app.state.store) + snapshot = await repo.latest_snapshot() + if snapshot is None or snapshot.maker_fee is None or snapshot.taker_fee is None: + raise ValueError("No fee data available from API") + return snapshot.maker_fee, snapshot.taker_fee + elif normalized == "flat_001": + return 0.001, 0.001 + elif normalized == "flat_0005": + return 0.0005, 0.0005 + else: + raise ValueError(f"Unknown fee profile: {profile}") + + +def _format_metric(value: float | None, *, precision: int = 2, suffix: str = "") -> str: + if value is None: + return "—" + return f"{value:.{precision}f}{suffix}" + + +def _normalize_fee_profile(profile: str) -> str: + return profile.strip().lower().replace("-", "_") + + +def _parse_comma_separated_list(value: str | None) -> list[str]: + if value is None: + return [] + + items: list[str] = [] + for raw_item in value.split(","): + item = raw_item.strip().upper() + if item and item not in items: + items.append(item) + return items + + +def _parse_form_body(body: bytes) -> dict[str, str]: + parsed = parse_qs(body.decode("utf-8"), keep_blank_values=True) + result: dict[str, str] = {} + for key, values in parsed.items(): + if not values: + continue + if len(values) == 1: + result[key] = values[0] + else: + # Multi-value fields (e.g. checkboxes) -> join with comma + result[key] = ",".join(values) + return result + + +def _form_bool(value: str | None) -> bool: + if value is None: + return False + return value.lower() in {"1", "true", "yes", "on"} + + +def _audit_repository(request: Request) -> AuditRepository | None: + repository = getattr(request.app.state, "audit_repository", None) + return cast(AuditRepository | None, repository) + + +async def _record_audit( + request: Request, + *, + actor: str, + event_type: str, + decision: str, + payload: dict[str, object] | None = None, +) -> None: + repository = _audit_repository(request) + if repository is None: + return + correlation_id = request.headers.get("x-request-id") + if payload is not None: + ret_pl = {str(key): payload[key] for key in payload} + else: + ret_pl = None + await repository.insert( + AuditRecord( + occurred_at=datetime.now(UTC), + actor=actor, + event_type=event_type, + decision=decision, + payload=ret_pl, + correlation_id=correlation_id, + ) + ) + + +def _dashboard_controls_state(request: Request) -> DashboardControlState: + return cast(DashboardControlState, request.app.state.dashboard_controls) + + +async def _dashboard_response( + request: Request, template_name: str = "dashboard.html" +) -> HTMLResponse: + return templates.TemplateResponse( + request=request, + name=template_name, + context={ + "title": "Arbitrade Dashboard", + "request": request, + "metrics_endpoint": "/dashboard/fragment/metrics", + "overview_endpoint": "/dashboard/fragment/overview", + "controls_endpoint": "/dashboard/fragment/controls", + "charts_endpoint": "/dashboard/fragment/charts", + "stream_endpoint": "/dashboard/stream/metrics", + "overview_stream_endpoint": "/dashboard/stream/overview", + }, + ) + + +async def _dashboard_pairings_response( + request: Request, + search: str | None = None, + enabled: str | None = None, + source: str | None = None, + base: str | None = None, + quote: str | None = None, + sort: str = "base_asset", + order: str = "asc", +) -> JSONResponse: + repo = _pairing_repo(request) + pairings = await repo.list_pairings() + + # Apply filters + if search: + search_lower = search.lower() + pairings = [ + p + for p in pairings + if search_lower in p.base_asset.lower() or search_lower in p.quote_asset.lower() + ] + if enabled is not None: + enabled_bool = enabled.lower() == "true" + pairings = [p for p in pairings if p.enabled == enabled_bool] + if source: + pairings = [p for p in pairings if p.source.lower() == source.lower()] + if base: + pairings = [p for p in pairings if p.base_asset.lower() == + base.lower()] + if quote: + pairings = [p for p in pairings if p.quote_asset.lower() == + quote.lower()] + + # Sort + reverse = order.lower() == "desc" + if sort == "base_asset": + pairings.sort(key=lambda p: p.base_asset, reverse=reverse) + elif sort == "quote_asset": + pairings.sort(key=lambda p: p.quote_asset, reverse=reverse) + elif sort == "enabled": + pairings.sort(key=lambda p: p.enabled, reverse=reverse) + + return JSONResponse( + [ + { + "id": p.id, + "base_asset": p.base_asset, + "quote_asset": p.quote_asset, + "pair": f"{p.base_asset}/{p.quote_asset}", + "enabled": p.enabled, + "source": p.source, + "created_at": p.created_at.isoformat() if p.created_at else None, + "updated_at": p.updated_at.isoformat() if p.updated_at else None, + } + for p in pairings + ] + ) + + +async def _dashboard_metrics(request: Request) -> dict[str, str]: + metrics = await request.app.state.metrics.compute() + return { + "realized_pnl": _format_metric(metrics.realized_pnl_usd, precision=2, suffix=" USD"), + "win_rate": _format_metric( + metrics.win_rate * 100.0 if metrics.win_rate is not None else None, + precision=1, + suffix="%", + ), + "avg_trade_duration": _format_metric( + metrics.avg_trade_duration_seconds, precision=1, suffix=" s" + ), + "opportunities_per_minute": _format_metric( + metrics.opportunities_per_minute, precision=1, suffix=" /min" + ), + "fill_rate": _format_metric( + metrics.fill_rate * 100.0 if metrics.fill_rate is not None else None, + precision=1, + suffix="%", + ), + "latency_p50": _format_metric(metrics.latency_p50_seconds, precision=3, suffix=" s"), + "latency_p95": _format_metric(metrics.latency_p95_seconds, precision=3, suffix=" s"), + "latency_p99": _format_metric(metrics.latency_p99_seconds, precision=3, suffix=" s"), + "generated_at": datetime.now(UTC).isoformat(), + } + + +async def _dashboard_audit(request: Request, *, limit: int = 15) -> dict[str, object]: + repository = _audit_repository(request) + if repository is None: + return { + "entries": [], + "generated_at": datetime.now(UTC).isoformat(), + } + + records = await repository.list_recent(limit=limit) + entries: list[dict[str, str]] = [] + for record in records: + payload_text = "—" + if record.payload: + payload_text = json.dumps(record.payload) + entries.append( + { + "occurred_at": record.occurred_at.isoformat(), + "actor": record.actor, + "event_type": record.event_type, + "decision": record.decision, + "payload": payload_text, + "correlation_id": record.correlation_id or "—", + } + ) + + return { + "entries": entries, + "generated_at": datetime.now(UTC).isoformat(), + } + + +async def _dashboard_overview(request: Request) -> dict[str, object]: + store: PgStore = request.app.state.store + async with store.pool.acquire() as conn: + portfolio_row = await conn.fetchrow(""" + SELECT balances, total_value_usd + FROM portfolio_snapshots + ORDER BY snapshot_at DESC + LIMIT 1 + """) + open_trades = await conn.fetch(""" + SELECT trade_ref, status, started_at, cycle + FROM trades + WHERE finished_at IS NULL + ORDER BY started_at DESC + LIMIT 5 + """) + rpnl = await conn.fetchrow(""" + SELECT COALESCE(SUM(COALESCE(realized_pnl, 0)), 0) + FROM trades + """) + latest_opportunities = await conn.fetch(""" + SELECT cycle, net_pct, est_profit, detected_at + FROM opportunities + ORDER BY detected_at DESC + LIMIT 5 + """) + + # Query equity from kraken_account_snapshots + equity_value = "—" + try: + equity_row = await conn.fetchrow(""" + SELECT trade_balance_raw + FROM kraken_account_snapshots + ORDER BY snapshot_at DESC + LIMIT 1 + """) + if equity_row is not None and equity_row["trade_balance_raw"] is not None: + tb_raw = equity_row["trade_balance_raw"] + if isinstance(tb_raw, str): + tb_raw = json.loads(tb_raw) + if isinstance(tb_raw, dict): + eb = tb_raw.get("eb") + equity_value = f"{float(eb):.2f} USD" if eb is not None else "—" + except Exception: + pass + + # Query latest Kraken account snapshot for fee info + fee_tier = "—" + maker_fee = "—" + taker_fee = "—" + thirty_day_volume = "—" + try: + acct_row = await conn.fetchrow(""" + SELECT fee_tier, maker_fee, taker_fee, thirty_day_volume + FROM kraken_account_snapshots + ORDER BY snapshot_at DESC + LIMIT 1 + """) + if acct_row is not None: + fee_tier = str( + acct_row["fee_tier"]) if acct_row["fee_tier"] is not None else "—" + maker_fee = ( + f"{float(acct_row['maker_fee']):.4%}" + if acct_row["maker_fee"] is not None + else "—" + ) + taker_fee = ( + f"{float(acct_row['taker_fee']):.4%}" + if acct_row["taker_fee"] is not None + else "—" + ) + thirty_day_volume = ( + f"{float(acct_row['thirty_day_volume']):.2f}" + if acct_row["thirty_day_volume"] is not None + else "—" + ) + except Exception: + pass + + balances_value = "—" + total_value = "—" + if portfolio_row is not None: + balances_raw = portfolio_row["balances"] + total_value_raw = portfolio_row["total_value_usd"] + if isinstance(balances_raw, str) and balances_raw: + try: + parsed = json.loads(balances_raw) + if isinstance(parsed, dict): + non_zero = {k: float(v) + for k, v in parsed.items() if float(v) > 0.0} + if non_zero: + balances_value = "
".join( + f"{v:.6g} {k}" for k, v in sorted(non_zero.items()) + ) + else: + balances_value = "No balances" + else: + balances_value = str(balances_raw) + except (json.JSONDecodeError, ValueError, TypeError): + balances_value = str(balances_raw) + elif balances_raw is not None: + balances_value = str(balances_raw) + if total_value_raw is not None: + total_value = f"{float(total_value_raw):.2f} USD" + + open_trade_rows = [ + { + "trade_ref": str(r["trade_ref"]), + "status": str(r["status"]), + "started_at": ( + r["started_at"].isoformat() if isinstance( + r["started_at"], datetime) else "—" + ), + "cycle": str(r["cycle"]) if r["cycle"] is not None else "—", + } + for r in open_trades + ] + opportunity_rows = [ + { + "cycle": str(r["cycle"]), + "net_pct": f"{float(r['net_pct']):.2f}%" if r["net_pct"] is not None else "—", + "est_profit": ( + f"{float(r['est_profit']):.2f} USD" if r["est_profit"] is not None else "—" + ), + "detected_at": ( + r["detected_at"].isoformat() if isinstance( + r["detected_at"], datetime) else "—" + ), + } + for r in latest_opportunities + ] + + return { + "status": "live", + "generated_at": datetime.now(UTC).isoformat(), + "balances": balances_value, + "total_value": total_value, + "equity": equity_value, + "open_trade_count": len(open_trade_rows), + "open_trades": open_trade_rows, + "realized_pnl_total": f"{float(rpnl[0]):.2f} USD" if rpnl else "—", + "opportunities": opportunity_rows, + "fee_tier": fee_tier, + "maker_fee": maker_fee, + "taker_fee": taker_fee, + "thirty_day_volume": thirty_day_volume, + "fee_source": "API" if fee_tier != "—" else "—", + } + + +async def _dashboard_charts(request: Request) -> dict[str, object]: + store: PgStore = request.app.state.store + async with store.pool.acquire() as conn: + rows = await conn.fetch(""" + SELECT detected_at, cycle, net_pct, est_profit + FROM opportunities + ORDER BY detected_at DESC + LIMIT 10 + """) + + cr = list(reversed(rows)) + labels = [] + for index, row in enumerate(cr): + if isinstance(row["detected_at"], datetime): + labels.append(row["detected_at"].isoformat()) + else: + labels.append(f"opportunity-{index + 1}") + np = [float(row["net_pct"]) if row["net_pct"] + is not None else 0.0 for row in cr] + ep = [float(row["est_profit"]) if row["est_profit"] + is not None else 0.0 for row in cr] + cycles = [str(row["cycle"]) for row in cr] + + return { + "labels": labels, + "net_pct_values": np, + "est_profit_values": ep, + "cycles": cycles, + "has_chart_data": bool(cr), + "generated_at": datetime.now(UTC).isoformat(), + } + + +async def _dashboard_config_context(request: Request) -> dict[str, object]: + ctl = _dashboard_controls_state(request) + rs = request.app.state.settings + max_trade_capital_usd = ( + f"{float(rs.max_trade_capital_usd):.2f} USD" + if rs.max_trade_capital_usd is not None + else "—" + ) + max_trade_capital_usd_value = ( + f"{float(rs.max_trade_capital_usd):.2f}" if rs.max_trade_capital_usd is not None else "" + ) + max_concurrent_trades = ( + str(rs.max_concurrent_trades) if rs.max_concurrent_trades is not None else "—" + ) + max_concurrent_trades_value = ( + str(rs.max_concurrent_trades) if rs.max_concurrent_trades is not None else "" + ) + max_exposure_per_asset = ( + f"{float(rs.max_exposure_per_asset_usd):.2f} USD" + if rs.max_exposure_per_asset_usd is not None + else "—" + ) + max_exposure_per_asset_value = ( + f"{float(rs.max_exposure_per_asset_usd):.2f}" + if rs.max_exposure_per_asset_usd is not None + else "" + ) + daily_loss_limit = ( + f"{float(rs.daily_loss_limit_usd):.2f} USD" if rs.daily_loss_limit_usd is not None else "—" + ) + daily_loss_limit_value = ( + f"{float(rs.daily_loss_limit_usd):.2f}" if rs.daily_loss_limit_usd is not None else "" + ) + cumulative_loss_limit = ( + f"{float(rs.cumulative_loss_limit_usd):.2f} USD" + if rs.cumulative_loss_limit_usd is not None + else "—" + ) + cumulative_loss_limit_value = ( + f"{float(rs.cumulative_loss_limit_usd):.2f}" + if rs.cumulative_loss_limit_usd is not None + else "" + ) + max_source_latency = ( + f"{float(rs.max_source_latency_ms):.1f} ms" if rs.max_source_latency_ms is not None else "—" + ) + max_source_latency_value = ( + f"{float(rs.max_source_latency_ms):.1f}" if rs.max_source_latency_ms is not None else "" + ) + max_apply_latency = ( + f"{float(rs.max_apply_latency_ms):.1f} ms" if rs.max_apply_latency_ms is not None else "—" + ) + max_apply_latency_value = ( + f"{float(rs.max_apply_latency_ms):.1f}" if rs.max_apply_latency_ms is not None else "" + ) + max_consecutive_failures = ( + str(rs.max_consecutive_failures) if rs.max_consecutive_failures is not None else "—" + ) + max_consecutive_failures_value = ( + str(rs.max_consecutive_failures) if rs.max_consecutive_failures is not None else "" + ) + strategy_stat_arb_enabled = bool( + getattr(rs, "strategy_enable_stat_arb_experiment", False)) + + return { + # Runtime + "app_env": rs.app_env, + "app_host": rs.app_host, + "app_port": str(rs.app_port), + "log_level": rs.log_level, + "log_json": "checked" if rs.log_json else "", + "paper_trading_mode": "checked" if rs.paper_trading_mode else "", + "trade_capital_usd": f"{float(rs.trade_capital_usd):.2f}", + "trade_capital_usd_value": f"{float(rs.trade_capital_usd):.2f}", + "max_trade_capital_usd": max_trade_capital_usd, + "max_trade_capital_usd_value": max_trade_capital_usd_value, + "max_concurrent_trades": max_concurrent_trades, + "max_concurrent_trades_value": max_concurrent_trades_value, + "max_exposure_per_asset": max_exposure_per_asset, + "max_exposure_per_asset_value": max_exposure_per_asset_value, + "quote_balance_asset": rs.quote_balance_asset, + "min_order_size_usd": ( + f"{float(rs.min_order_size_usd):.2f}" if rs.min_order_size_usd is not None else "" + ), + "min_order_size_usd_value": ( + f"{float(rs.min_order_size_usd):.2f}" if rs.min_order_size_usd is not None else "" + ), + "tradable_pairs_value": ", ".join(ctl.tradable_pairs), + "strategy_mode": ctl.strategy_mode, + "strategy_stat_arb_enabled": strategy_stat_arb_enabled, + "strategy_profit_threshold": f"{ctl.strategy_profit_threshold:.6f}", + "strategy_max_depth_levels": str(ctl.strategy_max_depth_levels), + # Alerts + "alerts_enabled": "checked" if rs.alerts_enabled else "", + "alert_min_severity": rs.alert_min_severity, + "alert_dedup_seconds": f"{rs.alert_dedup_seconds:.0f}", + "alert_on_trade_events": "checked" if rs.alert_on_trade_events else "", + "alert_on_error_events": "checked" if rs.alert_on_error_events else "", + "alert_on_threshold_events": "checked" if rs.alert_on_threshold_events else "", + "alert_on_system_events": "checked" if rs.alert_on_system_events else "", + "telegram_alerts_enabled": "checked" if rs.telegram_alerts_enabled else "", + "telegram_bot_token": rs.telegram_bot_token or "", + "telegram_chat_id": rs.telegram_chat_id or "", + "discord_alerts_enabled": "checked" if rs.discord_alerts_enabled else "", + "discord_webhook_url": rs.discord_webhook_url or "", + "email_alerts_enabled": "checked" if rs.email_alerts_enabled else "", + "email_smtp_host": rs.email_smtp_host or "", + "email_smtp_port": str(rs.email_smtp_port), + "email_smtp_username": rs.email_smtp_username or "", + "email_smtp_password": "", + "email_alert_from": rs.email_alert_from or "", + "email_alert_to": rs.email_alert_to or "", + "email_smtp_use_tls": "checked" if rs.email_smtp_use_tls else "", + # Kraken + "kraken_rest_url": rs.kraken_rest_url, + "kraken_ws_url": rs.kraken_ws_url, + "kraken_private_rate_limit_seconds": f"{rs.kraken_private_rate_limit_seconds:.2f}", + "kraken_http_timeout_seconds": f"{rs.kraken_http_timeout_seconds:.1f}", + "kraken_retry_attempts": str(rs.kraken_retry_attempts), + "kraken_retry_base_delay_seconds": f"{rs.kraken_retry_base_delay_seconds:.2f}", + "kraken_api_key": rs.kraken_api_key or "", + "kraken_api_secret": "", + "kraken_api_key_permissions": rs.kraken_api_key_permissions, + "ws_heartbeat_timeout_seconds": f"{rs.ws_heartbeat_timeout_seconds:.1f}", + "ws_max_staleness_seconds": f"{rs.ws_max_staleness_seconds:.1f}", + # Risk + "daily_loss_limit": daily_loss_limit, + "daily_loss_limit_value": daily_loss_limit_value, + "cumulative_loss_limit": cumulative_loss_limit, + "cumulative_loss_limit_value": cumulative_loss_limit_value, + "max_source_latency": max_source_latency, + "max_source_latency_value": max_source_latency_value, + "max_apply_latency": max_apply_latency, + "max_apply_latency_value": max_apply_latency_value, + "max_consecutive_failures": max_consecutive_failures, + "max_consecutive_failures_value": max_consecutive_failures_value, + "kill_switch_active": "checked" if rs.kill_switch_active else "", + # Strategy stat-arb + "strategy_stat_arb_lookback_window": str(rs.strategy_stat_arb_lookback_window), + "strategy_stat_arb_entry_zscore": f"{rs.strategy_stat_arb_entry_zscore:.1f}", + "strategy_stat_arb_exit_zscore": f"{rs.strategy_stat_arb_exit_zscore:.1f}", + "strategy_stat_arb_max_holding_seconds": f"{rs.strategy_stat_arb_max_holding_seconds:.0f}", + # UI + "config_endpoint": "/dashboard/control/config", + } + + +def _dashboard_controls(request: Request) -> dict[str, object]: + ctl = _dashboard_controls_state(request) + rs = request.app.state.settings + alert_status = _alert_status_snapshot(request) + last_event = alert_status.get("last_event") + last_event_title = "—" + if isinstance(last_event, dict): + title_value = last_event.get("title") + if isinstance(title_value, str): + last_event_title = title_value + + cc = alert_status.get("configured_channels") + cd = "—" + if isinstance(cc, list) and cc: + cd = ", ".join(str(channel) for channel in cc) + + ddsr = alert_status.get("dedup_seconds", 0.0) + dds = float(ddsr) if isinstance(ddsr, int | float) else 0.0 + tpd = ", ".join(ctl.tradable_pairs) if ctl.tradable_pairs else "All" + max_trade_capital_usd = ( + f"{float(rs.max_trade_capital_usd):.2f} USD" + if rs.max_trade_capital_usd is not None + else "—" + ) + max_trade_capital_usd_value = ( + f"{float(rs.max_trade_capital_usd):.2f}" if rs.max_trade_capital_usd is not None else "" + ) + max_concurrent_trades = ( + str(rs.max_concurrent_trades) if rs.max_concurrent_trades is not None else "—" + ) + max_concurrent_trades_value = ( + str(rs.max_concurrent_trades) if rs.max_concurrent_trades is not None else "" + ) + alerts_last_channel_results = [ + str(item) for item in cast(list[object], alert_status.get("last_channel_results", [])) + ] + strategy_stat_arb_enabled = bool( + getattr(rs, "strategy_enable_stat_arb_experiment", False)) + + return { + "execution_status": "running" if ctl.is_running else "stopped", + "kill_switch_status": "active" if ctl.kill_switch.is_active else "inactive", + "kill_switch_reason": ctl.kill_switch.reason or "—", + "paper_trading_mode": "enabled" if rs.paper_trading_mode else "disabled", + "trade_capital_usd": f"{float(rs.trade_capital_usd):.2f} USD", + "trade_capital_usd_value": f"{float(rs.trade_capital_usd):.2f}", + "max_trade_capital_usd": max_trade_capital_usd, + "max_trade_capital_usd_value": max_trade_capital_usd_value, + "max_concurrent_trades": max_concurrent_trades, + "max_concurrent_trades_value": max_concurrent_trades_value, + "alerts_enabled": "enabled" if bool(alert_status.get("enabled", False)) else "disabled", + "alerts_channels": cd, + "alerts_min_severity": str(alert_status.get("min_severity", "—")), + "alerts_dedup_seconds": f"{dds:.0f}", + "alerts_last_result": str(alert_status.get("last_result", "unavailable")), + "alerts_last_attempted_at": str(alert_status.get("last_attempted_at") or "—"), + "alerts_last_success_at": str(alert_status.get("last_success_at") or "—"), + "alerts_last_event_title": last_event_title, + "alerts_last_error": str(alert_status.get("last_error") or "—"), + "alerts_last_channel_results": alerts_last_channel_results, + "tradable_pairs_display": tpd, + "tradable_pairs_value": ", ".join(ctl.tradable_pairs), + "strategy_mode": ctl.strategy_mode, + "strategy_stat_arb_enabled": strategy_stat_arb_enabled, + "strategy_profit_threshold": f"{ctl.strategy_profit_threshold:.6f}", + "strategy_max_depth_levels": str(ctl.strategy_max_depth_levels), + "updated_at": ctl.updated_at.isoformat(), + "start_endpoint": "/dashboard/control/start", + "stop_endpoint": "/dashboard/control/stop", + "kill_switch_endpoint": "/dashboard/control/kill-switch", + "config_endpoint": "/dashboard/control/config", + "chart_endpoint": "/dashboard/fragment/charts", + } + + +async def _dashboard_backtesting_handler(request: Request) -> HTMLResponse: + form = _parse_form_body(await request.body()) + defaults = { + "starting_balances": form.get("starting_balances", "USD=1000.0"), + "trade_capital": form.get("trade_capital", "100.0"), + "min_profit_threshold": form.get("min_profit_threshold", "0.0005"), + "fee_profile": _normalize_fee_profile(form.get("fee_profile", "api")), + "custom_fee_rate": form.get("custom_fee_rate", ""), + "slippage_bps": form.get("slippage_bps", "4.0"), + "execution_latency_ms": form.get("execution_latency_ms", "20.0"), + "start_time": form.get("start_time", ""), + "end_time": form.get("end_time", ""), + "symbols": form.get("symbols", ""), + "source": form.get("source", "db"), + } + + try: + custom_fee_rate = ( + float(defaults["custom_fee_rate"] + ) if defaults["custom_fee_rate"].strip() else None + ) + fee_rate = await _fee_rate_for_profile( + defaults["fee_profile"], custom_fee_rate, request=request + ) + + # Use enabled pairings from DB when none selected + symbols_str = defaults["symbols"] + if not symbols_str.strip(): + pairing_repo = ConfigPairingRepository(request.app.state.store) + enabled = await pairing_repo.list_pairings(enabled_only=True) + symbols_str = ",".join( + f"{p.base_asset}/{p.quote_asset}" for p in enabled) + + config_dict: dict[str, object] = { + "source": defaults["source"], + "starting_balances": defaults["starting_balances"], + "trade_capital": float(defaults["trade_capital"]), + "min_profit_threshold": float(defaults["min_profit_threshold"]), + "fee_rate": fee_rate, + "fee_profile": defaults["fee_profile"], + "slippage_bps": float(defaults["slippage_bps"]), + "execution_latency_ms": float(defaults["execution_latency_ms"]), + "start_time": defaults["start_time"], + "end_time": defaults["end_time"], + "symbols": symbols_str, + } + + store = request.app.state.store + repo = BacktestJobRepository(store) + events_label = symbols_str if symbols_str else "DB-sourced" + job = await repo.create_job(events_label, config_dict) + msg_job = job.id[:8] if job.id else "unknown" + + queue = request.app.state.backtest_queue + await queue.put((job.id or "", config_dict)) + + await _record_audit( + request, + actor="dashboard_user", + event_type="dashboard.backtesting.submit", + decision="queued", + payload={"job_id": job.id, "source": defaults["source"]}, + ) + + context = await _backtesting_panel_context( + request, + status="submitted", + message=f"Job {msg_job}... queued. Refresh to see results.", + defaults=defaults, + ) + context["flash_message"] = f"Backtest job {msg_job}... submitted successfully." + except ValueError as exc: + context = await _backtesting_panel_context( + request, + status="failed", + message=str(exc), + defaults=defaults, + ) + except Exception as exc: + context = await _backtesting_panel_context( + request, + status="failed", + message=f"Unexpected error: {exc}", + defaults=defaults, + ) + + return templates.TemplateResponse( + request=request, + name="partials/backtesting_panel.html", + context={"request": request, **context}, + ) + + +async def _dashboard_backtesting_job_handler(request: Request, job_id: str) -> HTMLResponse: + store = request.app.state.store + repo = BacktestJobRepository(store) + job = await repo.get_job(job_id) + if job is None: + return HTMLResponse("

Job not found

", status_code=404) + + report_html = "
No report yet
" + if job.report: + i = job.id[:8] if job.id else "unknown" + r = job.report + report_html = ( + f"
" + f"
Job {i}... Report
" + f"
Status: {job.status}
" + f"
Events: {job.events_path}
" + f"
Processed: {r.get('processed_events', '—')}
" + f"
Opportunities: {r.get('opportunities_seen', '—')}
" + f"
Trades: {r.get('trades_executed', '—')}
" + f"
Realized P&L: {r.get('realized_pnl_usd', '—')} USD
" + f"
Max drawdown: {r.get('max_drawdown_usd', '—')} USD
" + f"
Win rate: {r.get('win_rate', '—')}
" + f"
Fill rate: {r.get('fill_rate', '—')}
" + f"
Latency p50: {r.get('execution_latency_p50_ms', '—')} ms
" + f"
Created: {job.created_at}
" + f"
" + ) + + return HTMLResponse(report_html) + + +async def _dashboard_backtesting_job_export(request: Request, job_id: str) -> Response: + store = request.app.state.store + repo = BacktestJobRepository(store) + job = await repo.get_job(job_id) + if job is None: + return Response("Job not found", status_code=404) + + payload: dict[str, object] = { + "job_id": job_id, + "status": job.status, + "events_path": job.events_path, + "created_at": job.created_at.isoformat() if job.created_at else None, + } + if job.report: + payload["report"] = job.report + if job.config: + payload["config"] = job.config + + return Response( + content=orjson.dumps(payload).decode("utf-8"), + media_type="application/x-jsonlines", + headers={ + "Content-Disposition": f"attachment; filename=backtest_{job_id[:8]}.jsonl"}, + ) + + +async def _dashboard_control_cfg_handler(request: Request) -> HTMLResponse: + ctl = _dashboard_controls_state(request) + rs = request.app.state.settings + form = _parse_form_body(await request.body()) + + if "trade_capital_usd" in form and form["trade_capital_usd"]: + rs.trade_capital_usd = float(form["trade_capital_usd"]) + if "max_trade_capital_usd" in form: + mtcv = form["max_trade_capital_usd"].strip() + rs.max_trade_capital_usd = float(mtcv) if mtcv else None + if "max_concurrent_trades" in form: + mcv = form["max_concurrent_trades"].strip() + rs.max_concurrent_trades = int(mcv) if mcv else None + + form_pairs = form.get("tradable_pairs") + ctl.tradable_pairs = _parse_comma_separated_list(form_pairs) + if "strategy_mode" in form and form["strategy_mode"].strip(): + strategy_mode = form["strategy_mode"].strip().lower() + allowed_strategy_modes = {"incremental", "paper", "live"} + if bool(getattr(rs, "strategy_enable_stat_arb_experiment", False)): + allowed_strategy_modes.add("stat_arb_experiment") + if strategy_mode not in allowed_strategy_modes: + e = f"strategy_mode must be one of: {', '.join(sorted(allowed_strategy_modes))}" + raise ValueError(e) + ctl.strategy_mode = strategy_mode + if "strategy_profit_threshold" in form: + if form["strategy_profit_threshold"].strip(): + spt = float(form["strategy_profit_threshold"]) + ctl.strategy_profit_threshold = spt + if "strategy_max_depth_levels" in form: + if form["strategy_max_depth_levels"].strip(): + smdl = int(form["strategy_max_depth_levels"]) + ctl.strategy_max_depth_levels = smdl + + rs.paper_trading_mode = _form_bool(form.get("paper_trading_mode")) + ctl.mark_updated() + + notifier = _alert_notifier(request) + if notifier is not None: + await notifier.notify( + category="system", + severity="info", + title="Runtime config updated", + message="Dashboard control updated runtime risk and execution settings.", + details={ + "trade_capital_usd": f"{rs.trade_capital_usd}", + "max_trade_capital_usd": ( + "none" if rs.max_trade_capital_usd is None else f"{rs.max_trade_capital_usd}" + ), + "max_concurrent_trades": ( + "none" if rs.max_concurrent_trades is None else f"{rs.max_concurrent_trades}" + ), + "paper_trading_mode": "true" if rs.paper_trading_mode else "false", + }, + ) + await _record_audit( + request, + actor="dashboard_user", + event_type="dashboard.control.config", + decision="approved", + payload={ + "trade_capital_usd": rs.trade_capital_usd, + "max_trade_capital_usd": rs.max_trade_capital_usd, + "max_concurrent_trades": rs.max_concurrent_trades, + "paper_trading_mode": rs.paper_trading_mode, + "tradable_pairs": ctl.tradable_pairs, + "strategy_mode": ctl.strategy_mode, + "strategy_profit_threshold": ctl.strategy_profit_threshold, + "strategy_max_depth_levels": ctl.strategy_max_depth_levels, + }, + ) + + d_context = await _dashboard_config_context(request) + d_context["flash_message"] = "Configuration saved successfully." + return templates.TemplateResponse( + request=request, + name="partials/config.html", + context={"request": request, **d_context}, + ) + + +async def _toggle_pairing(request: Request) -> HTMLResponse: + """Toggle enabled/disabled for a pairing. Expects JSON or form body with base_asset and quote_asset.""" + ctype = request.headers.get("content-type", "") + if "application/json" in ctype: + body = await request.json() + else: + form = _parse_form_body(await request.body()) + body = form + + base_asset = str(body.get("base_asset", "")).upper() + quote_asset = str(body.get("quote_asset", "")).upper() + if not base_asset or not quote_asset: + return HTMLResponse("Missing base_asset or quote_asset", status_code=400) + + repo = _pairing_repo(request) + existing = await repo.get_pairing(base_asset, quote_asset) + if existing is None: + return HTMLResponse("Pairing not found", status_code=404) + + toggled = ConfigPairing( + base_asset=existing.base_asset, + quote_asset=existing.quote_asset, + enabled=not existing.enabled, + source=existing.source, + ) + await repo.update_pairing(base_asset, quote_asset, toggled) + + await _record_audit( + request, + actor="dashboard_user", + event_type="dashboard.pairings.toggle", + decision="approved", + payload={ + "base_asset": base_asset, + "quote_asset": quote_asset, + "enabled": toggled.enabled, + }, + ) + + # Return refreshed fragment + pairings_repo = _pairing_repo(request) + pairings = await pairings_repo.list_pairings() + pairings.sort(key=lambda p: (p.base_asset, p.quote_asset)) + return templates.TemplateResponse( + request=request, + name="partials/pairings_table.html", + context={"request": request, "pairings": pairings}, + ) + + +async def _dashboard_ctl_start(request: Request) -> HTMLResponse: + controls = _dashboard_controls_state(request) + controls.is_running = True + controls.mark_updated() + notifier = _alert_notifier(request) + if notifier is not None: + await notifier.notify( + category="system", + severity="info", + title="Execution started", + message="Dashboard control started execution.", + ) + await _record_audit( + request, + actor="dashboard_user", + event_type="dashboard.control.start", + decision="approved", + payload={"execution_status": "running"}, + ) + return templates.TemplateResponse( + request=request, + name="partials/controls.html", + context={"request": request, **_dashboard_controls(request)}, + ) + + +async def _dashboard_ctl_stop(request: Request) -> HTMLResponse: + controls = _dashboard_controls_state(request) + controls.is_running = False + controls.mark_updated() + notifier = _alert_notifier(request) + if notifier is not None: + await notifier.notify( + category="system", + severity="warning", + title="Execution stopped", + message="Dashboard control stopped execution.", + ) + await _record_audit( + request, + actor="dashboard_user", + event_type="dashboard.control.stop", + decision="approved", + payload={"execution_status": "stopped"}, + ) + return templates.TemplateResponse( + request=request, + name="partials/controls.html", + context={"request": request, **_dashboard_controls(request)}, + ) + + +async def _dashboard_ctl_kill(request: Request) -> HTMLResponse: + controls = _dashboard_controls_state(request) + form = _parse_form_body(await request.body()) + reason = form.get("reason") or "manual" + controls.kill_switch.activate(reason=reason) + controls.is_running = False + controls.mark_updated() + notifier = _alert_notifier(request) + if notifier is not None: + await notifier.notify( + category="threshold", + severity="critical", + title="Kill switch activated", + message="Kill switch triggered from dashboard control.", + details={"reason": reason}, + ) + await _record_audit( + request, + actor="dashboard_user", + event_type="dashboard.control.kill_switch", + decision="approved", + payload={"reason": reason}, + ) + return templates.TemplateResponse( + request=request, + name="partials/controls.html", + context={"request": request, **_dashboard_controls(request)}, + ) + + +async def _dashboard_ctl_cfg(request: Request) -> HTMLResponse: + ctl = _dashboard_controls_state(request) + rs = request.app.state.settings + form = _parse_form_body(await request.body()) + + if "trade_capital_usd" in form and form["trade_capital_usd"]: + rs.trade_capital_usd = float(form["trade_capital_usd"]) + if "max_trade_capital_usd" in form: + mtcv = form["max_trade_capital_usd"].strip() + rs.max_trade_capital_usd = float(mtcv) if mtcv else None + if "max_concurrent_trades" in form: + mcv = form["max_concurrent_trades"].strip() + rs.max_concurrent_trades = int(mcv) if mcv else None + + form_pairs = form.get("tradable_pairs") + ctl.tradable_pairs = _parse_comma_separated_list(form_pairs) + if "strategy_mode" in form and form["strategy_mode"].strip(): + strategy_mode = form["strategy_mode"].strip().lower() + allowed_strategy_modes = {"incremental", "paper", "live"} + if bool(getattr(rs, "strategy_enable_stat_arb_experiment", False)): + allowed_strategy_modes.add("stat_arb_experiment") + if strategy_mode not in allowed_strategy_modes: + e = f"strategy_mode must be one of: {', '.join(sorted(allowed_strategy_modes))}" + raise ValueError(e) + ctl.strategy_mode = strategy_mode + if "strategy_profit_threshold" in form: + if form["strategy_profit_threshold"].strip(): + spt = float(form["strategy_profit_threshold"]) + ctl.strategy_profit_threshold = spt + if "strategy_max_depth_levels" in form: + if form["strategy_max_depth_levels"].strip(): + smdl = int(form["strategy_max_depth_levels"]) + ctl.strategy_max_depth_levels = smdl + + rs.paper_trading_mode = _form_bool(form.get("paper_trading_mode")) + ctl.mark_updated() + + notifier = _alert_notifier(request) + if notifier is not None: + await notifier.notify( + category="system", + severity="info", + title="Runtime config updated", + message="Dashboard control updated runtime risk and execution settings.", + details={ + "trade_capital_usd": f"{rs.trade_capital_usd}", + "max_trade_capital_usd": ( + "none" if rs.max_trade_capital_usd is None else f"{rs.max_trade_capital_usd}" + ), + "max_concurrent_trades": ( + "none" if rs.max_concurrent_trades is None else f"{rs.max_concurrent_trades}" + ), + "paper_trading_mode": "true" if rs.paper_trading_mode else "false", + }, + ) + await _record_audit( + request, + actor="dashboard_user", + event_type="dashboard.control.config", + decision="approved", + payload={ + "trade_capital_usd": rs.trade_capital_usd, + "max_trade_capital_usd": rs.max_trade_capital_usd, + "max_concurrent_trades": rs.max_concurrent_trades, + "paper_trading_mode": rs.paper_trading_mode, + "tradable_pairs": ctl.tradable_pairs, + "strategy_mode": ctl.strategy_mode, + "strategy_profit_threshold": ctl.strategy_profit_threshold, + "strategy_max_depth_levels": ctl.strategy_max_depth_levels, + }, + ) + + d_context = await _dashboard_config_context(request) + d_context["flash_message"] = "Configuration saved successfully." + return templates.TemplateResponse( + request=request, + name="partials/config.html", + context={"request": request, **d_context}, + ) diff --git a/src/arbitrade/logging/maintenance.py b/src/arbitrade/logging/maintenance.py index 1a1a89f..5a3185c 100644 --- a/src/arbitrade/logging/maintenance.py +++ b/src/arbitrade/logging/maintenance.py @@ -21,7 +21,7 @@ async def run_log_aggregation(store: PgStore) -> None: """Aggregate log counts for the last 2 hours across all periods.""" repo = LogAggregationRepository(store) since = datetime.now(UTC) - timedelta(hours=2) - periods = ["1h", "3h", "6h", "1d", "1w", "1mo"] + periods = ["1h", "1d", "1w", "1mo"] for period in periods: try: await repo.aggregate_since(since, period) @@ -36,7 +36,8 @@ async def run_log_archive(store: PgStore, retention_days: int = _RETENTION_DAYS) repo = LogArchiveRepository(store) count = await repo.archive_before(cutoff) if count > 0: - _LOG.info("log_archive_complete", cutoff=cutoff.isoformat(), archived=count) + _LOG.info("log_archive_complete", + cutoff=cutoff.isoformat(), archived=count) return count diff --git a/src/arbitrade/storage/repositories.py b/src/arbitrade/storage/repositories.py index 6ecee68..d847aa2 100644 --- a/src/arbitrade/storage/repositories.py +++ b/src/arbitrade/storage/repositories.py @@ -258,7 +258,8 @@ class AuditRepository: record.actor, record.event_type, record.decision, - (None if record.payload is None else orjson.dumps(record.payload).decode("utf-8")), + (None if record.payload is None else orjson.dumps( + record.payload).decode("utf-8")), record.correlation_id, ) @@ -291,7 +292,8 @@ class AuditRepository: decision=str(row["decision"]), payload=payload, correlation_id=( - str(row["correlation_id"]) if row["correlation_id"] is not None else None + str(row["correlation_id"] + ) if row["correlation_id"] is not None else None ), ) ) @@ -362,7 +364,8 @@ class RuntimeStateRepository: is_running=bool(row["is_running"]), kill_switch_active=bool(row["kill_switch_active"]), kill_switch_reason=( - str(row["kill_switch_reason"]) if row["kill_switch_reason"] is not None else None + str(row["kill_switch_reason"] + ) if row["kill_switch_reason"] is not None else None ), open_trade_count=int(row["open_trade_count"]), last_known_balances=balances, @@ -772,7 +775,8 @@ class ConfigBacktestingDefaultsRepository: if row: return ConfigBacktestingDefaults( starting_balances=( - orjson.loads(row["starting_balances"]) if row["starting_balances"] else None + orjson.loads(row["starting_balances"] + ) if row["starting_balances"] else None ), trade_capital=row["trade_capital"], min_profit_threshold=row["min_profit_threshold"], @@ -793,7 +797,8 @@ class ConfigBacktestingDefaultsRepository: if row: return ConfigBacktestingDefaults( starting_balances=( - orjson.loads(row["starting_balances"]) if row["starting_balances"] else None + orjson.loads(row["starting_balances"] + ) if row["starting_balances"] else None ), trade_capital=row["trade_capital"], min_profit_threshold=row["min_profit_threshold"], @@ -830,7 +835,8 @@ class ConfigBacktestingDefaultsRepository: if row: return ConfigBacktestingDefaults( starting_balances=( - orjson.loads(row["starting_balances"]) if row["starting_balances"] else None + orjson.loads(row["starting_balances"] + ) if row["starting_balances"] else None ), trade_capital=row["trade_capital"], min_profit_threshold=row["min_profit_threshold"], @@ -901,10 +907,12 @@ class KrakenAccountSnapshotRepository: taker_fee=row["taker_fee"], thirty_day_volume=row["thirty_day_volume"], trade_balance_raw=( - orjson.loads(row["trade_balance_raw"]) if row["trade_balance_raw"] else None + orjson.loads(row["trade_balance_raw"] + ) if row["trade_balance_raw"] else None ), fee_schedule_raw=( - orjson.loads(row["fee_schedule_raw"]) if row["fee_schedule_raw"] else None + orjson.loads(row["fee_schedule_raw"] + ) if row["fee_schedule_raw"] else None ), ) @@ -1074,7 +1082,8 @@ class LogRepository: record.level, record.logger, record.message, - orjson.dumps(record.context).decode("utf-8") if record.context else None, + orjson.dumps(record.context).decode( + "utf-8") if record.context else None, ) async def query( @@ -1202,8 +1211,6 @@ class LogAggregationRepository: """Aggregate log counts per level for entries >= since, grouped by period.""" period_map = { "1h": "date_trunc('hour', recorded_at)", - "3h": "date_trunc('hour', recorded_at) - interval '1 hour' * (extract(hour from recorded_at)::int %% 3)", - "6h": "date_trunc('hour', recorded_at) - interval '1 hour' * (extract(hour from recorded_at)::int %% 6)", "1d": "date_trunc('day', recorded_at)", "1w": "date_trunc('week', recorded_at)", "1mo": "date_trunc('month', recorded_at)",