Compare commits

..

10 Commits

Author SHA1 Message Date
zwitschi e4f5d8dfcc refactor: clean up imports and improve code formatting across multiple files
CI / lint-test-build (push) Successful in 2m21s
2026-06-09 10:02:41 +02:00
zwitschi 403daa6cf1 Refactor log aggregation periods and improve code formatting
- Removed the "3h" and "6h" periods from the log aggregation process in maintenance.py to streamline log counts.
- Enhanced code readability by adjusting line breaks and indentation in repositories.py for better clarity.
2026-06-09 09:41:23 +02:00
zwitschi dc99f1604e Refactor code for improved readability and consistency
CI / lint-test-build (push) Successful in 54s
- Cleaned up multiline statements and removed unnecessary line breaks in various files.
- Ensured consistent formatting in function definitions and calls across the codebase.
- Updated docstrings and comments for clarity where applicable.
- Removed trailing newlines in module docstrings.
- Enhanced logging statements for better clarity in maintenance tasks.
2026-06-07 21:59:09 +02:00
zwitschi f221464daa feat: enhance backtesting panel with flash messages and pairing checks
CI / lint-test-build (push) Failing after 12s
2026-06-07 21:51:09 +02:00
zwitschi 5e7732b85f feat: add flash message support to configuration panel and improve layout
CI / lint-test-build (push) Successful in 52s
2026-06-07 19:57:42 +02:00
zwitschi 77dfb08b23 feat: update package data inclusion to add dashboard templates and reorder partials
CI / lint-test-build (push) Successful in 1m6s
2026-06-07 19:35:05 +02:00
zwitschi 9acabddb7e feat: include config HTML templates in package data
CI / lint-test-build (push) Successful in 2m23s
2026-06-07 18:46:12 +02:00
zwitschi 2fbc78f7a9 feat: add logging package with DB sink and maintenance tasks
CI / lint-test-build (push) Successful in 2m24s
2026-06-07 18:31:27 +02:00
zwitschi f58634d438 feat: add storage schema SQL file to package data inclusion
CI / lint-test-build (push) Successful in 2m31s
2026-06-07 18:29:16 +02:00
zwitschi e44876c7c7 refactor: clean up imports and improve code formatting in various modules
CI / lint-test-build (push) Successful in 49s
2026-06-07 18:17:45 +02:00
33 changed files with 1951 additions and 1480 deletions
+10 -7
View File
@@ -53,7 +53,7 @@ The bot consumes Kraken market data, detects opportunities, and executes trades
- `detection/` - triangular graph and incremental detector.
- `risk/` - pre-trade and trade-limit guards.
- `execution/` - multi-leg trade sequencing.
- `backtesting/` - replay engine, parameter sweep, experiment scaffolds.
- `backtesting/` - replay engine, parameter sweep, experiment scaffolds. See [backtesting.md](backtesting.md).
- `strategy/` - experimental strategy modules such as stat-arb.
- `storage/` - PostgreSQL schema and repositories.
- `alerting/` - multi-channel notifications.
@@ -77,7 +77,7 @@ The bot consumes Kraken market data, detects opportunities, and executes trades
3. Incremental detector scores impacted cycles.
4. Risk manager validates the opportunity.
5. Execution sequencer places legs if approved.
7. Trades and snapshots persist to PostgreSQL.
6. Trades and snapshots persist to PostgreSQL.
7. Dashboard and alerts reflect state changes.
### 6.2 Dashboard Control Flow
@@ -89,11 +89,14 @@ The bot consumes Kraken market data, detects opportunities, and executes trades
### 6.3 Backtesting Flow
1. User selects JSONL replay file and run parameters.
2. Replay engine loads ordered book events.
3. Detector, risk, and execution logic run in simulation mode.
4. Report is stored in memory for recent UI display.
5. Parameter sweeps split data into train/test windows, rank results, and flag overfit.
See [backtesting.md](backtesting.md) for full design and implementation details.
1. User picks currency pairs (from config/pairings page, or all enabled).
2. User sets starting balances (required), time range (required), min profit threshold (required).
3. Fee profile defaults to "api (from Kraken)"; slippage (4.0 bps) and execution latency (20 ms) are optional with sensible defaults.
4. Job is queued via `POST /dashboard/backtesting/run`.
5. Backend loads events from `market_snapshots` table, builds triangular cycles, runs replay engine.
6. Report stored in `backtest_jobs` table, visible in recent jobs list.
## 7. Deployment View
+130
View File
@@ -0,0 +1,130 @@
# Backtesting Architecture
> Detailed design and implementation of the backtesting subsystem.
> See [`README.md`](README.md#63-backtesting-flow) for the high-level user flow.
## Data Flow
```txt
market_snapshots (DB) ─┐
├──→ load_replay_events_from_db() ──→ list[ReplayBookEvent]
JSONL file ─────────────┘
BacktestReplayEngine.run()
BacktestReport
BacktestJobRepository.store_report()
```
Two event sources:
- **DB mode** (default) — loads snapshots from `market_snapshots` table. Supports symbol/time filtering.
- **File mode** — reads JSONL files from disk (legacy, used by `backtest_replay.py` script).
## Core Types
### `ReplayClock`
Timekeeper for simulation. Ensures events advance monotonically. Supports `advance_ms()` to model execution latency.
### `ReplayBookEvent`
One atomic book state at a point in time. Fields: `occurred_at`, `symbol`, `bids: tuple[BookLevel]`, `asks: tuple[BookLevel]`.
### `BacktestConfig`
| Field | Default | Description |
| ------------------------ | -------- | ----------------------------------------------------- |
| `fee_rate` | `0.0` | 0.0 → API-sourced fee from `kraken_account_snapshots` |
| `min_profit_threshold` | `0.0005` | Minimum net profit to attempt trade |
| `trade_capital` | `100.0` | Capital allocated per trade |
| `quote_asset` | `"USD"` | Base currency for P&L |
| `slippage_bps` | `4.0` | Simulated slippage in basis points |
| `execution_latency_ms` | `20.0` | Simulated latency per leg |
| `max_depth_levels` | `10` | Order book depth for detection |
| `max_concurrent_trades` | `1` | Max simultaneous trades |
| `min_order_size_by_pair` | `None` | Per-pair min order size overrides |
### `BacktestReport`
| Field | Type | Description |
| -------------------------------- | -------------- | ---------------------------------- |
| `started_at` / `finished_at` | datetime | Simulation window |
| `processed_events` | int | Events consumed |
| `opportunities_seen` | int | Detected opportunities |
| `trades_executed` | int | Successful trades |
| `win_rate` | float or None | Fraction of profitable trades |
| `fill_rate` | float or None | Average fill ratio |
| `realized_pnl_usd` | float | Net P&L after slippage |
| `max_drawdown_usd` | float | Peak-to-trough equity drop |
| `miss_reasons` | dict[str, int] | Counters for skipped opportunities |
| `execution_latency_p50/95/99_ms` | float or None | Latency percentiles |
## Simulation Client
`_SimulatedRestClient` replaces the real Kraken REST client during backtesting.
- **Slippage model:** `fill_ratio = max(0.85, 1.0 - (slippage_bps / 10000.0) * 8.0)`
- **Latency model:** Clock advances by `execution_latency_ms` before each simulated fill
- Orders always fill (status = `"closed"`) at the modeled ratio
## Job Worker
`backtest_worker` is an `asyncio.Task` started in `create_app()` lifespan:
```python
backtest_task = asyncio.create_task(
backtest_worker(backtest_queue, db),
name="backtest_worker",
)
```
Workflow per job:
1. Dequeue `(job_id, config_dict)` from `asyncio.Queue`
2. Update status → `"running"` in `backtest_jobs` table
3. Load events (DB or file)
4. Build currency graph → triangular cycles
5. Instantiate `BacktestReplayEngine``engine.run()`
6. Store report → update status → `"completed"` (or `"failed"` on exception)
## Sweep Pipeline
`run_parameter_search` performs grid search over backtest parameters:
1. **Split** events into train/test windows by time ratio
2. **Build grid** — cartesian product of `theta_values × trade_capital_values × pair_universes × staleness_threshold_values`
3. **For each parameter set:**
- Filter events to pair universe + apply staleness gate
- Build cycles restricted to pair universe
- Run engine on train window → `train_report`
- Run engine on test window → `test_report`
- Score = `realized_pnl + win_rate_bonus + fill_rate_bonus - max_drawdown`
- Compute generalization gap = `|train_score - test_score| / max(train_score, test_score)`
4. **Evaluate promotion:**
- `PromotionCriteria` checks: min test P&L, min win rate ≥ 0.5, min fill rate ≥ 0.9, max drawdown ≤ $25, generalization gap ≤ 0.5
- Results passing all criteria are flagged `promotion_ready`
## UI
> See `backtesting.html` → `partials/backtesting_panel.html`.
- **Shell page** loads the panel via `hx-get="/dashboard/fragment/backtesting"`
- **Run form** — starting balances, time range, profit threshold (required); fee profile, slippage, latency (advanced/collapsible)
- **Status card** — current job status + message
- **Recent jobs table** — lists last 20 jobs with status, events, trades, P&L; each row has a detail button
- **Job detail** — `GET /dashboard/backtesting/job/{id}` returns report HTML
Pairings are managed on the `/dashboard/config/pairings` page. Backtest uses DB-enabled pairings by default when no symbols are specified.
## Source Files
| File | Role |
| ----------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `backtesting/replay.py` | `ReplayClock`, `ReplayBookEvent`, `BacktestConfig`, `BacktestReport`, `_SimulatedRestClient`, `BacktestReplayEngine`, `load_replay_events`, `load_replay_events_from_db` |
| `backtesting/runner.py` | `run_backtest_job`, `backtest_worker`, `_build_cycles_from_events`, `_parse_balances` |
| `backtesting/sweep.py` | `SweepParameters`, `SweepResult`, `SweepArtifacts`, `PromotionCriteria`, `split_events_time_windows`, `build_parameter_grid`, `run_parameter_search`, `persist_sweep_results` |
+3
View File
@@ -27,7 +27,10 @@ include-package-data = true
[tool.setuptools.package-data]
arbitrade = [
"web/templates/*.html",
"web/templates/config/*.html",
"web/templates/dashboard/*.html",
"web/templates/partials/*.html",
"storage/schema_pg.sql",
]
[tool.setuptools.packages.find]
+1 -2
View File
@@ -67,8 +67,7 @@ async def _seed_dataset(store: PgStore) -> None:
opportunity_rows: list[tuple[object, ...]] = []
for i in range(5000):
detected_at = now + timedelta(milliseconds=200 * i)
opportunity_rows.append(
(detected_at, "USD->BTC->ETH->USD", 2.5, 1.2, 0.03, bool(i % 2)))
opportunity_rows.append((detected_at, "USD->BTC->ETH->USD", 2.5, 1.2, 0.03, bool(i % 2)))
order_rows: list[tuple[object, ...]] = []
for i in range(3500):
File diff suppressed because it is too large Load Diff
+38 -76
View File
@@ -31,41 +31,26 @@ class Settings(BaseSettings):
)
alerts_enabled: bool = Field(default=True, alias="ALERTS_ENABLED")
alert_min_severity: str = Field(
default="warning", alias="ALERT_MIN_SEVERITY")
alert_dedup_seconds: float = Field(
default=30.0, alias="ALERT_DEDUP_SECONDS")
alert_on_trade_events: bool = Field(
default=True, alias="ALERT_ON_TRADE_EVENTS")
alert_on_error_events: bool = Field(
default=True, alias="ALERT_ON_ERROR_EVENTS")
alert_on_threshold_events: bool = Field(
default=True, alias="ALERT_ON_THRESHOLD_EVENTS")
alert_on_system_events: bool = Field(
default=True, alias="ALERT_ON_SYSTEM_EVENTS")
alert_min_severity: str = Field(default="warning", alias="ALERT_MIN_SEVERITY")
alert_dedup_seconds: float = Field(default=30.0, alias="ALERT_DEDUP_SECONDS")
alert_on_trade_events: bool = Field(default=True, alias="ALERT_ON_TRADE_EVENTS")
alert_on_error_events: bool = Field(default=True, alias="ALERT_ON_ERROR_EVENTS")
alert_on_threshold_events: bool = Field(default=True, alias="ALERT_ON_THRESHOLD_EVENTS")
alert_on_system_events: bool = Field(default=True, alias="ALERT_ON_SYSTEM_EVENTS")
telegram_alerts_enabled: bool = Field(
default=False, alias="TELEGRAM_ALERTS_ENABLED")
telegram_bot_token: str | None = Field(
default=None, alias="TELEGRAM_BOT_TOKEN")
telegram_chat_id: str | None = Field(
default=None, alias="TELEGRAM_CHAT_ID")
telegram_alerts_enabled: bool = Field(default=False, alias="TELEGRAM_ALERTS_ENABLED")
telegram_bot_token: str | None = Field(default=None, alias="TELEGRAM_BOT_TOKEN")
telegram_chat_id: str | None = Field(default=None, alias="TELEGRAM_CHAT_ID")
discord_alerts_enabled: bool = Field(
default=False, alias="DISCORD_ALERTS_ENABLED")
discord_webhook_url: str | None = Field(
default=None, alias="DISCORD_WEBHOOK_URL")
discord_alerts_enabled: bool = Field(default=False, alias="DISCORD_ALERTS_ENABLED")
discord_webhook_url: str | None = Field(default=None, alias="DISCORD_WEBHOOK_URL")
email_alerts_enabled: bool = Field(
default=False, alias="EMAIL_ALERTS_ENABLED")
email_alerts_enabled: bool = Field(default=False, alias="EMAIL_ALERTS_ENABLED")
email_smtp_host: str | None = Field(default=None, alias="EMAIL_SMTP_HOST")
email_smtp_port: int = Field(default=587, alias="EMAIL_SMTP_PORT")
email_smtp_username: str | None = Field(
default=None, alias="EMAIL_SMTP_USERNAME")
email_smtp_password: str | None = Field(
default=None, alias="EMAIL_SMTP_PASSWORD")
email_alert_from: str | None = Field(
default=None, alias="EMAIL_ALERT_FROM")
email_smtp_username: str | None = Field(default=None, alias="EMAIL_SMTP_USERNAME")
email_smtp_password: str | None = Field(default=None, alias="EMAIL_SMTP_PASSWORD")
email_alert_from: str | None = Field(default=None, alias="EMAIL_ALERT_FROM")
email_alert_to: str | None = Field(default=None, alias="EMAIL_ALERT_TO")
email_smtp_use_tls: bool = Field(default=True, alias="EMAIL_SMTP_USE_TLS")
@@ -78,31 +63,24 @@ class Settings(BaseSettings):
pg_min_connections: int = Field(default=2, alias="PG_MIN_CONNECTIONS")
pg_max_connections: int = Field(default=10, alias="PG_MAX_CONNECTIONS")
kraken_rest_url: str = Field(
default="https://api.kraken.com", alias="KRAKEN_REST_URL")
kraken_ws_url: str = Field(
default="wss://ws.kraken.com/v2", alias="KRAKEN_WS_URL")
kraken_rest_url: str = Field(default="https://api.kraken.com", alias="KRAKEN_REST_URL")
kraken_ws_url: str = Field(default="wss://ws.kraken.com/v2", alias="KRAKEN_WS_URL")
kraken_private_rate_limit_seconds: float = Field(
default=1.0, alias="KRAKEN_PRIVATE_RATE_LIMIT_SECONDS"
)
kraken_http_timeout_seconds: float = Field(
default=10.0, alias="KRAKEN_HTTP_TIMEOUT_SECONDS")
kraken_retry_attempts: int = Field(
default=3, alias="KRAKEN_RETRY_ATTEMPTS")
kraken_http_timeout_seconds: float = Field(default=10.0, alias="KRAKEN_HTTP_TIMEOUT_SECONDS")
kraken_retry_attempts: int = Field(default=3, alias="KRAKEN_RETRY_ATTEMPTS")
kraken_retry_base_delay_seconds: float = Field(
default=0.25, alias="KRAKEN_RETRY_BASE_DELAY_SECONDS"
)
kraken_api_key: str | None = Field(default=None, alias="KRAKEN_API_KEY")
kraken_api_secret: str | None = Field(
default=None, alias="KRAKEN_API_SECRET")
kraken_api_secret: str | None = Field(default=None, alias="KRAKEN_API_SECRET")
kraken_api_key_permissions: str = Field(
default="query,trade",
alias="KRAKEN_API_KEY_PERMISSIONS",
)
ws_heartbeat_timeout_seconds: float = Field(
default=20.0, alias="WS_HEARTBEAT_TIMEOUT_SECONDS")
ws_max_staleness_seconds: float = Field(
default=5.0, alias="WS_MAX_STALENESS_SECONDS")
ws_heartbeat_timeout_seconds: float = Field(default=20.0, alias="WS_HEARTBEAT_TIMEOUT_SECONDS")
ws_max_staleness_seconds: float = Field(default=5.0, alias="WS_MAX_STALENESS_SECONDS")
strategy_enable_stat_arb_experiment: bool = Field(
default=False,
alias="STRATEGY_ENABLE_STAT_ARB_EXPERIMENT",
@@ -125,29 +103,20 @@ class Settings(BaseSettings):
)
paper_trading_mode: bool = Field(default=True, alias="PAPER_TRADING_MODE")
trade_capital_usd: float = Field(default=100.0, alias="TRADE_CAPITAL_USD")
max_trade_capital_usd: float = Field(
default=100.0, alias="MAX_TRADE_CAPITAL_USD")
max_concurrent_trades: int | None = Field(
default=None, alias="MAX_CONCURRENT_TRADES")
max_trade_capital_usd: float = Field(default=100.0, alias="MAX_TRADE_CAPITAL_USD")
max_concurrent_trades: int | None = Field(default=None, alias="MAX_CONCURRENT_TRADES")
max_exposure_per_asset_usd: float | None = Field(
default=None,
alias="MAX_EXPOSURE_PER_ASSET_USD",
)
quote_balance_asset: str = Field(
default="USD", alias="QUOTE_BALANCE_ASSET")
min_order_size_usd: float | None = Field(
default=None, alias="MIN_ORDER_SIZE_USD")
quote_balance_asset: str = Field(default="USD", alias="QUOTE_BALANCE_ASSET")
min_order_size_usd: float | None = Field(default=None, alias="MIN_ORDER_SIZE_USD")
kill_switch_active: bool = Field(default=False, alias="KILL_SWITCH_ACTIVE")
daily_loss_limit_usd: float | None = Field(
default=None, alias="DAILY_LOSS_LIMIT_USD")
cumulative_loss_limit_usd: float | None = Field(
default=None, alias="CUMULATIVE_LOSS_LIMIT_USD")
max_source_latency_ms: float | None = Field(
default=None, alias="MAX_SOURCE_LATENCY_MS")
max_apply_latency_ms: float | None = Field(
default=None, alias="MAX_APPLY_LATENCY_MS")
max_consecutive_failures: int | None = Field(
default=None, alias="MAX_CONSECUTIVE_FAILURES")
daily_loss_limit_usd: float | None = Field(default=None, alias="DAILY_LOSS_LIMIT_USD")
cumulative_loss_limit_usd: float | None = Field(default=None, alias="CUMULATIVE_LOSS_LIMIT_USD")
max_source_latency_ms: float | None = Field(default=None, alias="MAX_SOURCE_LATENCY_MS")
max_apply_latency_ms: float | None = Field(default=None, alias="MAX_APPLY_LATENCY_MS")
max_consecutive_failures: int | None = Field(default=None, alias="MAX_CONSECUTIVE_FAILURES")
fernet_key: str | None = Field(default=None, alias="FERNET_KEY")
@@ -164,8 +133,7 @@ class Settings(BaseSettings):
def _validate_log_level(cls, value: str) -> str:
normalized = value.strip().upper()
if normalized not in {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}:
raise ValueError(
"LOG_LEVEL must be one of: DEBUG, INFO, WARNING, ERROR, CRITICAL")
raise ValueError("LOG_LEVEL must be one of: DEBUG, INFO, WARNING, ERROR, CRITICAL")
return normalized
@field_validator("alert_min_severity")
@@ -173,19 +141,16 @@ class Settings(BaseSettings):
def _validate_alert_severity(cls, value: str) -> str:
normalized = value.strip().lower()
if normalized not in {"info", "warning", "error", "critical"}:
raise ValueError(
"ALERT_MIN_SEVERITY must be one of: info, warning, error, critical")
raise ValueError("ALERT_MIN_SEVERITY must be one of: info, warning, error, critical")
return normalized
@model_validator(mode="after")
def _validate_security_constraints(self) -> Settings:
if bool(self.dashboard_auth_username) ^ bool(self.dashboard_auth_password):
raise ValueError(
"dashboard auth requires both username and password")
raise ValueError("dashboard auth requires both username and password")
if bool(self.kraken_api_key) ^ bool(self.kraken_api_secret):
raise ValueError(
"Kraken API auth requires both API key and secret")
raise ValueError("Kraken API auth requires both API key and secret")
permissions = {
token.strip().lower()
@@ -193,11 +158,9 @@ class Settings(BaseSettings):
if token.strip()
}
if permissions and ("query" not in permissions or "trade" not in permissions):
raise ValueError(
"KRAKEN_API_KEY_PERMISSIONS must include query and trade")
raise ValueError("KRAKEN_API_KEY_PERMISSIONS must include query and trade")
if "withdraw" in permissions or "withdrawals" in permissions:
raise ValueError(
"KRAKEN_API_KEY_PERMISSIONS must not include withdrawal scope")
raise ValueError("KRAKEN_API_KEY_PERMISSIONS must not include withdrawal scope")
if self.alert_dedup_seconds < 0.0:
raise ValueError("ALERT_DEDUP_SECONDS must be >= 0")
@@ -213,8 +176,7 @@ class Settings(BaseSettings):
"STRATEGY_STAT_ARB_ENTRY_ZSCORE must be greater than STRATEGY_STAT_ARB_EXIT_ZSCORE"
)
if self.strategy_stat_arb_max_holding_seconds <= 0.0:
raise ValueError(
"STRATEGY_STAT_ARB_MAX_HOLDING_SECONDS must be > 0")
raise ValueError("STRATEGY_STAT_ARB_MAX_HOLDING_SECONDS must be > 0")
return self
+1
View File
@@ -0,0 +1 @@
"""Dashboard module for monitoring and controlling the arbitrage bot."""
+63
View File
@@ -0,0 +1,63 @@
from __future__ import annotations
from fastapi import Request
from arbitrade.storage.repositories import (
BacktestJobRepository,
)
async def _recent_backtest_reports(request: Request) -> list[dict[str, object]]:
repo = BacktestJobRepository(request.app.state.store)
jobs = await repo.list_jobs(limit=5)
reports = []
for job in jobs:
report: dict[str, object] = {
"id": str(job.id),
"status": job.status,
}
if job.created_at is not None:
report["created_at"] = job.created_at.isoformat()
if job.finished_at is not None:
report["finished_at"] = job.finished_at.isoformat()
reports.append(report)
return reports
async def _backtesting_panel_context(
request: Request,
*,
status: str = "idle",
message: str = "Configure a replay run and execute backtest.",
latest_report: dict[str, object] | None = None,
defaults: dict[str, str] | None = None,
) -> dict[str, object]:
default_values = {
"symbols": "",
"start_time": "",
"end_time": "",
"starting_balances": "USD=1000.0",
"trade_capital": "100.0",
"min_profit_threshold": "0.0005",
"fee_profile": "api",
"custom_fee_rate": "",
"slippage_bps": "4.0",
"execution_latency_ms": "20.0",
}
if defaults is not None:
default_values.update(defaults)
reports = await _recent_backtest_reports(request)
latest = latest_report or (reports[0] if reports else None)
return {
"status": status,
"message": message,
"flash_message": "",
"no_enabled_pairings": False,
"latest_report": latest,
"recent_reports": reports,
"run_endpoint": "/dashboard/backtesting/run",
"reports_endpoint": "/dashboard/api/backtesting/reports",
**default_values,
}
File diff suppressed because it is too large Load Diff
+7 -14
View File
@@ -42,12 +42,9 @@ async def fetch_and_store_account_snapshot(
_LOG.exception("trade_balance_fetch_failed")
return None
fee_tier = volume_data.get("fee_tier") if isinstance(
volume_data, dict) else None
fees_dict = volume_data.get("fees") if isinstance(
volume_data, dict) else None
fees_maker = volume_data.get("fees_maker") if isinstance(
volume_data, dict) else None
fee_tier = volume_data.get("fee_tier") if isinstance(volume_data, dict) else None
fees_dict = volume_data.get("fees") if isinstance(volume_data, dict) else None
fees_maker = volume_data.get("fees_maker") if isinstance(volume_data, dict) else None
currency = volume_data.get("currency")
thirty_day_volume_str = volume_data.get("volume")
@@ -73,8 +70,7 @@ async def fetch_and_store_account_snapshot(
if currency is not None:
fee_schedule["currency"] = currency
thirty_day_volume = float(
thirty_day_volume_str) if thirty_day_volume_str is not None else None
thirty_day_volume = float(thirty_day_volume_str) if thirty_day_volume_str is not None else None
snapshot = KrakenAccountSnapshot(
snapshot_at=datetime.now(UTC),
@@ -82,8 +78,7 @@ async def fetch_and_store_account_snapshot(
maker_fee=maker_fee,
taker_fee=taker_fee,
thirty_day_volume=thirty_day_volume,
trade_balance_raw=balance_data if isinstance(
balance_data, dict) else None,
trade_balance_raw=balance_data if isinstance(balance_data, dict) else None,
fee_schedule_raw=fee_schedule if fee_schedule else None,
)
@@ -107,8 +102,7 @@ async def fetch_and_store_account_snapshot(
"INSERT INTO portfolio_snapshots"
" (snapshot_at, balances, total_value_usd) VALUES ($1, $2, $3)",
datetime.now(UTC),
orjson.dumps(wallet_balances).decode(
"utf-8") if wallet_balances else None,
orjson.dumps(wallet_balances).decode("utf-8") if wallet_balances else None,
total_value,
)
_LOG.info("portfolio_snapshot_stored", total_value_usd=total_value)
@@ -127,8 +121,7 @@ async def run_fee_sync_loop(
Runs until stop_event is set.
"""
_LOG.info("fee_sync_loop_started",
interval_s=_FEE_REFRESH_INTERVAL_SECONDS)
_LOG.info("fee_sync_loop_started", interval_s=_FEE_REFRESH_INTERVAL_SECONDS)
while not stop_event.is_set():
try:
+7 -14
View File
@@ -47,15 +47,13 @@ class TriangularExecutionSequencer:
rest_client: SupportsOrderPlacement,
*,
available_pairs: Sequence[str],
volume_for_leg: Callable[[OpportunityEvent,
ExecutionLeg, int], float] | None = None,
volume_for_leg: Callable[[OpportunityEvent, ExecutionLeg, int], float] | None = None,
execution_writer: AsyncExecutionWriter | None = None,
alert_notifier: SupportsAlerts | None = None,
audit_repository: AuditRepository | None = None,
) -> None:
self._rest_client = rest_client
self._available_pairs = {self._normalize_pair(
pair) for pair in available_pairs}
self._available_pairs = {self._normalize_pair(pair) for pair in available_pairs}
self._volume_for_leg = volume_for_leg or self._default_volume_for_leg
self._execution_writer = execution_writer
self._alert_notifier = alert_notifier
@@ -102,15 +100,12 @@ class TriangularExecutionSequencer:
raise ValueError(f"No tradable pair for leg {from_cur}->{to_cur}")
def _build_legs(self, event: OpportunityEvent) -> tuple[ExecutionLeg, ...]:
currencies = [part.strip().upper()
for part in event.cycle.split("->") if part.strip()]
currencies = [part.strip().upper() for part in event.cycle.split("->") if part.strip()]
if len(currencies) < 4 or currencies[0] != currencies[-1]:
raise ValueError(
"cycle must be a closed triangular path like A->B->C->A")
raise ValueError("cycle must be a closed triangular path like A->B->C->A")
if len(currencies) != 4:
raise ValueError(
"cycle must contain exactly three unique currencies")
raise ValueError("cycle must contain exactly three unique currencies")
legs: list[ExecutionLeg] = []
for idx in range(3):
@@ -125,8 +120,7 @@ class TriangularExecutionSequencer:
)
volume = self._volume_for_leg(event, placeholder_leg, idx)
if volume <= 0.0:
raise ValueError(
"volume_for_leg must return a positive volume")
raise ValueError("volume_for_leg must return a positive volume")
legs.append(self._resolve_leg(from_currency, to_currency, volume))
return tuple(legs)
@@ -215,8 +209,7 @@ class TriangularExecutionSequencer:
responses.append(response)
if self._execution_writer is not None:
order_ref = self._order_ref_from_response(
response, f"leg-{idx}")
order_ref = self._order_ref_from_response(response, f"leg-{idx}")
await self._execution_writer.enqueue(
OrderRecord(
trade_ref=trade_ref,
+1
View File
@@ -0,0 +1 @@
"""Logging package — DB sink, maintenance tasks."""
+4 -8
View File
@@ -37,9 +37,7 @@ class DbSinkProcessor:
"""Start background consumer task."""
if self._consumer_task is not None and not self._consumer_task.done():
return
self._consumer_task = asyncio.create_task(
self._consume(store), name="log_db_sink"
)
self._consumer_task = asyncio.create_task(self._consume(store), name="log_db_sink")
async def stop_consumer(self) -> None:
"""Drain queue and cancel consumer."""
@@ -52,7 +50,7 @@ class DbSinkProcessor:
pass
self._consumer_task = None
# Flush remaining
await self._flush(store=None)
await self._flush(store=None) # type: ignore[call-arg]
async def _consume(self, store: PgStore) -> None:
repo = LogRepository(store)
@@ -116,8 +114,6 @@ def get_db_sink() -> DbSinkProcessor:
return _db_sink
def db_sink_processor(
logger: Any, method_name: str, event_dict: dict[str, Any]
) -> dict[str, Any]:
def db_sink_processor(logger: Any, method_name: str, event_dict: dict[str, Any]) -> dict[str, Any]:
"""Standalone processor function wrapping the singleton."""
return _db_sink(logger, method_name, event_dict)
return _db_sink(logger, method_name, event_dict)
+2 -3
View File
@@ -21,7 +21,7 @@ async def run_log_aggregation(store: PgStore) -> None:
"""Aggregate log counts for the last 2 hours across all periods."""
repo = LogAggregationRepository(store)
since = datetime.now(UTC) - timedelta(hours=2)
periods = ["1h", "3h", "6h", "1d", "1w", "1mo"]
periods = ["1h", "1d", "1w", "1mo"]
for period in periods:
try:
await repo.aggregate_since(since, period)
@@ -36,8 +36,7 @@ async def run_log_archive(store: PgStore, retention_days: int = _RETENTION_DAYS)
repo = LogArchiveRepository(store)
count = await repo.archive_before(cutoff)
if count > 0:
_LOG.info("log_archive_complete",
cutoff=cutoff.isoformat(), archived=count)
_LOG.info("log_archive_complete", cutoff=cutoff.isoformat(), archived=count)
return count
+5 -10
View File
@@ -38,8 +38,7 @@ class MarketDataFeed:
opportunity_writer: AsyncOpportunityWriter | None = None,
paper_trading_mode: bool = True,
opportunity_executor: (
Callable[[OpportunityEvent],
Awaitable[ExecutionOutcome | float | None]] | None
Callable[[OpportunityEvent], Awaitable[ExecutionOutcome | float | None]] | None
) = None,
trade_capital: float = 1.0,
max_trade_capital: float | None = None,
@@ -93,8 +92,7 @@ class MarketDataFeed:
return {}
start = currencies[0]
exposure_assets = {
currency for currency in currencies[1:] if currency != start}
exposure_assets = {currency for currency in currencies[1:] if currency != start}
return {asset: event.allocated_capital for asset in exposure_assets}
async def run(self) -> None:
@@ -315,8 +313,7 @@ class MarketDataFeed:
continue
if self._pre_trade_validator is not None and self._balance_provider is not None:
required_balances = {
self._quote_balance_asset: event.allocated_capital}
required_balances = {self._quote_balance_asset: event.allocated_capital}
balances = {
asset.upper(): amount
for asset, amount in self._balance_provider().items()
@@ -384,8 +381,7 @@ class MarketDataFeed:
outcome = await self._opportunity_executor(event)
except Exception as exc:
if self._trade_limits_guard is not None:
self._trade_limits_guard.close_trade(
exposure_by_asset)
self._trade_limits_guard.close_trade(exposure_by_asset)
dispatch_alert_nowait(
self._alert_notifier,
@@ -451,8 +447,7 @@ class MarketDataFeed:
realized_pnl = outcome
if realized_pnl is not None and self._loss_limit_guard is not None:
self._loss_limit_guard.register_realized_pnl(
realized_pnl)
self._loss_limit_guard.register_realized_pnl(realized_pnl)
if self._loss_limit_guard.is_halted:
_LOG.warning(
"loss_limit_halt_triggered",
+1 -2
View File
@@ -86,8 +86,7 @@ class OrderBook:
BookLevel(price=price, volume=self._bids[price])
for price in reversed(bid_keys[-depth:])
]
asks = [BookLevel(price=price, volume=self._asks[price])
for price in ask_keys[:depth]]
asks = [BookLevel(price=price, volume=self._asks[price]) for price in ask_keys[:depth]]
return bids, asks
def compute_checksum(self, depth: int = 10) -> int:
+41 -22
View File
@@ -51,23 +51,34 @@ class MetricsCalculator:
WHERE volume > 0 AND filled_volume IS NOT NULL
""")
r_pnl_usd = float(
tm["realized_pnl_usd"]) if tm and tm["realized_pnl_usd"] is not None else 0.0
tt = int(tm["total_trades"]
) if tm and tm["total_trades"] is not None else 0
wt = int(tm["winning_trades"]
) if tm and tm["winning_trades"] is not None else 0
r_pnl_usd = (
float(tm["realized_pnl_usd"]) if tm and tm["realized_pnl_usd"] is not None else 0.0
)
tt = int(tm["total_trades"]) if tm and tm["total_trades"] is not None else 0
wt = int(tm["winning_trades"]) if tm and tm["winning_trades"] is not None else 0
wr = wt / tt if tt > 0 else None
atd = float(tm["avg_trade_duration_seconds"]
) if tm and tm["avg_trade_duration_seconds"] is not None else None
atd = (
float(tm["avg_trade_duration_seconds"])
if tm and tm["avg_trade_duration_seconds"] is not None
else None
)
oc = int(om["opportunity_count"]
) if om is not None and om["opportunity_count"] is not None else 0
fo = om["first_detected_at"] if om is not None and isinstance(
om["first_detected_at"], datetime) else None
lo = om["last_detected_at"] if om is not None and isinstance(
om["last_detected_at"], datetime) else None
oc = (
int(om["opportunity_count"])
if om is not None and om["opportunity_count"] is not None
else 0
)
fo = (
om["first_detected_at"]
if om is not None and isinstance(om["first_detected_at"], datetime)
else None
)
lo = (
om["last_detected_at"]
if om is not None and isinstance(om["last_detected_at"], datetime)
else None
)
opportunities_per_minute: float | None
if oc >= 2 and fo is not None and lo is not None:
@@ -80,15 +91,23 @@ class MetricsCalculator:
else:
opportunities_per_minute = None
fill_rate = float(
fm["fill_rate"]) if fm and fm["fill_rate"] is not None else None
fill_rate = float(fm["fill_rate"]) if fm and fm["fill_rate"] is not None else None
lp50 = float(tm["latency_p50_seconds"]
) if tm and tm["latency_p50_seconds"] is not None else None
lp95 = float(tm["latency_p95_seconds"]
) if tm and tm["latency_p95_seconds"] is not None else None
lp99 = float(tm["latency_p99_seconds"]
) if tm and tm["latency_p99_seconds"] is not None else None
lp50 = (
float(tm["latency_p50_seconds"])
if tm and tm["latency_p50_seconds"] is not None
else None
)
lp95 = (
float(tm["latency_p95_seconds"])
if tm and tm["latency_p95_seconds"] is not None
else None
)
lp99 = (
float(tm["latency_p99_seconds"])
if tm and tm["latency_p99_seconds"] is not None
else None
)
return PerformanceMetrics(
realized_pnl_usd=r_pnl_usd,
+3 -1
View File
@@ -106,7 +106,9 @@ async def _run_startup_reconciler(app: FastAPI) -> None:
await result
async def persist_runtime_snapshot(app: FastAPI, *, note: str | None = None) -> RuntimeStateRecord | None:
async def persist_runtime_snapshot(
app: FastAPI, *, note: str | None = None
) -> RuntimeStateRecord | None:
repository = _runtime_repository(app)
if repository is None:
return None
+1 -2
View File
@@ -36,8 +36,7 @@ class AsyncExecutionWriter:
async def start(self) -> None:
if self._task is None or self._task.done():
self._stop.clear()
self._task = asyncio.create_task(
self._run(), name="execution-writer")
self._task = asyncio.create_task(self._run(), name="execution-writer")
async def stop(self) -> None:
self._stop.set()
+3 -6
View File
@@ -24,16 +24,14 @@ class MarketSnapshot:
class AsyncMarketSnapshotWriter:
def __init__(self, repository: MarketSnapshotRepository, max_queue_size: int = 50_000) -> None:
self._repository = repository
self._queue: asyncio.Queue[MarketSnapshot] = asyncio.Queue(
maxsize=max_queue_size)
self._queue: asyncio.Queue[MarketSnapshot] = asyncio.Queue(maxsize=max_queue_size)
self._task: asyncio.Task[None] | None = None
self._stop = asyncio.Event()
async def start(self) -> None:
if self._task is None or self._task.done():
self._stop.clear()
self._task = asyncio.create_task(
self._run(), name="market-snapshot-writer")
self._task = asyncio.create_task(self._run(), name="market-snapshot-writer")
async def stop(self) -> None:
self._stop.set()
@@ -61,7 +59,6 @@ class AsyncMarketSnapshotWriter:
)
)
except Exception as exc:
_LOG.error("market_snapshot_write_failed",
error=str(exc), symbol=item.symbol)
_LOG.error("market_snapshot_write_failed", error=str(exc), symbol=item.symbol)
finally:
self._queue.task_done()
+2 -4
View File
@@ -13,16 +13,14 @@ _LOG = structlog.get_logger(__name__)
class AsyncOpportunityWriter:
def __init__(self, repository: OpportunityRepository, max_queue_size: int = 50_000) -> None:
self._repository = repository
self._queue: asyncio.Queue[OpportunityEvent] = asyncio.Queue(
maxsize=max_queue_size)
self._queue: asyncio.Queue[OpportunityEvent] = asyncio.Queue(maxsize=max_queue_size)
self._task: asyncio.Task[None] | None = None
self._stop = asyncio.Event()
async def start(self) -> None:
if self._task is None or self._task.done():
self._stop.clear()
self._task = asyncio.create_task(
self._run(), name="opportunity-writer")
self._task = asyncio.create_task(self._run(), name="opportunity-writer")
async def stop(self) -> None:
self._stop.set()
+1 -3
View File
@@ -128,7 +128,5 @@ class PgStore:
col_name = column_def.split()[0]
if col_name not in existing:
async with self.pool.acquire() as conn:
await conn.execute(
f"ALTER TABLE {table_name} ADD COLUMN {column_def}"
)
await conn.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_def}")
_LOG.info("pg_column_added", table=table_name, column=col_name)
+95 -43
View File
@@ -1,7 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from datetime import datetime
from typing import Any
import orjson
@@ -242,6 +242,12 @@ class AuditRepository:
async def insert(self, record: AuditRecord) -> None:
async with self._store.pool.acquire() as conn:
payload = None
if record.payload is not None:
try:
payload = orjson.dumps(record.payload).decode("utf-8")
except Exception:
payload = None
await conn.execute(
"""
INSERT INTO audit_events (
@@ -258,11 +264,7 @@ class AuditRepository:
record.actor,
record.event_type,
record.decision,
(
None
if record.payload is None
else orjson.dumps(record.payload).decode("utf-8")
),
payload,
record.correlation_id,
)
@@ -282,6 +284,9 @@ class AuditRepository:
for row in rows:
payload: dict[str, Any] | None = None
raw_payload = row["payload"]
correlation_id = None
if row["correlation_id"] is not None:
correlation_id = str(row["correlation_id"])
if isinstance(raw_payload, str) and raw_payload:
decoded = orjson.loads(raw_payload)
if isinstance(decoded, dict):
@@ -294,8 +299,7 @@ class AuditRepository:
event_type=str(row["event_type"]),
decision=str(row["decision"]),
payload=payload,
correlation_id=str(
row["correlation_id"]) if row["correlation_id"] is not None else None,
correlation_id=correlation_id,
)
)
@@ -355,6 +359,9 @@ class RuntimeStateRepository:
balances: dict[str, Any] | None = None
raw_balances = row["last_known_balances"]
kill_switch_reason = None
if row["kill_switch_reason"] is not None:
kill_switch_reason = str(row["kill_switch_reason"])
if isinstance(raw_balances, str) and raw_balances:
decoded = orjson.loads(raw_balances)
if isinstance(decoded, dict):
@@ -364,8 +371,7 @@ class RuntimeStateRepository:
snapshot_at=row["snapshot_at"],
is_running=bool(row["is_running"]),
kill_switch_active=bool(row["kill_switch_active"]),
kill_switch_reason=str(
row["kill_switch_reason"]) if row["kill_switch_reason"] is not None else None,
kill_switch_reason=kill_switch_reason,
open_trade_count=int(row["open_trade_count"]),
last_known_balances=balances,
note=str(row["note"]) if row["note"] is not None else None,
@@ -386,10 +392,16 @@ class ConfigSectionRepository:
VALUES ($1, $2)
RETURNING id, name, description, updated_at
""",
section.name, section.description,
section.name,
section.description,
)
if row:
return ConfigSection(id=row["id"], name=row["name"], description=row["description"], updated_at=row["updated_at"])
return ConfigSection(
id=row["id"],
name=row["name"],
description=row["description"],
updated_at=row["updated_at"],
)
raise ValueError("Failed to create section")
async def get_section(self, name: str) -> ConfigSection | None:
@@ -404,7 +416,12 @@ class ConfigSectionRepository:
name,
)
if row:
return ConfigSection(id=row["id"], name=row["name"], description=row["description"], updated_at=row["updated_at"])
return ConfigSection(
id=row["id"],
name=row["name"],
description=row["description"],
updated_at=row["updated_at"],
)
return None
async def list_sections(self) -> list[ConfigSection]:
@@ -417,7 +434,11 @@ class ConfigSectionRepository:
""")
return [
ConfigSection(
id=r["id"], name=r["name"], description=r["description"], updated_at=r["updated_at"])
id=r["id"],
name=r["name"],
description=r["description"],
updated_at=r["updated_at"],
)
for r in rows
]
@@ -571,7 +592,7 @@ class ConfigSettingRepository:
ts = row["latest_updated_at"]
if isinstance(ts, str):
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
return ts # type: ignore[no-any-return]
return ts # type: ignore[no-any-return]
return None
@@ -614,7 +635,8 @@ class ConfigPairingRepository:
FROM config_pairings
WHERE base_asset = $1 AND quote_asset = $2
""",
base_asset, quote_asset,
base_asset,
quote_asset,
)
if row:
return ConfigPairing(
@@ -665,7 +687,8 @@ class ConfigPairingRepository:
DELETE FROM config_pairings
WHERE base_asset = $1 AND quote_asset = $2
""",
base_asset, quote_asset,
base_asset,
quote_asset,
)
if result is None:
return False
@@ -732,7 +755,9 @@ class ConfigBacktestingDefaultsRepository:
def __init__(self, store: PgStore) -> None:
self._store = store
async def create_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults:
async def create_defaults(
self, defaults: ConfigBacktestingDefaults
) -> ConfigBacktestingDefaults:
"""Create new backtesting defaults."""
async with self._store.pool.acquire() as conn:
balances_json = (
@@ -753,9 +778,12 @@ class ConfigBacktestingDefaultsRepository:
defaults.execution_latency_ms,
)
if row:
starting_balances = None
if row["starting_balances"] is not None:
starting_balances = orjson.loads(row["starting_balances"])
return ConfigBacktestingDefaults(
starting_balances=orjson.loads(
row["starting_balances"]) if row["starting_balances"] else None,
starting_balances=starting_balances,
trade_capital=row["trade_capital"],
min_profit_threshold=row["min_profit_threshold"],
slippage_bps=row["slippage_bps"],
@@ -773,9 +801,11 @@ class ConfigBacktestingDefaultsRepository:
LIMIT 1
""")
if row:
starting_balances = None
if row["starting_balances"] is not None:
starting_balances = orjson.loads(row["starting_balances"])
return ConfigBacktestingDefaults(
starting_balances=orjson.loads(
row["starting_balances"]) if row["starting_balances"] else None,
starting_balances=starting_balances,
trade_capital=row["trade_capital"],
min_profit_threshold=row["min_profit_threshold"],
slippage_bps=row["slippage_bps"],
@@ -783,7 +813,9 @@ class ConfigBacktestingDefaultsRepository:
)
return None
async def update_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults:
async def update_defaults(
self, defaults: ConfigBacktestingDefaults
) -> ConfigBacktestingDefaults:
"""Update the backtesting defaults."""
async with self._store.pool.acquire() as conn:
starting_balances_json = (
@@ -807,9 +839,11 @@ class ConfigBacktestingDefaultsRepository:
defaults.execution_latency_ms,
)
if row:
starting_balances = None
if row["starting_balances"] is not None:
starting_balances = orjson.loads(row["starting_balances"])
return ConfigBacktestingDefaults(
starting_balances=orjson.loads(
row["starting_balances"]) if row["starting_balances"] else None,
starting_balances=starting_balances,
trade_capital=row["trade_capital"],
min_profit_threshold=row["min_profit_threshold"],
slippage_bps=row["slippage_bps"],
@@ -872,16 +906,20 @@ class KrakenAccountSnapshotRepository:
""")
if row is None:
return None
trade_balance_raw = None
fee_schedule_raw = None
if row["trade_balance_raw"] is not None:
trade_balance_raw = orjson.loads(row["trade_balance_raw"])
if row["fee_schedule_raw"] is not None:
fee_schedule_raw = orjson.loads(row["fee_schedule_raw"])
return KrakenAccountSnapshot(
snapshot_at=row["snapshot_at"],
fee_tier=row["fee_tier"],
maker_fee=row["maker_fee"],
taker_fee=row["taker_fee"],
thirty_day_volume=row["thirty_day_volume"],
trade_balance_raw=orjson.loads(
row["trade_balance_raw"]) if row["trade_balance_raw"] else None,
fee_schedule_raw=orjson.loads(
row["fee_schedule_raw"]) if row["fee_schedule_raw"] else None,
trade_balance_raw=trade_balance_raw,
fee_schedule_raw=fee_schedule_raw,
)
@@ -916,7 +954,8 @@ class BacktestJobRepository:
VALUES ($1, $2)
RETURNING id, status, events_path, config, created_at
""",
events_path, job_config_json,
events_path,
job_config_json,
)
if row is None:
raise ValueError("Failed to create backtest job")
@@ -933,24 +972,30 @@ class BacktestJobRepository:
if status == "running":
await conn.execute(
"UPDATE backtest_jobs SET status = $1, started_at = CURRENT_TIMESTAMP WHERE id = $2",
status, job_id,
status,
job_id,
)
elif status in ("completed", "failed"):
await conn.execute(
"UPDATE backtest_jobs SET status = $1, finished_at = CURRENT_TIMESTAMP, error = $2 WHERE id = $3",
status, error, job_id,
status,
error,
job_id,
)
else:
await conn.execute(
"UPDATE backtest_jobs SET status = $1, error = $2 WHERE id = $3",
status, error, job_id,
status,
error,
job_id,
)
async def store_report(self, job_id: str, report: dict[str, Any]) -> None:
async with self._store.pool.acquire() as conn:
await conn.execute(
"UPDATE backtest_jobs SET report = $1 WHERE id = $2",
orjson.dumps(report).decode("utf-8"), job_id,
orjson.dumps(report).decode("utf-8"),
job_id,
)
async def get_job(self, job_id: str) -> BacktestJobRecord | None:
@@ -1034,6 +1079,12 @@ class LogRepository:
async def insert(self, record: LogRecord) -> None:
async with self._store.pool.acquire() as conn:
context = None
if record.context:
try:
context = orjson.dumps(record.context).decode("utf-8")
except Exception:
context = None
await conn.execute(
"""
INSERT INTO app_logs (recorded_at, level, logger, message, context)
@@ -1043,7 +1094,7 @@ class LogRepository:
record.level,
record.logger,
record.message,
orjson.dumps(record.context).decode("utf-8") if record.context else None,
context,
)
async def query(
@@ -1082,7 +1133,9 @@ class LogRepository:
ORDER BY recorded_at DESC
LIMIT ${idx} OFFSET ${idx + 1}
""",
*params, limit, offset,
*params,
limit,
offset,
)
return [
LogRecord(
@@ -1153,9 +1206,7 @@ class LogArchiveRepository:
cutoff,
)
# Delete originals
await conn.execute(
"DELETE FROM app_logs WHERE recorded_at < $1", cutoff
)
await conn.execute("DELETE FROM app_logs WHERE recorded_at < $1", cutoff)
if isinstance(result, str):
parts = result.split()
if len(parts) == 2 and parts[0] == "INSERT":
@@ -1171,8 +1222,6 @@ class LogAggregationRepository:
"""Aggregate log counts per level for entries >= since, grouped by period."""
period_map = {
"1h": "date_trunc('hour', recorded_at)",
"3h": "date_trunc('hour', recorded_at) - interval '1 hour' * (extract(hour from recorded_at)::int %% 3)",
"6h": "date_trunc('hour', recorded_at) - interval '1 hour' * (extract(hour from recorded_at)::int %% 6)",
"1d": "date_trunc('day', recorded_at)",
"1w": "date_trunc('week', recorded_at)",
"1mo": "date_trunc('month', recorded_at)",
@@ -1221,7 +1270,9 @@ class LogAggregationRepository:
ORDER BY bucket_start DESC
LIMIT $3
""",
period, level.upper(), limit,
period,
level.upper(),
limit,
)
else:
rows = await conn.fetch(
@@ -1232,7 +1283,8 @@ class LogAggregationRepository:
ORDER BY bucket_start DESC
LIMIT $2
""",
period, limit,
period,
limit,
)
return [
LogAggregateRecord(
@@ -45,6 +45,46 @@
<article class="card" style="margin-top: 16px">
<div class="label">Run Backtest</div>
{% if no_enabled_pairings %}
<div
class="flash"
style="
background: rgba(255, 193, 7, 0.15);
border: 1px solid rgba(255, 193, 7, 0.3);
border-radius: 8px;
padding: 10px 16px;
margin-bottom: 16px;
color: #ffe58f;
font-size: 0.9rem;
"
>
No enabled pairings found. Enable at least one pairing on the
<a href="/dashboard/config/pairings" style="color: #ffe58f"
>Pairings page</a
>
before running a backtest.
</div>
{% endif %} {% if flash_message %}
<div
class="flash"
style="
background: rgba(82, 196, 26, 0.15);
border: 1px solid rgba(82, 196, 26, 0.3);
border-radius: 8px;
padding: 10px 16px;
margin-bottom: 16px;
color: #b7eb8f;
font-size: 0.9rem;
"
hx-trigger="load delay:5s"
hx-target="this"
hx-swap="delete"
>
{{ flash_message }}
</div>
{% endif %}
<form
class="form-grid"
hx-post="{{ run_endpoint }}"
@@ -58,26 +98,10 @@
<a href="/dashboard/config/pairings">Configuration → Pairings</a>. Only
enabled pairings are backtested.
</div>
<!-- Required fields -->
<label class="field">
<span>Start time (ISO datetime, optional)</span>
<input
name="start_time"
type="text"
value="{{ start_time | default('') }}"
placeholder="2025-01-01T00:00:00"
/>
</label>
<label class="field">
<span>End time (ISO datetime, optional)</span>
<input
name="end_time"
type="text"
value="{{ end_time | default('') }}"
placeholder="2025-01-02T00:00:00"
/>
</label>
<label class="field">
<span>Starting balances</span>
<span>Starting balances <span style="color: #ff4d4f">*</span></span>
<input
name="starting_balances"
type="text"
@@ -86,17 +110,25 @@
/>
</label>
<label class="field">
<span>Trade capital</span>
<span>Start time <span style="color: #ff4d4f">*</span></span>
<input
name="trade_capital"
type="number"
min="0"
step="0.01"
value="{{ trade_capital }}"
name="start_time"
type="text"
value="{{ start_time | default('') }}"
placeholder="2025-01-01T00:00:00"
/>
</label>
<label class="field">
<span>Min profit threshold</span>
<span>End time <span style="color: #ff4d4f">*</span></span>
<input
name="end_time"
type="text"
value="{{ end_time | default('') }}"
placeholder="2025-01-02T00:00:00"
/>
</label>
<label class="field">
<span>Min profit threshold <span style="color: #ff4d4f">*</span></span>
<input
name="min_profit_threshold"
type="number"
@@ -105,51 +137,67 @@
value="{{ min_profit_threshold }}"
/>
</label>
<label class="field">
<span>Fee profile</span>
<select name="fee_profile">
{% set sel = "selected" if fee_profile == "api" else "" %}
<option value="api" {{ sel }}>api (from Kraken)</option>
{% set sel = "selected" if fee_profile == "standard" else "" %}
<option value="standard" {{ sel }}>standard</option>
{% set sel = "selected" if fee_profile == "maker_heavy" else "" %}
<option value="maker_heavy" {{ sel }}>maker_heavy</option>
{% set sel = "selected" if fee_profile == "taker_heavy" else "" %}
<option value="taker_heavy" {{ sel }}>taker_heavy</option>
{% set sel = "selected" if fee_profile == "custom" else "" %}
<option value="custom" {{ sel }}>custom</option>
</select>
</label>
<label class="field">
<span>Custom fee rate (if fee profile = custom)</span>
<input
name="custom_fee_rate"
type="number"
min="0"
step="0.0001"
value="{{ custom_fee_rate }}"
/>
</label>
<label class="field">
<span>Slippage (bps)</span>
<input
name="slippage_bps"
type="number"
min="0"
step="0.1"
value="{{ slippage_bps }}"
/>
</label>
<label class="field">
<span>Execution latency (ms)</span>
<input
name="execution_latency_ms"
type="number"
min="0"
step="0.1"
value="{{ execution_latency_ms }}"
/>
</label>
<!-- Advanced -->
<details style="grid-column: 1 / -1; margin-top: 8px">
<summary style="cursor: pointer; opacity: 0.7; font-size: 0.85rem">
Advanced options (fee profile, slippage, latency)
</summary>
<div
class="form-grid"
style="
margin-top: 12px;
grid-template-columns: repeat(auto-fit, minmax(240px, 1fr));
"
>
<label class="field">
<span>Fee profile</span>
<select name="fee_profile">
{% set sel = "selected" if fee_profile == "api" else "" %}
<option value="api" {{ sel }}>api (from Kraken)</option>
{% set sel = "selected" if fee_profile == "standard" else "" %}
<option value="standard" {{ sel }}>standard</option>
{% set sel = "selected" if fee_profile == "maker_heavy" else "" %}
<option value="maker_heavy" {{ sel }}>maker_heavy</option>
{% set sel = "selected" if fee_profile == "taker_heavy" else "" %}
<option value="taker_heavy" {{ sel }}>taker_heavy</option>
{% set sel = "selected" if fee_profile == "custom" else "" %}
<option value="custom" {{ sel }}>custom</option>
</select>
</label>
<label class="field">
<span>Custom fee rate (if custom profile)</span>
<input
name="custom_fee_rate"
type="number"
min="0"
step="0.0001"
value="{{ custom_fee_rate }}"
/>
</label>
<label class="field">
<span>Slippage (bps)</span>
<input
name="slippage_bps"
type="number"
min="0"
step="0.1"
value="{{ slippage_bps }}"
/>
</label>
<label class="field">
<span>Execution latency (ms)</span>
<input
name="execution_latency_ms"
type="number"
min="0"
step="0.1"
value="{{ execution_latency_ms }}"
/>
</label>
</div>
</details>
<button type="submit" class="button">Submit Job</button>
</form>
</article>
@@ -1,15 +1,35 @@
<div id="config-panel" class="panel" style="margin-top: 16px">
{% if flash_message %}
<div
class="flash"
style="
background: rgba(82, 196, 26, 0.15);
border: 1px solid rgba(82, 196, 26, 0.3);
border-radius: 8px;
padding: 10px 16px;
margin-bottom: 16px;
color: #b7eb8f;
font-size: 0.9rem;
"
hx-trigger="load delay:3s"
hx-target="this"
hx-swap="delete"
>
{{ flash_message }}
</div>
{% endif %}
<form
class="form-grid"
hx-post="{{ config_endpoint }}"
hx-target="#config-panel"
hx-swap="outerHTML"
style="grid-template-columns: repeat(auto-fit, minmax(320px, 1fr)); gap: 20px"
style="
grid-template-columns: repeat(auto-fit, minmax(320px, 1fr));
gap: 20px;
"
>
{% include "config/runtime.html" %}
{% include "config/alerts.html" %}
{% include "config/kraken.html" %}
{% include "config/risk.html" %}
{% include "config/runtime.html" %} {% include "config/alerts.html" %} {%
include "config/kraken.html" %} {% include "config/risk.html" %}
<div style="grid-column: 1 / -1">
<button type="submit" class="button">Save Settings</button>
</div>
+1 -1
View File
@@ -1 +1 @@
"""Integration tests for PostgreSQL schema and connectivity."""
"""Integration tests for PostgreSQL schema and connectivity."""
+2 -6
View File
@@ -11,13 +11,9 @@ import pathlib
import pytest
def pytest_ignore_collect(
collection_path: pathlib.Path, config: pytest.Config
) -> bool:
def pytest_ignore_collect(collection_path: pathlib.Path, config: pytest.Config) -> bool:
"""Skip integration tests unless --integration is passed."""
if "integration" in str(collection_path) and not config.getoption(
"--integration", False
):
if "integration" in str(collection_path) and not config.getoption("--integration", False):
return True
return False
+60 -10
View File
@@ -42,9 +42,24 @@ async def test_metrics_calculator_summarizes_execution_data() -> None:
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9),
($10, $11, $12, $13, $14, $15, $16, $17, $18)
""",
"trade-1", started, finished, "filled", 12.5, 10.0, 100.0, "USD->BTC->ETH->USD", 3,
"trade-2", started_two, finished_two, "filled", -
4.5, -2.0, 200.0, "USD->ETH->BTC->USD", 3,
"trade-1",
started,
finished,
"filled",
12.5,
10.0,
100.0,
"USD->BTC->ETH->USD",
3,
"trade-2",
started_two,
finished_two,
"filled",
-4.5,
-2.0,
200.0,
"USD->ETH->BTC->USD",
3,
)
await conn.execute(
"""
@@ -53,11 +68,24 @@ async def test_metrics_calculator_summarizes_execution_data() -> None:
($7, $8, $9, $10, $11, $12),
($13, $14, $15, $16, $17, $18)
""",
started, "USD->BTC->ETH->USD", 4.0, 3.0, 0.03, True,
started_two, "USD->ETH->BTC->USD", 2.0, 1.0, 0.01, False,
started_two +
timedelta(
seconds=30), "USD->BTC->ETH->USD", 5.0, 4.0, 0.04, True,
started,
"USD->BTC->ETH->USD",
4.0,
3.0,
0.03,
True,
started_two,
"USD->ETH->BTC->USD",
2.0,
1.0,
0.01,
False,
started_two + timedelta(seconds=30),
"USD->BTC->ETH->USD",
5.0,
4.0,
0.04,
True,
)
await conn.execute(
"""
@@ -67,8 +95,30 @@ async def test_metrics_calculator_summarizes_execution_data() -> None:
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12),
($13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24)
""",
"trade-1", "order-1", 0, "BTC/USD", "buy", 2.0, 101, "closed", 2.0, 100.0, "{}", started,
"trade-2", "order-2", 0, "ETH/USD", "sell", 4.0, 202, "closed", 3.0, 200.0, "{}", started_two,
"trade-1",
"order-1",
0,
"BTC/USD",
"buy",
2.0,
101,
"closed",
2.0,
100.0,
"{}",
started,
"trade-2",
"order-2",
0,
"ETH/USD",
"sell",
4.0,
202,
"closed",
3.0,
200.0,
"{}",
started_two,
)
metrics = await MetricsCalculator(store).compute()
+99 -35
View File
@@ -25,50 +25,116 @@ EXPECTED_TABLES: dict[str, list[str]] = {
"schema_migrations": ["version", "applied_at"],
"config_sections": ["id", "name", "description", "updated_at"],
"config_settings": [
"key", "section", "value_json", "value_type", "is_secret",
"is_runtime_reloadable", "updated_at", "updated_by",
"key",
"section",
"value_json",
"value_type",
"is_secret",
"is_runtime_reloadable",
"updated_at",
"updated_by",
],
"config_pairings": [
"id", "base_asset", "quote_asset", "enabled", "source",
"created_at", "updated_at",
"id",
"base_asset",
"quote_asset",
"enabled",
"source",
"created_at",
"updated_at",
],
"config_backtesting_defaults": [
"id", "starting_balances", "trade_capital", "min_profit_threshold",
"slippage_bps", "execution_latency_ms", "fee_source",
"id",
"starting_balances",
"trade_capital",
"min_profit_threshold",
"slippage_bps",
"execution_latency_ms",
"fee_source",
],
"opportunities": [
"id", "detected_at", "cycle", "gross_pct", "net_pct",
"est_profit", "executed",
"id",
"detected_at",
"cycle",
"gross_pct",
"net_pct",
"est_profit",
"executed",
],
"trades": [
"id", "trade_ref", "started_at", "finished_at", "status",
"realized_pnl", "estimated_pnl", "capital_used", "cycle", "leg_count",
"id",
"trade_ref",
"started_at",
"finished_at",
"status",
"realized_pnl",
"estimated_pnl",
"capital_used",
"cycle",
"leg_count",
],
"orders": [
"id", "trade_ref", "order_ref", "leg_index", "pair", "side",
"volume", "user_ref", "status", "filled_volume", "avg_price",
"raw_response", "recorded_at",
"id",
"trade_ref",
"order_ref",
"leg_index",
"pair",
"side",
"volume",
"user_ref",
"status",
"filled_volume",
"avg_price",
"raw_response",
"recorded_at",
],
"pnl_events": [
"id", "trade_ref", "recorded_at", "kind", "pnl_usd", "source",
"id",
"trade_ref",
"recorded_at",
"kind",
"pnl_usd",
"source",
],
"portfolio_snapshots": ["snapshot_at", "balances", "total_value_usd"],
"market_snapshots": ["snapshot_at", "symbol", "source", "payload", "latency_ms"],
"audit_events": [
"id", "occurred_at", "actor", "event_type", "decision",
"payload", "correlation_id",
"id",
"occurred_at",
"actor",
"event_type",
"decision",
"payload",
"correlation_id",
],
"runtime_state_snapshots": [
"snapshot_at", "is_running", "kill_switch_active", "kill_switch_reason",
"open_trade_count", "last_known_balances", "note",
"snapshot_at",
"is_running",
"kill_switch_active",
"kill_switch_reason",
"open_trade_count",
"last_known_balances",
"note",
],
"kraken_account_snapshots": [
"snapshot_at", "fee_tier", "maker_fee", "taker_fee",
"thirty_day_volume", "trade_balance_raw", "fee_schedule_raw",
"snapshot_at",
"fee_tier",
"maker_fee",
"taker_fee",
"thirty_day_volume",
"trade_balance_raw",
"fee_schedule_raw",
],
"backtest_jobs": [
"id", "status", "events_path", "config", "report", "error",
"created_at", "started_at", "finished_at",
"id",
"status",
"events_path",
"config",
"report",
"error",
"created_at",
"started_at",
"finished_at",
],
}
@@ -96,6 +162,7 @@ TABLES_WITH_UNIQUE_CONSTRAINTS: dict[str, list[str]] = {
# ── fixtures ────────────────────────────────────────────────────────────────
@asynccontextmanager
async def _pg_lifecycle() -> AsyncIterator[PgStore]:
"""Connect, yield store, then disconnect."""
@@ -116,6 +183,7 @@ async def pg_fixture() -> AsyncIterator[PgStore]:
# ── helpers ─────────────────────────────────────────────────────────────────
async def _get_actual_tables(store: PgStore) -> dict[str, list[str]]:
"""Return {table_name: [column_name, ...]} for the public schema."""
actual: dict[str, list[str]] = {}
@@ -139,6 +207,7 @@ async def _table_row_count(store: PgStore, table: str) -> int:
# ── tests ───────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_pg_connect(pg: PgStore) -> None:
"""Can connect to PostgreSQL and ping the server."""
@@ -165,8 +234,7 @@ async def test_schema_migration_applies(pg: PgStore) -> None:
for table in EXPECTED_TABLES:
assert table in actual, (
f"Table '{table}' missing after migration. "
f"Found tables: {sorted(actual)}"
f"Table '{table}' missing after migration. " f"Found tables: {sorted(actual)}"
)
@@ -190,8 +258,7 @@ async def test_table_columns(pg: PgStore) -> None:
actual_cols = actual.get(table, [])
for col in expected_cols:
assert col in actual_cols, (
f"Column '{col}' missing from table '{table}'. "
f"Actual columns: {actual_cols}"
f"Column '{col}' missing from table '{table}'. " f"Actual columns: {actual_cols}"
)
@@ -250,8 +317,7 @@ async def test_table_row_count_is_zero(pg: PgStore) -> None:
for table in EXPECTED_TABLES:
count = await _table_row_count(pg, table)
assert count == 0, (
f"Table '{table}' should be empty after migration, "
f"but has {count} rows"
f"Table '{table}' should be empty after migration, " f"but has {count} rows"
)
@@ -262,13 +328,10 @@ async def test_schema_migration_version_recorded(pg: PgStore) -> None:
await pg.migrate()
async with pg.pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT MAX(version) AS v FROM schema_migrations"
)
row = await conn.fetchrow("SELECT MAX(version) AS v FROM schema_migrations")
assert row is not None
assert row["v"] == SCHEMA_VERSION, (
f"Expected schema version {SCHEMA_VERSION}, "
f"got {row['v']}"
f"Expected schema version {SCHEMA_VERSION}, " f"got {row['v']}"
)
@@ -280,7 +343,8 @@ async def test_create_and_query_row(pg: PgStore) -> None:
# ConfigSections round-trip
await conn.execute(
"INSERT INTO config_sections (name, description) VALUES ($1, $2)",
"test_section", "A test section for integration test",
"test_section",
"A test section for integration test",
)
row = await conn.fetchrow(
"SELECT name, description FROM config_sections WHERE name = $1",
@@ -357,4 +421,4 @@ async def test_audit_list_recent(pg: PgStore) -> None:
# Verify payload serialization worked
first = recent[0]
if first.payload:
assert "index" in first.payload
assert "index" in first.payload
+1 -2
View File
@@ -39,8 +39,7 @@ async def test_end_to_end_config_workflow():
# Mock the setting creation
mock_created_setting = Mock()
mock_created_setting.updated_at = "2023-01-01T00:00:00"
mock_repo_instance.create_setting = AsyncMock(
return_value=mock_created_setting)
mock_repo_instance.create_setting = AsyncMock(return_value=mock_created_setting)
mock_repo_instance.get_setting = AsyncMock(return_value=None)
mock_repo_instance.get_latest_updated_at = AsyncMock(return_value=None)
mock_repo_instance.list_settings = AsyncMock(return_value=[])
+3 -7
View File
@@ -136,10 +136,8 @@ async def test_config_setting_repository_list_settings(mock_store):
repo = ConfigSettingRepository(mock_store)
conn = await mock_store.pool.acquire().__aenter__()
row1 = _make_row({**SETTING_ROW, "key": "test_key1",
"value_json": "test_value1"})
row2 = _make_row({**SETTING_ROW, "key": "test_key2",
"value_json": "test_value2"})
row1 = _make_row({**SETTING_ROW, "key": "test_key1", "value_json": "test_value1"})
row2 = _make_row({**SETTING_ROW, "key": "test_key2", "value_json": "test_value2"})
conn.fetch = AsyncMock(return_value=[row1, row2])
result = await repo.list_settings()
@@ -176,9 +174,7 @@ async def test_config_pairing_repository_create_pairing(mock_store):
conn = await mock_store.pool.acquire().__aenter__()
conn.fetchrow = AsyncMock(return_value=_make_row(PAIRING_ROW))
pairing = ConfigPairing(
base_asset="BTC", quote_asset="USD", enabled=True, source="Kraken"
)
pairing = ConfigPairing(base_asset="BTC", quote_asset="USD", enabled=True, source="Kraken")
result = await repo.create_pairing(pairing)
+11 -12
View File
@@ -63,8 +63,7 @@ async def test_configuration_service_set_setting(mock_settings, mock_store, mock
mock_created_setting = Mock()
mock_created_setting.updated_at = "2023-01-01T00:00:00"
mock_repo_instance.create_setting = AsyncMock(
return_value=mock_created_setting)
mock_repo_instance.create_setting = AsyncMock(return_value=mock_created_setting)
mock_repo_instance.get_setting = AsyncMock(return_value=None)
await service.set_setting("test_key", "test_value", "test_user")
@@ -73,7 +72,9 @@ async def test_configuration_service_set_setting(mock_settings, mock_store, mock
@pytest.mark.asyncio
async def test_configuration_service_hot_reload_detection(mock_settings, mock_store, mock_audit_repo):
async def test_configuration_service_hot_reload_detection(
mock_settings, mock_store, mock_audit_repo
):
"""Test hot-reload detection functionality."""
service = ConfigurationService(mock_settings, mock_store, mock_audit_repo)
@@ -86,8 +87,7 @@ async def test_configuration_service_hot_reload_detection(mock_settings, mock_st
from datetime import datetime
mock_repo_instance.get_latest_updated_at = AsyncMock(
return_value=datetime.now())
mock_repo_instance.get_latest_updated_at = AsyncMock(return_value=datetime.now())
assert await service.is_config_outdated() is True
@@ -105,8 +105,7 @@ async def test_configuration_service_reload_if_changed(mock_settings, mock_store
from datetime import datetime
mock_repo_instance.get_latest_updated_at = AsyncMock(
return_value=datetime.now())
mock_repo_instance.get_latest_updated_at = AsyncMock(return_value=datetime.now())
result = await service.reload_if_changed()
assert result is True
@@ -125,8 +124,7 @@ async def test_configuration_service_get_config_version(mock_settings, mock_stor
mock_created_setting = Mock()
mock_created_setting.updated_at = "2023-01-01T00:00:00"
mock_repo_instance.create_setting = AsyncMock(
return_value=mock_created_setting)
mock_repo_instance.create_setting = AsyncMock(return_value=mock_created_setting)
mock_repo_instance.get_setting = AsyncMock(return_value=None)
await service.set_setting("test_key", "test_value", "test_user")
@@ -134,7 +132,9 @@ async def test_configuration_service_get_config_version(mock_settings, mock_stor
@pytest.mark.asyncio
async def test_configuration_service_get_last_updated_at(mock_settings, mock_store, mock_audit_repo):
async def test_configuration_service_get_last_updated_at(
mock_settings, mock_store, mock_audit_repo
):
"""Test getting last updated timestamp."""
service = ConfigurationService(mock_settings, mock_store, mock_audit_repo)
assert service.get_last_updated_at() is None
@@ -145,8 +145,7 @@ async def test_configuration_service_get_last_updated_at(mock_settings, mock_sto
mock_created_setting = Mock()
mock_created_setting.updated_at = "2023-01-01T00:00:00"
mock_repo_instance.create_setting = AsyncMock(
return_value=mock_created_setting)
mock_repo_instance.create_setting = AsyncMock(return_value=mock_created_setting)
mock_repo_instance.get_setting = AsyncMock(return_value=None)
await service.set_setting("test_key", "test_value", "test_user")
+3 -7
View File
@@ -49,9 +49,7 @@ def _mock_pg_store():
@pytest.fixture
def app():
"""Create a test app with a mocked PgStore and audit repository."""
a = create_app(
Settings(_env_file=None, APP_MODE="paper", paper_trading_mode=True)
)
a = create_app(Settings(_env_file=None, APP_MODE="paper", paper_trading_mode=True))
a.state.store = _mock_pg_store()
a.state.runtime_state_repository.insert = AsyncMock()
a.state.runtime_state_repository.latest = AsyncMock(return_value=None)
@@ -69,16 +67,14 @@ async def test_persist_runtime_snapshot_writes_record(app) -> None:
# Mock _open_trade_count → 0, _latest_balances → None
conn = await app.state.store.pool.acquire().__aenter__()
conn.fetchrow = AsyncMock(return_value=MagicMock(
**{"__getitem__": lambda s, k: 0}))
conn.fetchrow = AsyncMock(return_value=MagicMock(**{"__getitem__": lambda s, k: 0}))
snapshot = await persist_runtime_snapshot(app, note="unit-test")
assert snapshot is not None
assert snapshot.note == "unit-test"
app.state.runtime_state_repository.latest = AsyncMock(
return_value=snapshot)
app.state.runtime_state_repository.latest = AsyncMock(return_value=snapshot)
latest = await app.state.runtime_state_repository.latest()
assert latest is not None
assert latest.note == "unit-test"