Refactor log aggregation periods and improve code formatting

- Removed the "3h" and "6h" periods from the log aggregation process in maintenance.py to streamline log counts.
- Enhanced code readability by adjusting line breaks and indentation in repositories.py for better clarity.
This commit is contained in:
2026-06-09 09:41:23 +02:00
parent dc99f1604e
commit 403daa6cf1
6 changed files with 1327 additions and 1124 deletions
File diff suppressed because it is too large Load Diff
+1
View File
@@ -0,0 +1 @@
"""Dashboard module for monitoring and controlling the arbitrage bot."""
+79
View File
@@ -0,0 +1,79 @@
from __future__ import annotations
import json
from asyncio import Lock
from collections.abc import AsyncIterator
from datetime import UTC, datetime
from importlib import resources
from pathlib import Path
from typing import cast
from urllib.parse import parse_qs
import orjson
from fastapi import APIRouter, Depends, Request, Response
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from arbitrade.storage.repositories import (
AuditRecord,
AuditRepository,
BacktestJobRepository,
ConfigPairingRepository,
KrakenAccountSnapshotRepository,
LogRepository,
)
async def _recent_backtest_reports(request: Request) -> list[dict[str, object]]:
repo = BacktestJobRepository(request.app.state.store)
jobs = await repo.list_jobs(limit=5)
reports = []
for job in jobs:
report: dict[str, object] = {
"id": str(job.id),
"status": job.status,
}
if job.created_at is not None:
report["created_at"] = job.created_at.isoformat()
if job.finished_at is not None:
report["finished_at"] = job.finished_at.isoformat()
reports.append(report)
return reports
async def _backtesting_panel_context(
request: Request,
*,
status: str = "idle",
message: str = "Configure a replay run and execute backtest.",
latest_report: dict[str, object] | None = None,
defaults: dict[str, str] | None = None,
) -> dict[str, object]:
default_values = {
"symbols": "",
"start_time": "",
"end_time": "",
"starting_balances": "USD=1000.0",
"trade_capital": "100.0",
"min_profit_threshold": "0.0005",
"fee_profile": "api",
"custom_fee_rate": "",
"slippage_bps": "4.0",
"execution_latency_ms": "20.0",
}
if defaults is not None:
default_values.update(defaults)
reports = await _recent_backtest_reports(request)
latest = latest_report or (reports[0] if reports else None)
return {
"status": status,
"message": message,
"flash_message": "",
"no_enabled_pairings": False,
"latest_report": latest,
"recent_reports": reports,
"run_endpoint": "/dashboard/backtesting/run",
"reports_endpoint": "/dashboard/api/backtesting/reports",
**default_values,
}
File diff suppressed because it is too large Load Diff
+3 -2
View File
@@ -21,7 +21,7 @@ async def run_log_aggregation(store: PgStore) -> None:
"""Aggregate log counts for the last 2 hours across all periods."""
repo = LogAggregationRepository(store)
since = datetime.now(UTC) - timedelta(hours=2)
periods = ["1h", "3h", "6h", "1d", "1w", "1mo"]
periods = ["1h", "1d", "1w", "1mo"]
for period in periods:
try:
await repo.aggregate_since(since, period)
@@ -36,7 +36,8 @@ async def run_log_archive(store: PgStore, retention_days: int = _RETENTION_DAYS)
repo = LogArchiveRepository(store)
count = await repo.archive_before(cutoff)
if count > 0:
_LOG.info("log_archive_complete", cutoff=cutoff.isoformat(), archived=count)
_LOG.info("log_archive_complete",
cutoff=cutoff.isoformat(), archived=count)
return count
+18 -11
View File
@@ -258,7 +258,8 @@ class AuditRepository:
record.actor,
record.event_type,
record.decision,
(None if record.payload is None else orjson.dumps(record.payload).decode("utf-8")),
(None if record.payload is None else orjson.dumps(
record.payload).decode("utf-8")),
record.correlation_id,
)
@@ -291,7 +292,8 @@ class AuditRepository:
decision=str(row["decision"]),
payload=payload,
correlation_id=(
str(row["correlation_id"]) if row["correlation_id"] is not None else None
str(row["correlation_id"]
) if row["correlation_id"] is not None else None
),
)
)
@@ -362,7 +364,8 @@ class RuntimeStateRepository:
is_running=bool(row["is_running"]),
kill_switch_active=bool(row["kill_switch_active"]),
kill_switch_reason=(
str(row["kill_switch_reason"]) if row["kill_switch_reason"] is not None else None
str(row["kill_switch_reason"]
) if row["kill_switch_reason"] is not None else None
),
open_trade_count=int(row["open_trade_count"]),
last_known_balances=balances,
@@ -772,7 +775,8 @@ class ConfigBacktestingDefaultsRepository:
if row:
return ConfigBacktestingDefaults(
starting_balances=(
orjson.loads(row["starting_balances"]) if row["starting_balances"] else None
orjson.loads(row["starting_balances"]
) if row["starting_balances"] else None
),
trade_capital=row["trade_capital"],
min_profit_threshold=row["min_profit_threshold"],
@@ -793,7 +797,8 @@ class ConfigBacktestingDefaultsRepository:
if row:
return ConfigBacktestingDefaults(
starting_balances=(
orjson.loads(row["starting_balances"]) if row["starting_balances"] else None
orjson.loads(row["starting_balances"]
) if row["starting_balances"] else None
),
trade_capital=row["trade_capital"],
min_profit_threshold=row["min_profit_threshold"],
@@ -830,7 +835,8 @@ class ConfigBacktestingDefaultsRepository:
if row:
return ConfigBacktestingDefaults(
starting_balances=(
orjson.loads(row["starting_balances"]) if row["starting_balances"] else None
orjson.loads(row["starting_balances"]
) if row["starting_balances"] else None
),
trade_capital=row["trade_capital"],
min_profit_threshold=row["min_profit_threshold"],
@@ -901,10 +907,12 @@ class KrakenAccountSnapshotRepository:
taker_fee=row["taker_fee"],
thirty_day_volume=row["thirty_day_volume"],
trade_balance_raw=(
orjson.loads(row["trade_balance_raw"]) if row["trade_balance_raw"] else None
orjson.loads(row["trade_balance_raw"]
) if row["trade_balance_raw"] else None
),
fee_schedule_raw=(
orjson.loads(row["fee_schedule_raw"]) if row["fee_schedule_raw"] else None
orjson.loads(row["fee_schedule_raw"]
) if row["fee_schedule_raw"] else None
),
)
@@ -1074,7 +1082,8 @@ class LogRepository:
record.level,
record.logger,
record.message,
orjson.dumps(record.context).decode("utf-8") if record.context else None,
orjson.dumps(record.context).decode(
"utf-8") if record.context else None,
)
async def query(
@@ -1202,8 +1211,6 @@ class LogAggregationRepository:
"""Aggregate log counts per level for entries >= since, grouped by period."""
period_map = {
"1h": "date_trunc('hour', recorded_at)",
"3h": "date_trunc('hour', recorded_at) - interval '1 hour' * (extract(hour from recorded_at)::int %% 3)",
"6h": "date_trunc('hour', recorded_at) - interval '1 hour' * (extract(hour from recorded_at)::int %% 6)",
"1d": "date_trunc('day', recorded_at)",
"1w": "date_trunc('week', recorded_at)",
"1mo": "date_trunc('month', recorded_at)",