Compare commits

..

2 Commits

Author SHA1 Message Date
zwitschi e4f5d8dfcc refactor: clean up imports and improve code formatting across multiple files
CI / lint-test-build (push) Successful in 2m21s
2026-06-09 10:02:41 +02:00
zwitschi 403daa6cf1 Refactor log aggregation periods and improve code formatting
- Removed the "3h" and "6h" periods from the log aggregation process in maintenance.py to streamline log counts.
- Enhanced code readability by adjusting line breaks and indentation in repositories.py for better clarity.
2026-06-09 09:41:23 +02:00
6 changed files with 1307 additions and 1145 deletions
File diff suppressed because it is too large Load Diff
+1
View File
@@ -0,0 +1 @@
"""Dashboard module for monitoring and controlling the arbitrage bot."""
+63
View File
@@ -0,0 +1,63 @@
from __future__ import annotations
from fastapi import Request
from arbitrade.storage.repositories import (
BacktestJobRepository,
)
async def _recent_backtest_reports(request: Request) -> list[dict[str, object]]:
repo = BacktestJobRepository(request.app.state.store)
jobs = await repo.list_jobs(limit=5)
reports = []
for job in jobs:
report: dict[str, object] = {
"id": str(job.id),
"status": job.status,
}
if job.created_at is not None:
report["created_at"] = job.created_at.isoformat()
if job.finished_at is not None:
report["finished_at"] = job.finished_at.isoformat()
reports.append(report)
return reports
async def _backtesting_panel_context(
request: Request,
*,
status: str = "idle",
message: str = "Configure a replay run and execute backtest.",
latest_report: dict[str, object] | None = None,
defaults: dict[str, str] | None = None,
) -> dict[str, object]:
default_values = {
"symbols": "",
"start_time": "",
"end_time": "",
"starting_balances": "USD=1000.0",
"trade_capital": "100.0",
"min_profit_threshold": "0.0005",
"fee_profile": "api",
"custom_fee_rate": "",
"slippage_bps": "4.0",
"execution_latency_ms": "20.0",
}
if defaults is not None:
default_values.update(defaults)
reports = await _recent_backtest_reports(request)
latest = latest_report or (reports[0] if reports else None)
return {
"status": status,
"message": message,
"flash_message": "",
"no_enabled_pairings": False,
"latest_report": latest,
"recent_reports": reports,
"run_endpoint": "/dashboard/backtesting/run",
"reports_endpoint": "/dashboard/api/backtesting/reports",
**default_values,
}
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -21,7 +21,7 @@ async def run_log_aggregation(store: PgStore) -> None:
"""Aggregate log counts for the last 2 hours across all periods.""" """Aggregate log counts for the last 2 hours across all periods."""
repo = LogAggregationRepository(store) repo = LogAggregationRepository(store)
since = datetime.now(UTC) - timedelta(hours=2) since = datetime.now(UTC) - timedelta(hours=2)
periods = ["1h", "3h", "6h", "1d", "1w", "1mo"] periods = ["1h", "1d", "1w", "1mo"]
for period in periods: for period in periods:
try: try:
await repo.aggregate_since(since, period) await repo.aggregate_since(since, period)
+43 -25
View File
@@ -242,6 +242,12 @@ class AuditRepository:
async def insert(self, record: AuditRecord) -> None: async def insert(self, record: AuditRecord) -> None:
async with self._store.pool.acquire() as conn: async with self._store.pool.acquire() as conn:
payload = None
if record.payload is not None:
try:
payload = orjson.dumps(record.payload).decode("utf-8")
except Exception:
payload = None
await conn.execute( await conn.execute(
""" """
INSERT INTO audit_events ( INSERT INTO audit_events (
@@ -258,7 +264,7 @@ class AuditRepository:
record.actor, record.actor,
record.event_type, record.event_type,
record.decision, record.decision,
(None if record.payload is None else orjson.dumps(record.payload).decode("utf-8")), payload,
record.correlation_id, record.correlation_id,
) )
@@ -278,6 +284,9 @@ class AuditRepository:
for row in rows: for row in rows:
payload: dict[str, Any] | None = None payload: dict[str, Any] | None = None
raw_payload = row["payload"] raw_payload = row["payload"]
correlation_id = None
if row["correlation_id"] is not None:
correlation_id = str(row["correlation_id"])
if isinstance(raw_payload, str) and raw_payload: if isinstance(raw_payload, str) and raw_payload:
decoded = orjson.loads(raw_payload) decoded = orjson.loads(raw_payload)
if isinstance(decoded, dict): if isinstance(decoded, dict):
@@ -290,9 +299,7 @@ class AuditRepository:
event_type=str(row["event_type"]), event_type=str(row["event_type"]),
decision=str(row["decision"]), decision=str(row["decision"]),
payload=payload, payload=payload,
correlation_id=( correlation_id=correlation_id,
str(row["correlation_id"]) if row["correlation_id"] is not None else None
),
) )
) )
@@ -352,6 +359,9 @@ class RuntimeStateRepository:
balances: dict[str, Any] | None = None balances: dict[str, Any] | None = None
raw_balances = row["last_known_balances"] raw_balances = row["last_known_balances"]
kill_switch_reason = None
if row["kill_switch_reason"] is not None:
kill_switch_reason = str(row["kill_switch_reason"])
if isinstance(raw_balances, str) and raw_balances: if isinstance(raw_balances, str) and raw_balances:
decoded = orjson.loads(raw_balances) decoded = orjson.loads(raw_balances)
if isinstance(decoded, dict): if isinstance(decoded, dict):
@@ -361,9 +371,7 @@ class RuntimeStateRepository:
snapshot_at=row["snapshot_at"], snapshot_at=row["snapshot_at"],
is_running=bool(row["is_running"]), is_running=bool(row["is_running"]),
kill_switch_active=bool(row["kill_switch_active"]), kill_switch_active=bool(row["kill_switch_active"]),
kill_switch_reason=( kill_switch_reason=kill_switch_reason,
str(row["kill_switch_reason"]) if row["kill_switch_reason"] is not None else None
),
open_trade_count=int(row["open_trade_count"]), open_trade_count=int(row["open_trade_count"]),
last_known_balances=balances, last_known_balances=balances,
note=str(row["note"]) if row["note"] is not None else None, note=str(row["note"]) if row["note"] is not None else None,
@@ -770,10 +778,12 @@ class ConfigBacktestingDefaultsRepository:
defaults.execution_latency_ms, defaults.execution_latency_ms,
) )
if row: if row:
starting_balances = None
if row["starting_balances"] is not None:
starting_balances = orjson.loads(row["starting_balances"])
return ConfigBacktestingDefaults( return ConfigBacktestingDefaults(
starting_balances=( starting_balances=starting_balances,
orjson.loads(row["starting_balances"]) if row["starting_balances"] else None
),
trade_capital=row["trade_capital"], trade_capital=row["trade_capital"],
min_profit_threshold=row["min_profit_threshold"], min_profit_threshold=row["min_profit_threshold"],
slippage_bps=row["slippage_bps"], slippage_bps=row["slippage_bps"],
@@ -791,10 +801,11 @@ class ConfigBacktestingDefaultsRepository:
LIMIT 1 LIMIT 1
""") """)
if row: if row:
starting_balances = None
if row["starting_balances"] is not None:
starting_balances = orjson.loads(row["starting_balances"])
return ConfigBacktestingDefaults( return ConfigBacktestingDefaults(
starting_balances=( starting_balances=starting_balances,
orjson.loads(row["starting_balances"]) if row["starting_balances"] else None
),
trade_capital=row["trade_capital"], trade_capital=row["trade_capital"],
min_profit_threshold=row["min_profit_threshold"], min_profit_threshold=row["min_profit_threshold"],
slippage_bps=row["slippage_bps"], slippage_bps=row["slippage_bps"],
@@ -828,10 +839,11 @@ class ConfigBacktestingDefaultsRepository:
defaults.execution_latency_ms, defaults.execution_latency_ms,
) )
if row: if row:
starting_balances = None
if row["starting_balances"] is not None:
starting_balances = orjson.loads(row["starting_balances"])
return ConfigBacktestingDefaults( return ConfigBacktestingDefaults(
starting_balances=( starting_balances=starting_balances,
orjson.loads(row["starting_balances"]) if row["starting_balances"] else None
),
trade_capital=row["trade_capital"], trade_capital=row["trade_capital"],
min_profit_threshold=row["min_profit_threshold"], min_profit_threshold=row["min_profit_threshold"],
slippage_bps=row["slippage_bps"], slippage_bps=row["slippage_bps"],
@@ -894,18 +906,20 @@ class KrakenAccountSnapshotRepository:
""") """)
if row is None: if row is None:
return None return None
trade_balance_raw = None
fee_schedule_raw = None
if row["trade_balance_raw"] is not None:
trade_balance_raw = orjson.loads(row["trade_balance_raw"])
if row["fee_schedule_raw"] is not None:
fee_schedule_raw = orjson.loads(row["fee_schedule_raw"])
return KrakenAccountSnapshot( return KrakenAccountSnapshot(
snapshot_at=row["snapshot_at"], snapshot_at=row["snapshot_at"],
fee_tier=row["fee_tier"], fee_tier=row["fee_tier"],
maker_fee=row["maker_fee"], maker_fee=row["maker_fee"],
taker_fee=row["taker_fee"], taker_fee=row["taker_fee"],
thirty_day_volume=row["thirty_day_volume"], thirty_day_volume=row["thirty_day_volume"],
trade_balance_raw=( trade_balance_raw=trade_balance_raw,
orjson.loads(row["trade_balance_raw"]) if row["trade_balance_raw"] else None fee_schedule_raw=fee_schedule_raw,
),
fee_schedule_raw=(
orjson.loads(row["fee_schedule_raw"]) if row["fee_schedule_raw"] else None
),
) )
@@ -1065,6 +1079,12 @@ class LogRepository:
async def insert(self, record: LogRecord) -> None: async def insert(self, record: LogRecord) -> None:
async with self._store.pool.acquire() as conn: async with self._store.pool.acquire() as conn:
context = None
if record.context:
try:
context = orjson.dumps(record.context).decode("utf-8")
except Exception:
context = None
await conn.execute( await conn.execute(
""" """
INSERT INTO app_logs (recorded_at, level, logger, message, context) INSERT INTO app_logs (recorded_at, level, logger, message, context)
@@ -1074,7 +1094,7 @@ class LogRepository:
record.level, record.level,
record.logger, record.logger,
record.message, record.message,
orjson.dumps(record.context).decode("utf-8") if record.context else None, context,
) )
async def query( async def query(
@@ -1202,8 +1222,6 @@ class LogAggregationRepository:
"""Aggregate log counts per level for entries >= since, grouped by period.""" """Aggregate log counts per level for entries >= since, grouped by period."""
period_map = { period_map = {
"1h": "date_trunc('hour', recorded_at)", "1h": "date_trunc('hour', recorded_at)",
"3h": "date_trunc('hour', recorded_at) - interval '1 hour' * (extract(hour from recorded_at)::int %% 3)",
"6h": "date_trunc('hour', recorded_at) - interval '1 hour' * (extract(hour from recorded_at)::int %% 6)",
"1d": "date_trunc('day', recorded_at)", "1d": "date_trunc('day', recorded_at)",
"1w": "date_trunc('week', recorded_at)", "1w": "date_trunc('week', recorded_at)",
"1mo": "date_trunc('month', recorded_at)", "1mo": "date_trunc('month', recorded_at)",