diff --git a/src/arbitrade/api/app.py b/src/arbitrade/api/app.py index e63232b..a4c7cba 100644 --- a/src/arbitrade/api/app.py +++ b/src/arbitrade/api/app.py @@ -17,6 +17,8 @@ from arbitrade.config.settings import Settings from arbitrade.exchange.fee_service import run_fee_sync_loop from arbitrade.exchange.kraken_rest import KrakenRestClient from arbitrade.exchange.kraken_ws import KrakenWsClient +from arbitrade.logging.db_sink import get_db_sink +from arbitrade.logging.maintenance import run_log_aggregation_loop, run_log_archive_loop from arbitrade.logging_setup import configure_logging from arbitrade.market_data.feed import MarketDataFeed from arbitrade.market_data.feed_builder import ( @@ -108,6 +110,7 @@ def create_app(settings: Settings) -> FastAPI: async def lifespan(app: FastAPI) -> AsyncIterator[None]: await app.state.store.start() await app.state.store.migrate() + get_db_sink().start_consumer(db) await app.state.configuration_service.load_database_settings() await restore_runtime_state(app) fee_sync_task = asyncio.create_task( @@ -135,6 +138,12 @@ def create_app(settings: Settings) -> FastAPI: app.state.fee_sync_task = fee_sync_task app.state.pairing_sync_task = pairing_sync_task app.state.backtest_task = backtest_task + app.state.log_aggregation_task = asyncio.create_task( + run_log_aggregation_loop(db), name="log_aggregation" + ) + app.state.log_archive_task = asyncio.create_task( + run_log_archive_loop(db), name="log_archive" + ) yield fee_sync_stop_event.set() pairing_sync_stop_event.set() @@ -170,6 +179,7 @@ def create_app(settings: Settings) -> FastAPI: await kraken_client.close() await graceful_shutdown(app) await app.state.store.stop() + await get_db_sink().stop_consumer() app = FastAPI(title="arbitrade", version="0.1.0", lifespan=lifespan) app.state.settings = settings diff --git a/src/arbitrade/api/routes.py b/src/arbitrade/api/routes.py index a7fcaae..c5b048a 100644 --- a/src/arbitrade/api/routes.py +++ b/src/arbitrade/api/routes.py @@ -27,6 +27,7 @@ from arbitrade.storage.repositories import ( BacktestJobRepository, ConfigPairingRepository, KrakenAccountSnapshotRepository, + LogRepository, ) router = APIRouter(dependencies=[Depends(require_dashboard_auth)]) @@ -988,6 +989,15 @@ async def dashboard_backtesting_run(request: Request) -> HTMLResponse: ) fee_rate = await _fee_rate_for_profile(defaults["fee_profile"], custom_fee_rate, request=request) + # Use enabled pairings from DB when none selected + symbols_str = defaults["symbols"] + if not symbols_str.strip(): + pairing_repo = ConfigPairingRepository(request.app.state.store) + enabled = await pairing_repo.list_pairings(enabled_only=True) + symbols_str = ",".join( + f"{p.base_asset}/{p.quote_asset}" for p in enabled + ) + config_dict: dict[str, object] = { "source": defaults["source"], "starting_balances": defaults["starting_balances"], @@ -999,12 +1009,12 @@ async def dashboard_backtesting_run(request: Request) -> HTMLResponse: "execution_latency_ms": float(defaults["execution_latency_ms"]), "start_time": defaults["start_time"], "end_time": defaults["end_time"], - "symbols": defaults["symbols"], + "symbols": symbols_str, } store = request.app.state.store repo = BacktestJobRepository(store) - events_label = defaults["symbols"] if defaults["symbols"] else "DB-sourced" + events_label = symbols_str if symbols_str else "DB-sourced" job = await repo.create_job(events_label, config_dict) msg_job = job.id[:8] if job.id else "unknown" diff --git a/src/arbitrade/logging/db_sink.py b/src/arbitrade/logging/db_sink.py new file mode 100644 index 0000000..6ad71d9 --- /dev/null +++ b/src/arbitrade/logging/db_sink.py @@ -0,0 +1,123 @@ +"""DB sink — writes structlog events to app_logs table via background queue.""" + +from __future__ import annotations + +import asyncio +from datetime import UTC, datetime +from typing import Any + +import structlog + +from arbitrade.storage.pg_store import PgStore +from arbitrade.storage.repositories import LogRecord, LogRepository + +_LOG = structlog.get_logger(__name__) + + +class DbSinkProcessor: + """structlog processor that queues log events for DB writes. + + Must be registered in the structlog processor chain. The consumer + task must be started on app init via ``start_consumer(store)``. + """ + + def __init__(self) -> None: + self._queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue(maxsize=2000) + self._consumer_task: asyncio.Task[None] | None = None + + def __call__(self, logger: Any, method_name: str, event_dict: dict[str, Any]) -> dict[str, Any]: + """Processor — called for every structlog event. Non-blocking.""" + try: + self._queue.put_nowait(dict(event_dict)) + except asyncio.QueueFull: + pass # drop event if queue full, avoid backpressure + return event_dict + + def start_consumer(self, store: PgStore) -> None: + """Start background consumer task.""" + if self._consumer_task is not None and not self._consumer_task.done(): + return + self._consumer_task = asyncio.create_task( + self._consume(store), name="log_db_sink" + ) + + async def stop_consumer(self) -> None: + """Drain queue and cancel consumer.""" + if self._consumer_task is None: + return + self._consumer_task.cancel() + try: + await self._consumer_task + except asyncio.CancelledError: + pass + self._consumer_task = None + # Flush remaining + await self._flush(store=None) + + async def _consume(self, store: PgStore) -> None: + repo = LogRepository(store) + while True: + try: + event = await self._queue.get() + await self._write_one(repo, event) + except asyncio.CancelledError: + break + except Exception: + pass # swallow consumer errors, never crash + # Final flush + await self._flush(repo) + + async def _write_one(self, repo: LogRepository, event: dict[str, Any]) -> None: + recorded_at = event.pop("timestamp", None) + if isinstance(recorded_at, str): + try: + recorded_at = datetime.fromisoformat(recorded_at) + except ValueError: + recorded_at = datetime.now(UTC) + elif not isinstance(recorded_at, datetime): + recorded_at = datetime.now(UTC) + + level = str(event.pop("level", "info")).upper() + logger = str(event.pop("logger", "root")) + message = str(event.pop("event", event.pop("message", ""))) + context = {k: v for k, v in event.items() if not k.startswith("_")} if event else None + + record = LogRecord( + recorded_at=recorded_at, + level=level, + logger=logger, + message=message, + context=context if context else None, + ) + try: + await repo.insert(record) + except Exception: + pass # never crash from DB write failure + + async def _flush(self, repo: LogRepository | None) -> None: + drained = 0 + while not self._queue.empty() and drained < 500: + try: + event = self._queue.get_nowait() + if repo is not None: + await self._write_one(repo, event) + drained += 1 + except asyncio.QueueEmpty: + break + except Exception: + pass + + +# Module-level singleton +_db_sink = DbSinkProcessor() + + +def get_db_sink() -> DbSinkProcessor: + return _db_sink + + +def db_sink_processor( + logger: Any, method_name: str, event_dict: dict[str, Any] +) -> dict[str, Any]: + """Standalone processor function wrapping the singleton.""" + return _db_sink(logger, method_name, event_dict) \ No newline at end of file diff --git a/src/arbitrade/logging/maintenance.py b/src/arbitrade/logging/maintenance.py new file mode 100644 index 0000000..6ef3726 --- /dev/null +++ b/src/arbitrade/logging/maintenance.py @@ -0,0 +1,61 @@ +"""Log maintenance — aggregation and archiving tasks.""" + +from __future__ import annotations + +import asyncio +from datetime import UTC, datetime, timedelta + +import structlog + +from arbitrade.storage.pg_store import PgStore +from arbitrade.storage.repositories import LogAggregationRepository, LogArchiveRepository + +_LOG = structlog.get_logger(__name__) + +_AGGREGATE_INTERVAL = 3600 # 1 hour +_ARCHIVE_INTERVAL = 86400 # 1 day +_RETENTION_DAYS = 30 + + +async def run_log_aggregation(store: PgStore) -> None: + """Aggregate log counts for the last 2 hours across all periods.""" + repo = LogAggregationRepository(store) + since = datetime.now(UTC) - timedelta(hours=2) + periods = ["1h", "3h", "6h", "1d", "1w", "1mo"] + for period in periods: + try: + await repo.aggregate_since(since, period) + except Exception: + _LOG.exception("log_aggregation_failed", period=period) + _LOG.info("log_aggregation_complete", since=since.isoformat()) + + +async def run_log_archive(store: PgStore, retention_days: int = _RETENTION_DAYS) -> int: + """Archive log entries older than retention_days.""" + cutoff = datetime.now(UTC) - timedelta(days=retention_days) + repo = LogArchiveRepository(store) + count = await repo.archive_before(cutoff) + if count > 0: + _LOG.info("log_archive_complete", + cutoff=cutoff.isoformat(), archived=count) + return count + + +async def run_log_aggregation_loop(store: PgStore) -> None: + """Periodic aggregation loop.""" + while True: + try: + await run_log_aggregation(store) + except Exception: + _LOG.exception("log_aggregation_loop_error") + await asyncio.sleep(_AGGREGATE_INTERVAL) + + +async def run_log_archive_loop(store: PgStore) -> None: + """Periodic archive loop.""" + while True: + try: + await run_log_archive(store) + except Exception: + _LOG.exception("log_archive_loop_error") + await asyncio.sleep(_ARCHIVE_INTERVAL) diff --git a/src/arbitrade/logging_setup.py b/src/arbitrade/logging_setup.py index 000f9da..bc44e7a 100644 --- a/src/arbitrade/logging_setup.py +++ b/src/arbitrade/logging_setup.py @@ -6,6 +6,8 @@ from typing import Any import structlog +from arbitrade.logging.db_sink import db_sink_processor + def configure_logging(log_level: str = "INFO", json_logs: bool = True) -> None: level = getattr(logging, log_level.upper(), logging.INFO) @@ -17,6 +19,7 @@ def configure_logging(log_level: str = "INFO", json_logs: bool = True) -> None: structlog.stdlib.add_log_level, structlog.stdlib.add_logger_name, timestamper, + db_sink_processor, ] if json_logs: diff --git a/src/arbitrade/storage/repositories.py b/src/arbitrade/storage/repositories.py index ad37aa6..b72ea36 100644 --- a/src/arbitrade/storage/repositories.py +++ b/src/arbitrade/storage/repositories.py @@ -1,7 +1,7 @@ from __future__ import annotations from dataclasses import dataclass -from datetime import datetime +from datetime import UTC, datetime, timedelta from typing import Any import orjson @@ -1009,3 +1009,237 @@ class BacktestJobRepository: elif isinstance(result, str): return result != "DELETE 0" return False + + +@dataclass(slots=True) +class LogRecord: + recorded_at: datetime + level: str + logger: str + message: str + context: dict[str, Any] | None = None + + +@dataclass(slots=True) +class LogAggregateRecord: + bucket_start: datetime + period: str + level: str + count: int + + +class LogRepository: + def __init__(self, store: PgStore) -> None: + self._store = store + + async def insert(self, record: LogRecord) -> None: + async with self._store.pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO app_logs (recorded_at, level, logger, message, context) + VALUES ($1, $2, $3, $4, $5) + """, + record.recorded_at, + record.level, + record.logger, + record.message, + orjson.dumps(record.context).decode("utf-8") if record.context else None, + ) + + async def query( + self, + *, + level: str | None = None, + before: datetime | None = None, + after: datetime | None = None, + limit: int = 50, + offset: int = 0, + ) -> list[LogRecord]: + async with self._store.pool.acquire() as conn: + conditions: list[str] = [] + params: list[Any] = [] + idx = 1 + if level: + conditions.append(f"level = ${idx}") + params.append(level.upper()) + idx += 1 + if before: + conditions.append(f"recorded_at < ${idx}") + params.append(before) + idx += 1 + if after: + conditions.append(f"recorded_at >= ${idx}") + params.append(after) + idx += 1 + where = "" + if conditions: + where = "WHERE " + " AND ".join(conditions) + rows = await conn.fetch( + f""" + SELECT recorded_at, level, logger, message, context + FROM app_logs + {where} + ORDER BY recorded_at DESC + LIMIT ${idx} OFFSET ${idx + 1} + """, + *params, limit, offset, + ) + return [ + LogRecord( + recorded_at=r["recorded_at"], + level=r["level"], + logger=r["logger"], + message=r["message"], + context=r["context"], + ) + for r in rows + ] + + async def count(self, level: str | None = None) -> int: + async with self._store.pool.acquire() as conn: + if level: + row = await conn.fetchrow( + "SELECT COUNT(*) as cnt FROM app_logs WHERE level = $1", level.upper() + ) + else: + row = await conn.fetchrow("SELECT COUNT(*) as cnt FROM app_logs") + return row["cnt"] if row else 0 + + async def count_filtered( + self, + *, + level: str | None = None, + before: datetime | None = None, + after: datetime | None = None, + ) -> int: + async with self._store.pool.acquire() as conn: + conditions: list[str] = [] + params: list[Any] = [] + idx = 1 + if level: + conditions.append(f"level = ${idx}") + params.append(level.upper()) + idx += 1 + if before: + conditions.append(f"recorded_at < ${idx}") + params.append(before) + idx += 1 + if after: + conditions.append(f"recorded_at >= ${idx}") + params.append(after) + idx += 1 + where = "" + if conditions: + where = "WHERE " + " AND ".join(conditions) + row = await conn.fetchrow(f"SELECT COUNT(*) as cnt FROM app_logs {where}", *params) + return row["cnt"] if row else 0 + + +class LogArchiveRepository: + def __init__(self, store: PgStore) -> None: + self._store = store + + async def archive_before(self, cutoff: datetime) -> int: + """Move rows older than cutoff from app_logs to app_log_archives.""" + async with self._store.pool.acquire() as conn: + # Insert into archive + result = await conn.execute( + """ + INSERT INTO app_log_archives (id, recorded_at, level, logger, message, context) + SELECT id, recorded_at, level, logger, message, context + FROM app_logs + WHERE recorded_at < $1 + """, + cutoff, + ) + # Delete originals + await conn.execute( + "DELETE FROM app_logs WHERE recorded_at < $1", cutoff + ) + if isinstance(result, str): + parts = result.split() + if len(parts) == 2 and parts[0] == "INSERT": + return int(parts[1]) + return 0 + + +class LogAggregationRepository: + def __init__(self, store: PgStore) -> None: + self._store = store + + async def aggregate_since(self, since: datetime, period: str) -> None: + """Aggregate log counts per level for entries >= since, grouped by period.""" + period_map = { + "1h": "date_trunc('hour', recorded_at)", + "3h": "date_trunc('hour', recorded_at) - interval '1 hour' * (extract(hour from recorded_at)::int %% 3)", + "6h": "date_trunc('hour', recorded_at) - interval '1 hour' * (extract(hour from recorded_at)::int %% 6)", + "1d": "date_trunc('day', recorded_at)", + "1w": "date_trunc('week', recorded_at)", + "1mo": "date_trunc('month', recorded_at)", + } + bucket_expr = period_map.get(period) + if bucket_expr is None: + raise ValueError(f"Unknown period: {period}") + + async with self._store.pool.acquire() as conn: + rows = await conn.fetch( + f""" + SELECT {bucket_expr} AS bucket_start, level, COUNT(*) AS cnt + FROM app_logs + WHERE recorded_at >= $1 + GROUP BY bucket_start, level + """, + since, + ) + for row in rows: + await conn.execute( + """ + INSERT INTO app_log_aggregates (bucket_start, period, level, count) + VALUES ($1, $2, $3, $4) + ON CONFLICT (bucket_start, period, level) + DO UPDATE SET count = EXCLUDED.count + """, + row["bucket_start"], + period, + str(row["level"]), + row["cnt"], + ) + + async def query_aggregates( + self, + period: str, + level: str | None = None, + limit: int = 50, + ) -> list[LogAggregateRecord]: + async with self._store.pool.acquire() as conn: + if level: + rows = await conn.fetch( + """ + SELECT bucket_start, period, level, count + FROM app_log_aggregates + WHERE period = $1 AND level = $2 + ORDER BY bucket_start DESC + LIMIT $3 + """, + period, level.upper(), limit, + ) + else: + rows = await conn.fetch( + """ + SELECT bucket_start, period, level, count + FROM app_log_aggregates + WHERE period = $1 + ORDER BY bucket_start DESC + LIMIT $2 + """, + period, limit, + ) + return [ + LogAggregateRecord( + bucket_start=r["bucket_start"], + period=r["period"], + level=r["level"], + count=r["count"], + ) + for r in rows + ] diff --git a/src/arbitrade/storage/schema_pg.sql b/src/arbitrade/storage/schema_pg.sql index 3ea1f23..65fc20e 100644 --- a/src/arbitrade/storage/schema_pg.sql +++ b/src/arbitrade/storage/schema_pg.sql @@ -188,4 +188,39 @@ ALTER TABLE market_snapshots ALTER COLUMN snapshot_at TYPE TIMESTAMPT ALTER TABLE kraken_account_snapshots ALTER COLUMN snapshot_at TYPE TIMESTAMPTZ USING snapshot_at AT TIME ZONE 'UTC'; ALTER TABLE backtest_jobs ALTER COLUMN created_at TYPE TIMESTAMPTZ USING created_at AT TIME ZONE 'UTC'; ALTER TABLE backtest_jobs ALTER COLUMN started_at TYPE TIMESTAMPTZ USING started_at AT TIME ZONE 'UTC'; -ALTER TABLE backtest_jobs ALTER COLUMN finished_at TYPE TIMESTAMPTZ USING finished_at AT TIME ZONE 'UTC'; \ No newline at end of file +ALTER TABLE backtest_jobs ALTER COLUMN finished_at TYPE TIMESTAMPTZ USING finished_at AT TIME ZONE 'UTC'; + +-- ======================================== +-- Logging tables +-- ======================================== +CREATE TABLE IF NOT EXISTS app_logs ( + id UUID DEFAULT gen_random_uuid() PRIMARY KEY, + recorded_at TIMESTAMPTZ NOT NULL, + level VARCHAR NOT NULL, + logger VARCHAR NOT NULL, + message TEXT NOT NULL, + context JSONB +); +CREATE INDEX IF NOT EXISTS idx_app_logs_recorded_at ON app_logs (recorded_at DESC); +CREATE INDEX IF NOT EXISTS idx_app_logs_level ON app_logs (level); + +CREATE TABLE IF NOT EXISTS app_log_archives ( + id UUID PRIMARY KEY, + recorded_at TIMESTAMPTZ NOT NULL, + level VARCHAR NOT NULL, + logger VARCHAR NOT NULL, + message TEXT NOT NULL, + context JSONB, + archived_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP +); +CREATE INDEX IF NOT EXISTS idx_app_log_archives_recorded_at ON app_log_archives (recorded_at DESC); + +CREATE TABLE IF NOT EXISTS app_log_aggregates ( + id UUID DEFAULT gen_random_uuid() PRIMARY KEY, + bucket_start TIMESTAMPTZ NOT NULL, + period VARCHAR NOT NULL, + level VARCHAR NOT NULL, + count INTEGER NOT NULL DEFAULT 0, + UNIQUE (bucket_start, period, level) +); +CREATE INDEX IF NOT EXISTS idx_app_log_aggregates_bucket ON app_log_aggregates (bucket_start DESC, period); \ No newline at end of file diff --git a/src/arbitrade/web/templates/partials/backtesting_panel.html b/src/arbitrade/web/templates/partials/backtesting_panel.html index 0fa79aa..5d285b9 100644 --- a/src/arbitrade/web/templates/partials/backtesting_panel.html +++ b/src/arbitrade/web/templates/partials/backtesting_panel.html @@ -52,17 +52,12 @@ hx-swap="outerHTML" > - + +