Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e4f5d8dfcc | |||
| 403daa6cf1 |
+41
-1119
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1 @@
|
||||
"""Dashboard module for monitoring and controlling the arbitrage bot."""
|
||||
@@ -0,0 +1,63 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from fastapi import Request
|
||||
|
||||
from arbitrade.storage.repositories import (
|
||||
BacktestJobRepository,
|
||||
)
|
||||
|
||||
|
||||
async def _recent_backtest_reports(request: Request) -> list[dict[str, object]]:
|
||||
repo = BacktestJobRepository(request.app.state.store)
|
||||
jobs = await repo.list_jobs(limit=5)
|
||||
reports = []
|
||||
for job in jobs:
|
||||
report: dict[str, object] = {
|
||||
"id": str(job.id),
|
||||
"status": job.status,
|
||||
}
|
||||
if job.created_at is not None:
|
||||
report["created_at"] = job.created_at.isoformat()
|
||||
if job.finished_at is not None:
|
||||
report["finished_at"] = job.finished_at.isoformat()
|
||||
reports.append(report)
|
||||
return reports
|
||||
|
||||
|
||||
async def _backtesting_panel_context(
|
||||
request: Request,
|
||||
*,
|
||||
status: str = "idle",
|
||||
message: str = "Configure a replay run and execute backtest.",
|
||||
latest_report: dict[str, object] | None = None,
|
||||
defaults: dict[str, str] | None = None,
|
||||
) -> dict[str, object]:
|
||||
default_values = {
|
||||
"symbols": "",
|
||||
"start_time": "",
|
||||
"end_time": "",
|
||||
"starting_balances": "USD=1000.0",
|
||||
"trade_capital": "100.0",
|
||||
"min_profit_threshold": "0.0005",
|
||||
"fee_profile": "api",
|
||||
"custom_fee_rate": "",
|
||||
"slippage_bps": "4.0",
|
||||
"execution_latency_ms": "20.0",
|
||||
}
|
||||
if defaults is not None:
|
||||
default_values.update(defaults)
|
||||
|
||||
reports = await _recent_backtest_reports(request)
|
||||
latest = latest_report or (reports[0] if reports else None)
|
||||
|
||||
return {
|
||||
"status": status,
|
||||
"message": message,
|
||||
"flash_message": "",
|
||||
"no_enabled_pairings": False,
|
||||
"latest_report": latest,
|
||||
"recent_reports": reports,
|
||||
"run_endpoint": "/dashboard/backtesting/run",
|
||||
"reports_endpoint": "/dashboard/api/backtesting/reports",
|
||||
**default_values,
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -21,7 +21,7 @@ async def run_log_aggregation(store: PgStore) -> None:
|
||||
"""Aggregate log counts for the last 2 hours across all periods."""
|
||||
repo = LogAggregationRepository(store)
|
||||
since = datetime.now(UTC) - timedelta(hours=2)
|
||||
periods = ["1h", "3h", "6h", "1d", "1w", "1mo"]
|
||||
periods = ["1h", "1d", "1w", "1mo"]
|
||||
for period in periods:
|
||||
try:
|
||||
await repo.aggregate_since(since, period)
|
||||
|
||||
@@ -242,6 +242,12 @@ class AuditRepository:
|
||||
|
||||
async def insert(self, record: AuditRecord) -> None:
|
||||
async with self._store.pool.acquire() as conn:
|
||||
payload = None
|
||||
if record.payload is not None:
|
||||
try:
|
||||
payload = orjson.dumps(record.payload).decode("utf-8")
|
||||
except Exception:
|
||||
payload = None
|
||||
await conn.execute(
|
||||
"""
|
||||
INSERT INTO audit_events (
|
||||
@@ -258,7 +264,7 @@ class AuditRepository:
|
||||
record.actor,
|
||||
record.event_type,
|
||||
record.decision,
|
||||
(None if record.payload is None else orjson.dumps(record.payload).decode("utf-8")),
|
||||
payload,
|
||||
record.correlation_id,
|
||||
)
|
||||
|
||||
@@ -278,6 +284,9 @@ class AuditRepository:
|
||||
for row in rows:
|
||||
payload: dict[str, Any] | None = None
|
||||
raw_payload = row["payload"]
|
||||
correlation_id = None
|
||||
if row["correlation_id"] is not None:
|
||||
correlation_id = str(row["correlation_id"])
|
||||
if isinstance(raw_payload, str) and raw_payload:
|
||||
decoded = orjson.loads(raw_payload)
|
||||
if isinstance(decoded, dict):
|
||||
@@ -290,9 +299,7 @@ class AuditRepository:
|
||||
event_type=str(row["event_type"]),
|
||||
decision=str(row["decision"]),
|
||||
payload=payload,
|
||||
correlation_id=(
|
||||
str(row["correlation_id"]) if row["correlation_id"] is not None else None
|
||||
),
|
||||
correlation_id=correlation_id,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -352,6 +359,9 @@ class RuntimeStateRepository:
|
||||
|
||||
balances: dict[str, Any] | None = None
|
||||
raw_balances = row["last_known_balances"]
|
||||
kill_switch_reason = None
|
||||
if row["kill_switch_reason"] is not None:
|
||||
kill_switch_reason = str(row["kill_switch_reason"])
|
||||
if isinstance(raw_balances, str) and raw_balances:
|
||||
decoded = orjson.loads(raw_balances)
|
||||
if isinstance(decoded, dict):
|
||||
@@ -361,9 +371,7 @@ class RuntimeStateRepository:
|
||||
snapshot_at=row["snapshot_at"],
|
||||
is_running=bool(row["is_running"]),
|
||||
kill_switch_active=bool(row["kill_switch_active"]),
|
||||
kill_switch_reason=(
|
||||
str(row["kill_switch_reason"]) if row["kill_switch_reason"] is not None else None
|
||||
),
|
||||
kill_switch_reason=kill_switch_reason,
|
||||
open_trade_count=int(row["open_trade_count"]),
|
||||
last_known_balances=balances,
|
||||
note=str(row["note"]) if row["note"] is not None else None,
|
||||
@@ -770,10 +778,12 @@ class ConfigBacktestingDefaultsRepository:
|
||||
defaults.execution_latency_ms,
|
||||
)
|
||||
if row:
|
||||
starting_balances = None
|
||||
if row["starting_balances"] is not None:
|
||||
starting_balances = orjson.loads(row["starting_balances"])
|
||||
|
||||
return ConfigBacktestingDefaults(
|
||||
starting_balances=(
|
||||
orjson.loads(row["starting_balances"]) if row["starting_balances"] else None
|
||||
),
|
||||
starting_balances=starting_balances,
|
||||
trade_capital=row["trade_capital"],
|
||||
min_profit_threshold=row["min_profit_threshold"],
|
||||
slippage_bps=row["slippage_bps"],
|
||||
@@ -791,10 +801,11 @@ class ConfigBacktestingDefaultsRepository:
|
||||
LIMIT 1
|
||||
""")
|
||||
if row:
|
||||
starting_balances = None
|
||||
if row["starting_balances"] is not None:
|
||||
starting_balances = orjson.loads(row["starting_balances"])
|
||||
return ConfigBacktestingDefaults(
|
||||
starting_balances=(
|
||||
orjson.loads(row["starting_balances"]) if row["starting_balances"] else None
|
||||
),
|
||||
starting_balances=starting_balances,
|
||||
trade_capital=row["trade_capital"],
|
||||
min_profit_threshold=row["min_profit_threshold"],
|
||||
slippage_bps=row["slippage_bps"],
|
||||
@@ -828,10 +839,11 @@ class ConfigBacktestingDefaultsRepository:
|
||||
defaults.execution_latency_ms,
|
||||
)
|
||||
if row:
|
||||
starting_balances = None
|
||||
if row["starting_balances"] is not None:
|
||||
starting_balances = orjson.loads(row["starting_balances"])
|
||||
return ConfigBacktestingDefaults(
|
||||
starting_balances=(
|
||||
orjson.loads(row["starting_balances"]) if row["starting_balances"] else None
|
||||
),
|
||||
starting_balances=starting_balances,
|
||||
trade_capital=row["trade_capital"],
|
||||
min_profit_threshold=row["min_profit_threshold"],
|
||||
slippage_bps=row["slippage_bps"],
|
||||
@@ -894,18 +906,20 @@ class KrakenAccountSnapshotRepository:
|
||||
""")
|
||||
if row is None:
|
||||
return None
|
||||
trade_balance_raw = None
|
||||
fee_schedule_raw = None
|
||||
if row["trade_balance_raw"] is not None:
|
||||
trade_balance_raw = orjson.loads(row["trade_balance_raw"])
|
||||
if row["fee_schedule_raw"] is not None:
|
||||
fee_schedule_raw = orjson.loads(row["fee_schedule_raw"])
|
||||
return KrakenAccountSnapshot(
|
||||
snapshot_at=row["snapshot_at"],
|
||||
fee_tier=row["fee_tier"],
|
||||
maker_fee=row["maker_fee"],
|
||||
taker_fee=row["taker_fee"],
|
||||
thirty_day_volume=row["thirty_day_volume"],
|
||||
trade_balance_raw=(
|
||||
orjson.loads(row["trade_balance_raw"]) if row["trade_balance_raw"] else None
|
||||
),
|
||||
fee_schedule_raw=(
|
||||
orjson.loads(row["fee_schedule_raw"]) if row["fee_schedule_raw"] else None
|
||||
),
|
||||
trade_balance_raw=trade_balance_raw,
|
||||
fee_schedule_raw=fee_schedule_raw,
|
||||
)
|
||||
|
||||
|
||||
@@ -1065,6 +1079,12 @@ class LogRepository:
|
||||
|
||||
async def insert(self, record: LogRecord) -> None:
|
||||
async with self._store.pool.acquire() as conn:
|
||||
context = None
|
||||
if record.context:
|
||||
try:
|
||||
context = orjson.dumps(record.context).decode("utf-8")
|
||||
except Exception:
|
||||
context = None
|
||||
await conn.execute(
|
||||
"""
|
||||
INSERT INTO app_logs (recorded_at, level, logger, message, context)
|
||||
@@ -1074,7 +1094,7 @@ class LogRepository:
|
||||
record.level,
|
||||
record.logger,
|
||||
record.message,
|
||||
orjson.dumps(record.context).decode("utf-8") if record.context else None,
|
||||
context,
|
||||
)
|
||||
|
||||
async def query(
|
||||
@@ -1202,8 +1222,6 @@ class LogAggregationRepository:
|
||||
"""Aggregate log counts per level for entries >= since, grouped by period."""
|
||||
period_map = {
|
||||
"1h": "date_trunc('hour', recorded_at)",
|
||||
"3h": "date_trunc('hour', recorded_at) - interval '1 hour' * (extract(hour from recorded_at)::int %% 3)",
|
||||
"6h": "date_trunc('hour', recorded_at) - interval '1 hour' * (extract(hour from recorded_at)::int %% 6)",
|
||||
"1d": "date_trunc('day', recorded_at)",
|
||||
"1w": "date_trunc('week', recorded_at)",
|
||||
"1mo": "date_trunc('month', recorded_at)",
|
||||
|
||||
Reference in New Issue
Block a user