feat: implement logging system with aggregation and archiving tasks

This commit is contained in:
2026-06-07 18:06:35 +02:00
parent db2e02c316
commit cf5ff2e2d8
8 changed files with 486 additions and 15 deletions
+10
View File
@@ -17,6 +17,8 @@ from arbitrade.config.settings import Settings
from arbitrade.exchange.fee_service import run_fee_sync_loop from arbitrade.exchange.fee_service import run_fee_sync_loop
from arbitrade.exchange.kraken_rest import KrakenRestClient from arbitrade.exchange.kraken_rest import KrakenRestClient
from arbitrade.exchange.kraken_ws import KrakenWsClient from arbitrade.exchange.kraken_ws import KrakenWsClient
from arbitrade.logging.db_sink import get_db_sink
from arbitrade.logging.maintenance import run_log_aggregation_loop, run_log_archive_loop
from arbitrade.logging_setup import configure_logging from arbitrade.logging_setup import configure_logging
from arbitrade.market_data.feed import MarketDataFeed from arbitrade.market_data.feed import MarketDataFeed
from arbitrade.market_data.feed_builder import ( from arbitrade.market_data.feed_builder import (
@@ -108,6 +110,7 @@ def create_app(settings: Settings) -> FastAPI:
async def lifespan(app: FastAPI) -> AsyncIterator[None]: async def lifespan(app: FastAPI) -> AsyncIterator[None]:
await app.state.store.start() await app.state.store.start()
await app.state.store.migrate() await app.state.store.migrate()
get_db_sink().start_consumer(db)
await app.state.configuration_service.load_database_settings() await app.state.configuration_service.load_database_settings()
await restore_runtime_state(app) await restore_runtime_state(app)
fee_sync_task = asyncio.create_task( fee_sync_task = asyncio.create_task(
@@ -135,6 +138,12 @@ def create_app(settings: Settings) -> FastAPI:
app.state.fee_sync_task = fee_sync_task app.state.fee_sync_task = fee_sync_task
app.state.pairing_sync_task = pairing_sync_task app.state.pairing_sync_task = pairing_sync_task
app.state.backtest_task = backtest_task app.state.backtest_task = backtest_task
app.state.log_aggregation_task = asyncio.create_task(
run_log_aggregation_loop(db), name="log_aggregation"
)
app.state.log_archive_task = asyncio.create_task(
run_log_archive_loop(db), name="log_archive"
)
yield yield
fee_sync_stop_event.set() fee_sync_stop_event.set()
pairing_sync_stop_event.set() pairing_sync_stop_event.set()
@@ -170,6 +179,7 @@ def create_app(settings: Settings) -> FastAPI:
await kraken_client.close() await kraken_client.close()
await graceful_shutdown(app) await graceful_shutdown(app)
await app.state.store.stop() await app.state.store.stop()
await get_db_sink().stop_consumer()
app = FastAPI(title="arbitrade", version="0.1.0", lifespan=lifespan) app = FastAPI(title="arbitrade", version="0.1.0", lifespan=lifespan)
app.state.settings = settings app.state.settings = settings
+12 -2
View File
@@ -27,6 +27,7 @@ from arbitrade.storage.repositories import (
BacktestJobRepository, BacktestJobRepository,
ConfigPairingRepository, ConfigPairingRepository,
KrakenAccountSnapshotRepository, KrakenAccountSnapshotRepository,
LogRepository,
) )
router = APIRouter(dependencies=[Depends(require_dashboard_auth)]) router = APIRouter(dependencies=[Depends(require_dashboard_auth)])
@@ -988,6 +989,15 @@ async def dashboard_backtesting_run(request: Request) -> HTMLResponse:
) )
fee_rate = await _fee_rate_for_profile(defaults["fee_profile"], custom_fee_rate, request=request) fee_rate = await _fee_rate_for_profile(defaults["fee_profile"], custom_fee_rate, request=request)
# Use enabled pairings from DB when none selected
symbols_str = defaults["symbols"]
if not symbols_str.strip():
pairing_repo = ConfigPairingRepository(request.app.state.store)
enabled = await pairing_repo.list_pairings(enabled_only=True)
symbols_str = ",".join(
f"{p.base_asset}/{p.quote_asset}" for p in enabled
)
config_dict: dict[str, object] = { config_dict: dict[str, object] = {
"source": defaults["source"], "source": defaults["source"],
"starting_balances": defaults["starting_balances"], "starting_balances": defaults["starting_balances"],
@@ -999,12 +1009,12 @@ async def dashboard_backtesting_run(request: Request) -> HTMLResponse:
"execution_latency_ms": float(defaults["execution_latency_ms"]), "execution_latency_ms": float(defaults["execution_latency_ms"]),
"start_time": defaults["start_time"], "start_time": defaults["start_time"],
"end_time": defaults["end_time"], "end_time": defaults["end_time"],
"symbols": defaults["symbols"], "symbols": symbols_str,
} }
store = request.app.state.store store = request.app.state.store
repo = BacktestJobRepository(store) repo = BacktestJobRepository(store)
events_label = defaults["symbols"] if defaults["symbols"] else "DB-sourced" events_label = symbols_str if symbols_str else "DB-sourced"
job = await repo.create_job(events_label, config_dict) job = await repo.create_job(events_label, config_dict)
msg_job = job.id[:8] if job.id else "unknown" msg_job = job.id[:8] if job.id else "unknown"
+123
View File
@@ -0,0 +1,123 @@
"""DB sink — writes structlog events to app_logs table via background queue."""
from __future__ import annotations
import asyncio
from datetime import UTC, datetime
from typing import Any
import structlog
from arbitrade.storage.pg_store import PgStore
from arbitrade.storage.repositories import LogRecord, LogRepository
_LOG = structlog.get_logger(__name__)
class DbSinkProcessor:
"""structlog processor that queues log events for DB writes.
Must be registered in the structlog processor chain. The consumer
task must be started on app init via ``start_consumer(store)``.
"""
def __init__(self) -> None:
self._queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue(maxsize=2000)
self._consumer_task: asyncio.Task[None] | None = None
def __call__(self, logger: Any, method_name: str, event_dict: dict[str, Any]) -> dict[str, Any]:
"""Processor — called for every structlog event. Non-blocking."""
try:
self._queue.put_nowait(dict(event_dict))
except asyncio.QueueFull:
pass # drop event if queue full, avoid backpressure
return event_dict
def start_consumer(self, store: PgStore) -> None:
"""Start background consumer task."""
if self._consumer_task is not None and not self._consumer_task.done():
return
self._consumer_task = asyncio.create_task(
self._consume(store), name="log_db_sink"
)
async def stop_consumer(self) -> None:
"""Drain queue and cancel consumer."""
if self._consumer_task is None:
return
self._consumer_task.cancel()
try:
await self._consumer_task
except asyncio.CancelledError:
pass
self._consumer_task = None
# Flush remaining
await self._flush(store=None)
async def _consume(self, store: PgStore) -> None:
repo = LogRepository(store)
while True:
try:
event = await self._queue.get()
await self._write_one(repo, event)
except asyncio.CancelledError:
break
except Exception:
pass # swallow consumer errors, never crash
# Final flush
await self._flush(repo)
async def _write_one(self, repo: LogRepository, event: dict[str, Any]) -> None:
recorded_at = event.pop("timestamp", None)
if isinstance(recorded_at, str):
try:
recorded_at = datetime.fromisoformat(recorded_at)
except ValueError:
recorded_at = datetime.now(UTC)
elif not isinstance(recorded_at, datetime):
recorded_at = datetime.now(UTC)
level = str(event.pop("level", "info")).upper()
logger = str(event.pop("logger", "root"))
message = str(event.pop("event", event.pop("message", "")))
context = {k: v for k, v in event.items() if not k.startswith("_")} if event else None
record = LogRecord(
recorded_at=recorded_at,
level=level,
logger=logger,
message=message,
context=context if context else None,
)
try:
await repo.insert(record)
except Exception:
pass # never crash from DB write failure
async def _flush(self, repo: LogRepository | None) -> None:
drained = 0
while not self._queue.empty() and drained < 500:
try:
event = self._queue.get_nowait()
if repo is not None:
await self._write_one(repo, event)
drained += 1
except asyncio.QueueEmpty:
break
except Exception:
pass
# Module-level singleton
_db_sink = DbSinkProcessor()
def get_db_sink() -> DbSinkProcessor:
    """Return the module-level ``DbSinkProcessor`` singleton."""
    sink = _db_sink
    return sink
def db_sink_processor(
    logger: Any, method_name: str, event_dict: dict[str, Any]
) -> dict[str, Any]:
    """Plain-function structlog processor that forwards to the singleton sink.

    Returns whatever the singleton's ``__call__`` returns for the event.
    """
    sink = _db_sink
    return sink(logger, method_name, event_dict)
+61
View File
@@ -0,0 +1,61 @@
"""Log maintenance — aggregation and archiving tasks."""
from __future__ import annotations
import asyncio
from datetime import UTC, datetime, timedelta
import structlog
from arbitrade.storage.pg_store import PgStore
from arbitrade.storage.repositories import LogAggregationRepository, LogArchiveRepository
_LOG = structlog.get_logger(__name__)
_AGGREGATE_INTERVAL = 3600 # 1 hour
_ARCHIVE_INTERVAL = 86400 # 1 day
_RETENTION_DAYS = 30
async def run_log_aggregation(store: PgStore) -> None:
    """Run one aggregation pass over every supported bucket period.

    A timestamp two hours in the past is passed to
    ``LogAggregationRepository.aggregate_since`` once per period. A failure
    for one period is logged and the remaining periods are still attempted.
    """
    aggregates = LogAggregationRepository(store)
    window_start = datetime.now(UTC) - timedelta(hours=2)
    for period in ("1h", "3h", "6h", "1d", "1w", "1mo"):
        try:
            await aggregates.aggregate_since(window_start, period)
        except Exception:
            _LOG.exception("log_aggregation_failed", period=period)
    # Emitted whether or not individual periods failed.
    _LOG.info("log_aggregation_complete", since=window_start.isoformat())
async def run_log_archive(store: PgStore, retention_days: int = _RETENTION_DAYS) -> int:
    """Archive log rows older than ``retention_days`` days.

    Returns the count reported by ``LogArchiveRepository.archive_before``;
    a completion event is logged only when that count is positive.
    """
    threshold = datetime.now(UTC) - timedelta(days=retention_days)
    archived = await LogArchiveRepository(store).archive_before(threshold)
    if archived > 0:
        _LOG.info(
            "log_archive_complete",
            cutoff=threshold.isoformat(),
            archived=archived,
        )
    return archived
async def run_log_aggregation_loop(store: PgStore) -> None:
    """Run ``run_log_aggregation`` forever, sleeping between passes.

    Any ``Exception`` from a pass is logged as ``log_aggregation_loop_error``
    and the loop continues after the usual ``_AGGREGATE_INTERVAL`` pause.
    """
    while True:
        try:
            await run_log_aggregation(store)
        except Exception:
            # Keep the periodic task alive; the next pass retries.
            _LOG.exception("log_aggregation_loop_error")
        await asyncio.sleep(_AGGREGATE_INTERVAL)
async def run_log_archive_loop(store: PgStore) -> None:
    """Run ``run_log_archive`` forever, sleeping between passes.

    Any ``Exception`` from a pass is logged as ``log_archive_loop_error``
    and the loop continues after the usual ``_ARCHIVE_INTERVAL`` pause.
    """
    while True:
        try:
            await run_log_archive(store)
        except Exception:
            # Keep the periodic task alive; the next pass retries.
            _LOG.exception("log_archive_loop_error")
        await asyncio.sleep(_ARCHIVE_INTERVAL)
+3
View File
@@ -6,6 +6,8 @@ from typing import Any
import structlog import structlog
from arbitrade.logging.db_sink import db_sink_processor
def configure_logging(log_level: str = "INFO", json_logs: bool = True) -> None: def configure_logging(log_level: str = "INFO", json_logs: bool = True) -> None:
level = getattr(logging, log_level.upper(), logging.INFO) level = getattr(logging, log_level.upper(), logging.INFO)
@@ -17,6 +19,7 @@ def configure_logging(log_level: str = "INFO", json_logs: bool = True) -> None:
structlog.stdlib.add_log_level, structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name, structlog.stdlib.add_logger_name,
timestamper, timestamper,
db_sink_processor,
] ]
if json_logs: if json_logs:
+235 -1
View File
@@ -1,7 +1,7 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import UTC, datetime, timedelta
from typing import Any from typing import Any
import orjson import orjson
@@ -1009,3 +1009,237 @@ class BacktestJobRepository:
elif isinstance(result, str): elif isinstance(result, str):
return result != "DELETE 0" return result != "DELETE 0"
return False return False
@dataclass(slots=True)
class LogRecord:
recorded_at: datetime
level: str
logger: str
message: str
context: dict[str, Any] | None = None
@dataclass(slots=True)
class LogAggregateRecord:
bucket_start: datetime
period: str
level: str
count: int
class LogRepository:
    """Insert, list and count rows of the ``app_logs`` table."""

    def __init__(self, store: PgStore) -> None:
        self._store = store

    @staticmethod
    def _build_filters(
        level: str | None,
        before: datetime | None,
        after: datetime | None,
    ) -> tuple[str, list[Any]]:
        """Return a ``WHERE`` clause (or ``""``) and its positional params.

        Placeholders are numbered from ``$1`` in the order level, before,
        after. Callers append further parameters after the returned list.
        """
        conditions: list[str] = []
        params: list[Any] = []
        if level:
            params.append(level.upper())
            conditions.append(f"level = ${len(params)}")
        if before:
            params.append(before)
            conditions.append(f"recorded_at < ${len(params)}")
        if after:
            params.append(after)
            conditions.append(f"recorded_at >= ${len(params)}")
        where = "WHERE " + " AND ".join(conditions) if conditions else ""
        return where, params

    @staticmethod
    def _decode_context(raw: Any) -> Any:
        """Return the JSONB ``context`` column as a Python object.

        NOTE(review): asyncpg returns json/jsonb as text unless a codec is
        registered on the connection. Confirm how PgStore configures its
        pool. Values that are already decoded pass through unchanged.
        """
        if isinstance(raw, (str, bytes)):
            return orjson.loads(raw)
        return raw

    async def insert(self, record: LogRecord) -> None:
        """Insert one log record; an empty or missing context is stored as NULL."""
        context_json = None
        if record.context:
            # default=str keeps the row when a context value is not
            # JSON-native (exceptions, Decimal, ...); without it dumps()
            # raises and the whole log entry is lost.
            context_json = orjson.dumps(record.context, default=str).decode("utf-8")
        async with self._store.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO app_logs (recorded_at, level, logger, message, context)
                VALUES ($1, $2, $3, $4, $5)
                """,
                record.recorded_at,
                record.level,
                record.logger,
                record.message,
                context_json,
            )

    async def query(
        self,
        *,
        level: str | None = None,
        before: datetime | None = None,
        after: datetime | None = None,
        limit: int = 50,
        offset: int = 0,
    ) -> list[LogRecord]:
        """Return log records, newest first, optionally filtered.

        ``level`` is matched upper-cased. ``before`` is exclusive and
        ``after`` is inclusive on ``recorded_at``. ``limit`` and ``offset``
        page the result.
        """
        where, params = self._build_filters(level, before, after)
        limit_idx = len(params) + 1
        async with self._store.pool.acquire() as conn:
            rows = await conn.fetch(
                f"""
                SELECT recorded_at, level, logger, message, context
                FROM app_logs
                {where}
                ORDER BY recorded_at DESC
                LIMIT ${limit_idx} OFFSET ${limit_idx + 1}
                """,
                *params, limit, offset,
            )
        return [
            LogRecord(
                recorded_at=r["recorded_at"],
                level=r["level"],
                logger=r["logger"],
                message=r["message"],
                context=self._decode_context(r["context"]),
            )
            for r in rows
        ]

    async def count(self, level: str | None = None) -> int:
        """Return the number of log rows, optionally for a single level."""
        async with self._store.pool.acquire() as conn:
            if level:
                row = await conn.fetchrow(
                    "SELECT COUNT(*) as cnt FROM app_logs WHERE level = $1", level.upper()
                )
            else:
                row = await conn.fetchrow("SELECT COUNT(*) as cnt FROM app_logs")
        return row["cnt"] if row else 0

    async def count_filtered(
        self,
        *,
        level: str | None = None,
        before: datetime | None = None,
        after: datetime | None = None,
    ) -> int:
        """Return the number of log rows matching the same filters as ``query``."""
        where, params = self._build_filters(level, before, after)
        async with self._store.pool.acquire() as conn:
            row = await conn.fetchrow(f"SELECT COUNT(*) as cnt FROM app_logs {where}", *params)
        return row["cnt"] if row else 0
class LogArchiveRepository:
    """Move old rows from ``app_logs`` into ``app_log_archives``."""

    def __init__(self, store: PgStore) -> None:
        self._store = store

    @staticmethod
    def _inserted_rows(status: Any) -> int:
        """Extract the row count from an INSERT command tag.

        PostgreSQL reports INSERT as ``INSERT <oid> <rows>``, so the count
        is the last token. Anything else yields 0.
        """
        if not isinstance(status, str):
            return 0
        parts = status.split()
        if len(parts) >= 2 and parts[0] == "INSERT" and parts[-1].isdigit():
            return int(parts[-1])
        return 0

    async def archive_before(self, cutoff: datetime) -> int:
        """Move rows older than cutoff from app_logs to app_log_archives.

        The delete and the insert run as one statement, so a row is never
        removed without being archived and a failure leaves both tables
        unchanged. Returns the number of rows written to the archive.
        """
        async with self._store.pool.acquire() as conn:
            status = await conn.execute(
                """
                WITH moved AS (
                    DELETE FROM app_logs
                    WHERE recorded_at < $1
                    RETURNING id, recorded_at, level, logger, message, context
                )
                INSERT INTO app_log_archives (id, recorded_at, level, logger, message, context)
                SELECT id, recorded_at, level, logger, message, context
                FROM moved
                ON CONFLICT (id) DO NOTHING
                """,
                cutoff,
            )
        # ON CONFLICT tolerates rows already copied by an earlier,
        # partially completed run; those are not counted again.
        return self._inserted_rows(status)
class LogAggregationRepository:
    """Maintain and read per-level log counts in ``app_log_aggregates``."""

    # SQL expression for the start of the bucket containing ``{ts}``,
    # keyed by period label. ``{ts}`` is filled with a column or parameter.
    _BUCKET_SQL = {
        "1h": "date_trunc('hour', {ts})",
        "3h": (
            "date_trunc('hour', {ts})"
            " - interval '1 hour' * (extract(hour from {ts})::int % 3)"
        ),
        "6h": (
            "date_trunc('hour', {ts})"
            " - interval '1 hour' * (extract(hour from {ts})::int % 6)"
        ),
        "1d": "date_trunc('day', {ts})",
        "1w": "date_trunc('week', {ts})",
        "1mo": "date_trunc('month', {ts})",
    }

    def __init__(self, store: PgStore) -> None:
        self._store = store

    async def aggregate_since(self, since: datetime, period: str) -> None:
        """Recount log rows per level and bucket, then upsert the counts.

        ``since`` is rounded down to the start of its bucket before rows
        are selected. The upsert overwrites stored counts, so every bucket
        it touches must be counted in full; an unaligned lower bound would
        replace the first bucket's total with a partial one.

        Raises:
            ValueError: if ``period`` is not a supported label.
        """
        template = self._BUCKET_SQL.get(period)
        if template is None:
            raise ValueError(f"Unknown period: {period}")
        bucket_expr = template.format(ts="recorded_at")
        floor_expr = template.format(ts="$1::timestamptz")

        async with self._store.pool.acquire() as conn:
            # Modulo is a single '%': queries use $n placeholders, so there
            # is no client-side %-formatting to escape.
            rows = await conn.fetch(
                f"""
                SELECT {bucket_expr} AS bucket_start, level, COUNT(*) AS cnt
                FROM app_logs
                WHERE recorded_at >= ({floor_expr})
                GROUP BY bucket_start, level
                """,
                since,
            )
            for row in rows:
                await conn.execute(
                    """
                    INSERT INTO app_log_aggregates (bucket_start, period, level, count)
                    VALUES ($1, $2, $3, $4)
                    ON CONFLICT (bucket_start, period, level)
                    DO UPDATE SET count = EXCLUDED.count
                    """,
                    row["bucket_start"],
                    period,
                    str(row["level"]),
                    row["cnt"],
                )

    async def query_aggregates(
        self,
        period: str,
        level: str | None = None,
        limit: int = 50,
    ) -> list[LogAggregateRecord]:
        """Return the newest aggregate rows for ``period``.

        When ``level`` is given it is matched upper-cased. At most
        ``limit`` rows are returned, ordered by ``bucket_start`` descending.
        """
        params: list[Any] = [period]
        level_clause = ""
        if level:
            params.append(level.upper())
            level_clause = f" AND level = ${len(params)}"
        params.append(limit)

        async with self._store.pool.acquire() as conn:
            rows = await conn.fetch(
                f"""
                SELECT bucket_start, period, level, count
                FROM app_log_aggregates
                WHERE period = $1{level_clause}
                ORDER BY bucket_start DESC
                LIMIT ${len(params)}
                """,
                *params,
            )
        return [
            LogAggregateRecord(
                bucket_start=r["bucket_start"],
                period=r["period"],
                level=r["level"],
                count=r["count"],
            )
            for r in rows
        ]
+36 -1
View File
@@ -188,4 +188,39 @@ ALTER TABLE market_snapshots ALTER COLUMN snapshot_at TYPE TIMESTAMPT
ALTER TABLE kraken_account_snapshots ALTER COLUMN snapshot_at TYPE TIMESTAMPTZ USING snapshot_at AT TIME ZONE 'UTC'; ALTER TABLE kraken_account_snapshots ALTER COLUMN snapshot_at TYPE TIMESTAMPTZ USING snapshot_at AT TIME ZONE 'UTC';
ALTER TABLE backtest_jobs ALTER COLUMN created_at TYPE TIMESTAMPTZ USING created_at AT TIME ZONE 'UTC'; ALTER TABLE backtest_jobs ALTER COLUMN created_at TYPE TIMESTAMPTZ USING created_at AT TIME ZONE 'UTC';
ALTER TABLE backtest_jobs ALTER COLUMN started_at TYPE TIMESTAMPTZ USING started_at AT TIME ZONE 'UTC'; ALTER TABLE backtest_jobs ALTER COLUMN started_at TYPE TIMESTAMPTZ USING started_at AT TIME ZONE 'UTC';
ALTER TABLE backtest_jobs ALTER COLUMN finished_at TYPE TIMESTAMPTZ USING finished_at AT TIME ZONE 'UTC'; ALTER TABLE backtest_jobs ALTER COLUMN finished_at TYPE TIMESTAMPTZ USING finished_at AT TIME ZONE 'UTC';
-- ========================================
-- Logging tables
-- ========================================
-- app_logs: live log rows written by LogRepository.insert. The id is
-- generated by the database; context holds the event's extra fields as
-- JSONB and is NULL when there are none.
CREATE TABLE IF NOT EXISTS app_logs (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
recorded_at TIMESTAMPTZ NOT NULL,
level VARCHAR NOT NULL,
logger VARCHAR NOT NULL,
message TEXT NOT NULL,
context JSONB
);
-- Log queries order by recorded_at DESC and filter on level.
CREATE INDEX IF NOT EXISTS idx_app_logs_recorded_at ON app_logs (recorded_at DESC);
CREATE INDEX IF NOT EXISTS idx_app_logs_level ON app_logs (level);
-- app_log_archives: rows moved out of app_logs by
-- LogArchiveRepository.archive_before. The id has no default because it
-- is copied from the original row; archived_at records when the row was
-- moved.
CREATE TABLE IF NOT EXISTS app_log_archives (
id UUID PRIMARY KEY,
recorded_at TIMESTAMPTZ NOT NULL,
level VARCHAR NOT NULL,
logger VARCHAR NOT NULL,
message TEXT NOT NULL,
context JSONB,
archived_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_app_log_archives_recorded_at ON app_log_archives (recorded_at DESC);
-- app_log_aggregates: log counts per (bucket_start, period, level). The
-- UNIQUE constraint is the conflict target of the upsert in
-- LogAggregationRepository.aggregate_since.
CREATE TABLE IF NOT EXISTS app_log_aggregates (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
bucket_start TIMESTAMPTZ NOT NULL,
period VARCHAR NOT NULL,
level VARCHAR NOT NULL,
count INTEGER NOT NULL DEFAULT 0,
UNIQUE (bucket_start, period, level)
);
CREATE INDEX IF NOT EXISTS idx_app_log_aggregates_bucket ON app_log_aggregates (bucket_start DESC, period);
@@ -52,17 +52,12 @@
hx-swap="outerHTML" hx-swap="outerHTML"
> >
<input type="hidden" name="source" value="db" /> <input type="hidden" name="source" value="db" />
<label class="field"> <input type="hidden" name="symbols" value="" />
<span>Pairings</span> <div class="meta" style="margin-bottom: 12px">
<div Pairings managed in
id="pairing-checkboxes" <a href="/dashboard/config/pairings">Configuration → Pairings</a>. Only
hx-get="/dashboard/fragment/backtesting-pairings" enabled pairings are backtested.
hx-trigger="load" </div>
style="display: flex; flex-wrap: wrap; gap: 6px; margin-top: 4px"
>
<span style="opacity: 0.5">Loading pairings...</span>
</div>
</label>
<label class="field"> <label class="field">
<span>Start time (ISO datetime, optional)</span> <span>Start time (ISO datetime, optional)</span>
<input <input