Compare commits

..

13 Commits

Author SHA1 Message Date
zwitschi e4f5d8dfcc refactor: clean up imports and improve code formatting across multiple files
CI / lint-test-build (push) Successful in 2m21s
2026-06-09 10:02:41 +02:00
zwitschi 403daa6cf1 Refactor log aggregation periods and improve code formatting
- Removed the "3h" and "6h" periods from the log aggregation process in maintenance.py to streamline log counts.
- Enhanced code readability by adjusting line breaks and indentation in repositories.py for better clarity.
2026-06-09 09:41:23 +02:00
zwitschi dc99f1604e Refactor code for improved readability and consistency
CI / lint-test-build (push) Successful in 54s
- Cleaned up multiline statements and removed unnecessary line breaks in various files.
- Ensured consistent formatting in function definitions and calls across the codebase.
- Updated docstrings and comments for clarity where applicable.
- Removed trailing newlines in module docstrings.
- Enhanced logging statements for better clarity in maintenance tasks.
2026-06-07 21:59:09 +02:00
zwitschi f221464daa feat: enhance backtesting panel with flash messages and pairing checks
CI / lint-test-build (push) Failing after 12s
2026-06-07 21:51:09 +02:00
zwitschi 5e7732b85f feat: add flash message support to configuration panel and improve layout
CI / lint-test-build (push) Successful in 52s
2026-06-07 19:57:42 +02:00
zwitschi 77dfb08b23 feat: update package data inclusion to add dashboard templates and reorder partials
CI / lint-test-build (push) Successful in 1m6s
2026-06-07 19:35:05 +02:00
zwitschi 9acabddb7e feat: include config HTML templates in package data
CI / lint-test-build (push) Successful in 2m23s
2026-06-07 18:46:12 +02:00
zwitschi 2fbc78f7a9 feat: add logging package with DB sink and maintenance tasks
CI / lint-test-build (push) Successful in 2m24s
2026-06-07 18:31:27 +02:00
zwitschi f58634d438 feat: add storage schema SQL file to package data inclusion
CI / lint-test-build (push) Successful in 2m31s
2026-06-07 18:29:16 +02:00
zwitschi e44876c7c7 refactor: clean up imports and improve code formatting in various modules
CI / lint-test-build (push) Successful in 49s
2026-06-07 18:17:45 +02:00
zwitschi 1e4086a0fd feat: add logging routes and update health page to display system logs
CI / lint-test-build (push) Failing after 12s
2026-06-07 18:10:50 +02:00
zwitschi cf5ff2e2d8 feat: implement logging system with aggregation and archiving tasks 2026-06-07 18:06:35 +02:00
zwitschi db2e02c316 feat: add pairings management page and integrate with Kraken API for syncing
feat: create configuration templates for alerts, Kraken settings, risk limits, and runtime settings
refactor: streamline config form by including separate template files for better organization
2026-06-07 17:44:26 +02:00
43 changed files with 3124 additions and 2110 deletions
+10 -7
View File
@@ -53,7 +53,7 @@ The bot consumes Kraken market data, detects opportunities, and executes trades
- `detection/` - triangular graph and incremental detector. - `detection/` - triangular graph and incremental detector.
- `risk/` - pre-trade and trade-limit guards. - `risk/` - pre-trade and trade-limit guards.
- `execution/` - multi-leg trade sequencing. - `execution/` - multi-leg trade sequencing.
- `backtesting/` - replay engine, parameter sweep, experiment scaffolds. - `backtesting/` - replay engine, parameter sweep, experiment scaffolds. See [backtesting.md](backtesting.md).
- `strategy/` - experimental strategy modules such as stat-arb. - `strategy/` - experimental strategy modules such as stat-arb.
- `storage/` - PostgreSQL schema and repositories. - `storage/` - PostgreSQL schema and repositories.
- `alerting/` - multi-channel notifications. - `alerting/` - multi-channel notifications.
@@ -77,7 +77,7 @@ The bot consumes Kraken market data, detects opportunities, and executes trades
3. Incremental detector scores impacted cycles. 3. Incremental detector scores impacted cycles.
4. Risk manager validates the opportunity. 4. Risk manager validates the opportunity.
5. Execution sequencer places legs if approved. 5. Execution sequencer places legs if approved.
7. Trades and snapshots persist to PostgreSQL. 6. Trades and snapshots persist to PostgreSQL.
7. Dashboard and alerts reflect state changes. 7. Dashboard and alerts reflect state changes.
### 6.2 Dashboard Control Flow ### 6.2 Dashboard Control Flow
@@ -89,11 +89,14 @@ The bot consumes Kraken market data, detects opportunities, and executes trades
### 6.3 Backtesting Flow ### 6.3 Backtesting Flow
1. User selects JSONL replay file and run parameters. See [backtesting.md](backtesting.md) for full design and implementation details.
2. Replay engine loads ordered book events.
3. Detector, risk, and execution logic run in simulation mode. 1. User picks currency pairs (from config/pairings page, or all enabled).
4. Report is stored in memory for recent UI display. 2. User sets starting balances (required), time range (required), min profit threshold (required).
5. Parameter sweeps split data into train/test windows, rank results, and flag overfit. 3. Fee profile defaults to "api (from Kraken)"; slippage (4.0 bps) and execution latency (20 ms) are optional with sensible defaults.
4. Job is queued via `POST /dashboard/backtesting/run`.
5. Backend loads events from `market_snapshots` table, builds triangular cycles, runs replay engine.
6. Report stored in `backtest_jobs` table, visible in recent jobs list.
## 7. Deployment View ## 7. Deployment View
+130
View File
@@ -0,0 +1,130 @@
# Backtesting Architecture
> Detailed design and implementation of the backtesting subsystem.
> See [`README.md`](README.md#63-backtesting-flow) for the high-level user flow.
## Data Flow
```txt
market_snapshots (DB) ─┐
├──→ load_replay_events_from_db() ──→ list[ReplayBookEvent]
JSONL file ─────────────┘
BacktestReplayEngine.run()
BacktestReport
BacktestJobRepository.store_report()
```
Two event sources:
- **DB mode** (default) — loads snapshots from `market_snapshots` table. Supports symbol/time filtering.
- **File mode** — reads JSONL files from disk (legacy, used by `backtest_replay.py` script).
## Core Types
### `ReplayClock`
Timekeeper for simulation. Ensures events advance monotonically. Supports `advance_ms()` to model execution latency.
### `ReplayBookEvent`
One atomic book state at a point in time. Fields: `occurred_at`, `symbol`, `bids: tuple[BookLevel]`, `asks: tuple[BookLevel]`.
### `BacktestConfig`
| Field | Default | Description |
| ------------------------ | -------- | ----------------------------------------------------- |
| `fee_rate` | `0.0` | 0.0 → API-sourced fee from `kraken_account_snapshots` |
| `min_profit_threshold` | `0.0005` | Minimum net profit to attempt trade |
| `trade_capital` | `100.0` | Capital allocated per trade |
| `quote_asset` | `"USD"` | Base currency for P&L |
| `slippage_bps` | `4.0` | Simulated slippage in basis points |
| `execution_latency_ms` | `20.0` | Simulated latency per leg |
| `max_depth_levels` | `10` | Order book depth for detection |
| `max_concurrent_trades` | `1` | Max simultaneous trades |
| `min_order_size_by_pair` | `None` | Per-pair min order size overrides |
### `BacktestReport`
| Field | Type | Description |
| -------------------------------- | -------------- | ---------------------------------- |
| `started_at` / `finished_at` | datetime | Simulation window |
| `processed_events` | int | Events consumed |
| `opportunities_seen` | int | Detected opportunities |
| `trades_executed` | int | Successful trades |
| `win_rate` | float or None | Fraction of profitable trades |
| `fill_rate` | float or None | Average fill ratio |
| `realized_pnl_usd` | float | Net P&L after slippage |
| `max_drawdown_usd` | float | Peak-to-trough equity drop |
| `miss_reasons` | dict[str, int] | Counters for skipped opportunities |
| `execution_latency_p50/95/99_ms` | float or None | Latency percentiles |
## Simulation Client
`_SimulatedRestClient` replaces the real Kraken REST client during backtesting.
- **Slippage model:** `fill_ratio = max(0.85, 1.0 - (slippage_bps / 10000.0) * 8.0)`
- **Latency model:** Clock advances by `execution_latency_ms` before each simulated fill
- Orders always fill (status = `"closed"`) at the modeled ratio
## Job Worker
`backtest_worker` is an `asyncio.Task` started in `create_app()` lifespan:
```python
backtest_task = asyncio.create_task(
backtest_worker(backtest_queue, db),
name="backtest_worker",
)
```
Workflow per job:
1. Dequeue `(job_id, config_dict)` from `asyncio.Queue`
2. Update status → `"running"` in `backtest_jobs` table
3. Load events (DB or file)
4. Build currency graph → triangular cycles
5. Instantiate `BacktestReplayEngine``engine.run()`
6. Store report → update status → `"completed"` (or `"failed"` on exception)
## Sweep Pipeline
`run_parameter_search` performs grid search over backtest parameters:
1. **Split** events into train/test windows by time ratio
2. **Build grid** — cartesian product of `theta_values × trade_capital_values × pair_universes × staleness_threshold_values`
3. **For each parameter set:**
- Filter events to pair universe + apply staleness gate
- Build cycles restricted to pair universe
- Run engine on train window → `train_report`
- Run engine on test window → `test_report`
- Score = `realized_pnl + win_rate_bonus + fill_rate_bonus - max_drawdown`
- Compute generalization gap = `|train_score - test_score| / max(train_score, test_score)`
4. **Evaluate promotion:**
- `PromotionCriteria` checks: min test P&L, min win rate ≥ 0.5, min fill rate ≥ 0.9, max drawdown ≤ $25, generalization gap ≤ 0.5
- Results passing all criteria are flagged `promotion_ready`
## UI
> See `backtesting.html` → `partials/backtesting_panel.html`.
- **Shell page** loads the panel via `hx-get="/dashboard/fragment/backtesting"`
- **Run form** — starting balances, time range, profit threshold (required); fee profile, slippage, latency (advanced/collapsible)
- **Status card** — current job status + message
- **Recent jobs table** — lists last 20 jobs with status, events, trades, P&L; each row has a detail button
- **Job detail** — `GET /dashboard/backtesting/job/{id}` returns report HTML
Pairings are managed on the `/dashboard/config/pairings` page. Backtest uses DB-enabled pairings by default when no symbols are specified.
## Source Files
| File | Role |
| ----------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `backtesting/replay.py` | `ReplayClock`, `ReplayBookEvent`, `BacktestConfig`, `BacktestReport`, `_SimulatedRestClient`, `BacktestReplayEngine`, `load_replay_events`, `load_replay_events_from_db` |
| `backtesting/runner.py` | `run_backtest_job`, `backtest_worker`, `_build_cycles_from_events`, `_parse_balances` |
| `backtesting/sweep.py` | `SweepParameters`, `SweepResult`, `SweepArtifacts`, `PromotionCriteria`, `split_events_time_windows`, `build_parameter_grid`, `run_parameter_search`, `persist_sweep_results` |
+3
View File
@@ -27,7 +27,10 @@ include-package-data = true
[tool.setuptools.package-data] [tool.setuptools.package-data]
arbitrade = [ arbitrade = [
"web/templates/*.html", "web/templates/*.html",
"web/templates/config/*.html",
"web/templates/dashboard/*.html",
"web/templates/partials/*.html", "web/templates/partials/*.html",
"storage/schema_pg.sql",
] ]
[tool.setuptools.packages.find] [tool.setuptools.packages.find]
+1 -2
View File
@@ -67,8 +67,7 @@ async def _seed_dataset(store: PgStore) -> None:
opportunity_rows: list[tuple[object, ...]] = [] opportunity_rows: list[tuple[object, ...]] = []
for i in range(5000): for i in range(5000):
detected_at = now + timedelta(milliseconds=200 * i) detected_at = now + timedelta(milliseconds=200 * i)
opportunity_rows.append( opportunity_rows.append((detected_at, "USD->BTC->ETH->USD", 2.5, 1.2, 0.03, bool(i % 2)))
(detected_at, "USD->BTC->ETH->USD", 2.5, 1.2, 0.03, bool(i % 2)))
order_rows: list[tuple[object, ...]] = [] order_rows: list[tuple[object, ...]] = []
for i in range(3500): for i in range(3500):
+10
View File
@@ -17,6 +17,8 @@ from arbitrade.config.settings import Settings
from arbitrade.exchange.fee_service import run_fee_sync_loop from arbitrade.exchange.fee_service import run_fee_sync_loop
from arbitrade.exchange.kraken_rest import KrakenRestClient from arbitrade.exchange.kraken_rest import KrakenRestClient
from arbitrade.exchange.kraken_ws import KrakenWsClient from arbitrade.exchange.kraken_ws import KrakenWsClient
from arbitrade.logging.db_sink import get_db_sink
from arbitrade.logging.maintenance import run_log_aggregation_loop, run_log_archive_loop
from arbitrade.logging_setup import configure_logging from arbitrade.logging_setup import configure_logging
from arbitrade.market_data.feed import MarketDataFeed from arbitrade.market_data.feed import MarketDataFeed
from arbitrade.market_data.feed_builder import ( from arbitrade.market_data.feed_builder import (
@@ -108,6 +110,7 @@ def create_app(settings: Settings) -> FastAPI:
async def lifespan(app: FastAPI) -> AsyncIterator[None]: async def lifespan(app: FastAPI) -> AsyncIterator[None]:
await app.state.store.start() await app.state.store.start()
await app.state.store.migrate() await app.state.store.migrate()
get_db_sink().start_consumer(db)
await app.state.configuration_service.load_database_settings() await app.state.configuration_service.load_database_settings()
await restore_runtime_state(app) await restore_runtime_state(app)
fee_sync_task = asyncio.create_task( fee_sync_task = asyncio.create_task(
@@ -135,6 +138,12 @@ def create_app(settings: Settings) -> FastAPI:
app.state.fee_sync_task = fee_sync_task app.state.fee_sync_task = fee_sync_task
app.state.pairing_sync_task = pairing_sync_task app.state.pairing_sync_task = pairing_sync_task
app.state.backtest_task = backtest_task app.state.backtest_task = backtest_task
app.state.log_aggregation_task = asyncio.create_task(
run_log_aggregation_loop(db), name="log_aggregation"
)
app.state.log_archive_task = asyncio.create_task(
run_log_archive_loop(db), name="log_archive"
)
yield yield
fee_sync_stop_event.set() fee_sync_stop_event.set()
pairing_sync_stop_event.set() pairing_sync_stop_event.set()
@@ -170,6 +179,7 @@ def create_app(settings: Settings) -> FastAPI:
await kraken_client.close() await kraken_client.close()
await graceful_shutdown(app) await graceful_shutdown(app)
await app.state.store.stop() await app.state.store.stop()
await get_db_sink().stop_consumer()
app = FastAPI(title="arbitrade", version="0.1.0", lifespan=lifespan) app = FastAPI(title="arbitrade", version="0.1.0", lifespan=lifespan)
app.state.settings = settings app.state.settings = settings
+120 -1101
View File
File diff suppressed because it is too large Load Diff
+38 -76
View File
@@ -31,41 +31,26 @@ class Settings(BaseSettings):
) )
alerts_enabled: bool = Field(default=True, alias="ALERTS_ENABLED") alerts_enabled: bool = Field(default=True, alias="ALERTS_ENABLED")
alert_min_severity: str = Field( alert_min_severity: str = Field(default="warning", alias="ALERT_MIN_SEVERITY")
default="warning", alias="ALERT_MIN_SEVERITY") alert_dedup_seconds: float = Field(default=30.0, alias="ALERT_DEDUP_SECONDS")
alert_dedup_seconds: float = Field( alert_on_trade_events: bool = Field(default=True, alias="ALERT_ON_TRADE_EVENTS")
default=30.0, alias="ALERT_DEDUP_SECONDS") alert_on_error_events: bool = Field(default=True, alias="ALERT_ON_ERROR_EVENTS")
alert_on_trade_events: bool = Field( alert_on_threshold_events: bool = Field(default=True, alias="ALERT_ON_THRESHOLD_EVENTS")
default=True, alias="ALERT_ON_TRADE_EVENTS") alert_on_system_events: bool = Field(default=True, alias="ALERT_ON_SYSTEM_EVENTS")
alert_on_error_events: bool = Field(
default=True, alias="ALERT_ON_ERROR_EVENTS")
alert_on_threshold_events: bool = Field(
default=True, alias="ALERT_ON_THRESHOLD_EVENTS")
alert_on_system_events: bool = Field(
default=True, alias="ALERT_ON_SYSTEM_EVENTS")
telegram_alerts_enabled: bool = Field( telegram_alerts_enabled: bool = Field(default=False, alias="TELEGRAM_ALERTS_ENABLED")
default=False, alias="TELEGRAM_ALERTS_ENABLED") telegram_bot_token: str | None = Field(default=None, alias="TELEGRAM_BOT_TOKEN")
telegram_bot_token: str | None = Field( telegram_chat_id: str | None = Field(default=None, alias="TELEGRAM_CHAT_ID")
default=None, alias="TELEGRAM_BOT_TOKEN")
telegram_chat_id: str | None = Field(
default=None, alias="TELEGRAM_CHAT_ID")
discord_alerts_enabled: bool = Field( discord_alerts_enabled: bool = Field(default=False, alias="DISCORD_ALERTS_ENABLED")
default=False, alias="DISCORD_ALERTS_ENABLED") discord_webhook_url: str | None = Field(default=None, alias="DISCORD_WEBHOOK_URL")
discord_webhook_url: str | None = Field(
default=None, alias="DISCORD_WEBHOOK_URL")
email_alerts_enabled: bool = Field( email_alerts_enabled: bool = Field(default=False, alias="EMAIL_ALERTS_ENABLED")
default=False, alias="EMAIL_ALERTS_ENABLED")
email_smtp_host: str | None = Field(default=None, alias="EMAIL_SMTP_HOST") email_smtp_host: str | None = Field(default=None, alias="EMAIL_SMTP_HOST")
email_smtp_port: int = Field(default=587, alias="EMAIL_SMTP_PORT") email_smtp_port: int = Field(default=587, alias="EMAIL_SMTP_PORT")
email_smtp_username: str | None = Field( email_smtp_username: str | None = Field(default=None, alias="EMAIL_SMTP_USERNAME")
default=None, alias="EMAIL_SMTP_USERNAME") email_smtp_password: str | None = Field(default=None, alias="EMAIL_SMTP_PASSWORD")
email_smtp_password: str | None = Field( email_alert_from: str | None = Field(default=None, alias="EMAIL_ALERT_FROM")
default=None, alias="EMAIL_SMTP_PASSWORD")
email_alert_from: str | None = Field(
default=None, alias="EMAIL_ALERT_FROM")
email_alert_to: str | None = Field(default=None, alias="EMAIL_ALERT_TO") email_alert_to: str | None = Field(default=None, alias="EMAIL_ALERT_TO")
email_smtp_use_tls: bool = Field(default=True, alias="EMAIL_SMTP_USE_TLS") email_smtp_use_tls: bool = Field(default=True, alias="EMAIL_SMTP_USE_TLS")
@@ -78,31 +63,24 @@ class Settings(BaseSettings):
pg_min_connections: int = Field(default=2, alias="PG_MIN_CONNECTIONS") pg_min_connections: int = Field(default=2, alias="PG_MIN_CONNECTIONS")
pg_max_connections: int = Field(default=10, alias="PG_MAX_CONNECTIONS") pg_max_connections: int = Field(default=10, alias="PG_MAX_CONNECTIONS")
kraken_rest_url: str = Field( kraken_rest_url: str = Field(default="https://api.kraken.com", alias="KRAKEN_REST_URL")
default="https://api.kraken.com", alias="KRAKEN_REST_URL") kraken_ws_url: str = Field(default="wss://ws.kraken.com/v2", alias="KRAKEN_WS_URL")
kraken_ws_url: str = Field(
default="wss://ws.kraken.com/v2", alias="KRAKEN_WS_URL")
kraken_private_rate_limit_seconds: float = Field( kraken_private_rate_limit_seconds: float = Field(
default=1.0, alias="KRAKEN_PRIVATE_RATE_LIMIT_SECONDS" default=1.0, alias="KRAKEN_PRIVATE_RATE_LIMIT_SECONDS"
) )
kraken_http_timeout_seconds: float = Field( kraken_http_timeout_seconds: float = Field(default=10.0, alias="KRAKEN_HTTP_TIMEOUT_SECONDS")
default=10.0, alias="KRAKEN_HTTP_TIMEOUT_SECONDS") kraken_retry_attempts: int = Field(default=3, alias="KRAKEN_RETRY_ATTEMPTS")
kraken_retry_attempts: int = Field(
default=3, alias="KRAKEN_RETRY_ATTEMPTS")
kraken_retry_base_delay_seconds: float = Field( kraken_retry_base_delay_seconds: float = Field(
default=0.25, alias="KRAKEN_RETRY_BASE_DELAY_SECONDS" default=0.25, alias="KRAKEN_RETRY_BASE_DELAY_SECONDS"
) )
kraken_api_key: str | None = Field(default=None, alias="KRAKEN_API_KEY") kraken_api_key: str | None = Field(default=None, alias="KRAKEN_API_KEY")
kraken_api_secret: str | None = Field( kraken_api_secret: str | None = Field(default=None, alias="KRAKEN_API_SECRET")
default=None, alias="KRAKEN_API_SECRET")
kraken_api_key_permissions: str = Field( kraken_api_key_permissions: str = Field(
default="query,trade", default="query,trade",
alias="KRAKEN_API_KEY_PERMISSIONS", alias="KRAKEN_API_KEY_PERMISSIONS",
) )
ws_heartbeat_timeout_seconds: float = Field( ws_heartbeat_timeout_seconds: float = Field(default=20.0, alias="WS_HEARTBEAT_TIMEOUT_SECONDS")
default=20.0, alias="WS_HEARTBEAT_TIMEOUT_SECONDS") ws_max_staleness_seconds: float = Field(default=5.0, alias="WS_MAX_STALENESS_SECONDS")
ws_max_staleness_seconds: float = Field(
default=5.0, alias="WS_MAX_STALENESS_SECONDS")
strategy_enable_stat_arb_experiment: bool = Field( strategy_enable_stat_arb_experiment: bool = Field(
default=False, default=False,
alias="STRATEGY_ENABLE_STAT_ARB_EXPERIMENT", alias="STRATEGY_ENABLE_STAT_ARB_EXPERIMENT",
@@ -125,29 +103,20 @@ class Settings(BaseSettings):
) )
paper_trading_mode: bool = Field(default=True, alias="PAPER_TRADING_MODE") paper_trading_mode: bool = Field(default=True, alias="PAPER_TRADING_MODE")
trade_capital_usd: float = Field(default=100.0, alias="TRADE_CAPITAL_USD") trade_capital_usd: float = Field(default=100.0, alias="TRADE_CAPITAL_USD")
max_trade_capital_usd: float = Field( max_trade_capital_usd: float = Field(default=100.0, alias="MAX_TRADE_CAPITAL_USD")
default=100.0, alias="MAX_TRADE_CAPITAL_USD") max_concurrent_trades: int | None = Field(default=None, alias="MAX_CONCURRENT_TRADES")
max_concurrent_trades: int | None = Field(
default=None, alias="MAX_CONCURRENT_TRADES")
max_exposure_per_asset_usd: float | None = Field( max_exposure_per_asset_usd: float | None = Field(
default=None, default=None,
alias="MAX_EXPOSURE_PER_ASSET_USD", alias="MAX_EXPOSURE_PER_ASSET_USD",
) )
quote_balance_asset: str = Field( quote_balance_asset: str = Field(default="USD", alias="QUOTE_BALANCE_ASSET")
default="USD", alias="QUOTE_BALANCE_ASSET") min_order_size_usd: float | None = Field(default=None, alias="MIN_ORDER_SIZE_USD")
min_order_size_usd: float | None = Field(
default=None, alias="MIN_ORDER_SIZE_USD")
kill_switch_active: bool = Field(default=False, alias="KILL_SWITCH_ACTIVE") kill_switch_active: bool = Field(default=False, alias="KILL_SWITCH_ACTIVE")
daily_loss_limit_usd: float | None = Field( daily_loss_limit_usd: float | None = Field(default=None, alias="DAILY_LOSS_LIMIT_USD")
default=None, alias="DAILY_LOSS_LIMIT_USD") cumulative_loss_limit_usd: float | None = Field(default=None, alias="CUMULATIVE_LOSS_LIMIT_USD")
cumulative_loss_limit_usd: float | None = Field( max_source_latency_ms: float | None = Field(default=None, alias="MAX_SOURCE_LATENCY_MS")
default=None, alias="CUMULATIVE_LOSS_LIMIT_USD") max_apply_latency_ms: float | None = Field(default=None, alias="MAX_APPLY_LATENCY_MS")
max_source_latency_ms: float | None = Field( max_consecutive_failures: int | None = Field(default=None, alias="MAX_CONSECUTIVE_FAILURES")
default=None, alias="MAX_SOURCE_LATENCY_MS")
max_apply_latency_ms: float | None = Field(
default=None, alias="MAX_APPLY_LATENCY_MS")
max_consecutive_failures: int | None = Field(
default=None, alias="MAX_CONSECUTIVE_FAILURES")
fernet_key: str | None = Field(default=None, alias="FERNET_KEY") fernet_key: str | None = Field(default=None, alias="FERNET_KEY")
@@ -164,8 +133,7 @@ class Settings(BaseSettings):
def _validate_log_level(cls, value: str) -> str: def _validate_log_level(cls, value: str) -> str:
normalized = value.strip().upper() normalized = value.strip().upper()
if normalized not in {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}: if normalized not in {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}:
raise ValueError( raise ValueError("LOG_LEVEL must be one of: DEBUG, INFO, WARNING, ERROR, CRITICAL")
"LOG_LEVEL must be one of: DEBUG, INFO, WARNING, ERROR, CRITICAL")
return normalized return normalized
@field_validator("alert_min_severity") @field_validator("alert_min_severity")
@@ -173,19 +141,16 @@ class Settings(BaseSettings):
def _validate_alert_severity(cls, value: str) -> str: def _validate_alert_severity(cls, value: str) -> str:
normalized = value.strip().lower() normalized = value.strip().lower()
if normalized not in {"info", "warning", "error", "critical"}: if normalized not in {"info", "warning", "error", "critical"}:
raise ValueError( raise ValueError("ALERT_MIN_SEVERITY must be one of: info, warning, error, critical")
"ALERT_MIN_SEVERITY must be one of: info, warning, error, critical")
return normalized return normalized
@model_validator(mode="after") @model_validator(mode="after")
def _validate_security_constraints(self) -> Settings: def _validate_security_constraints(self) -> Settings:
if bool(self.dashboard_auth_username) ^ bool(self.dashboard_auth_password): if bool(self.dashboard_auth_username) ^ bool(self.dashboard_auth_password):
raise ValueError( raise ValueError("dashboard auth requires both username and password")
"dashboard auth requires both username and password")
if bool(self.kraken_api_key) ^ bool(self.kraken_api_secret): if bool(self.kraken_api_key) ^ bool(self.kraken_api_secret):
raise ValueError( raise ValueError("Kraken API auth requires both API key and secret")
"Kraken API auth requires both API key and secret")
permissions = { permissions = {
token.strip().lower() token.strip().lower()
@@ -193,11 +158,9 @@ class Settings(BaseSettings):
if token.strip() if token.strip()
} }
if permissions and ("query" not in permissions or "trade" not in permissions): if permissions and ("query" not in permissions or "trade" not in permissions):
raise ValueError( raise ValueError("KRAKEN_API_KEY_PERMISSIONS must include query and trade")
"KRAKEN_API_KEY_PERMISSIONS must include query and trade")
if "withdraw" in permissions or "withdrawals" in permissions: if "withdraw" in permissions or "withdrawals" in permissions:
raise ValueError( raise ValueError("KRAKEN_API_KEY_PERMISSIONS must not include withdrawal scope")
"KRAKEN_API_KEY_PERMISSIONS must not include withdrawal scope")
if self.alert_dedup_seconds < 0.0: if self.alert_dedup_seconds < 0.0:
raise ValueError("ALERT_DEDUP_SECONDS must be >= 0") raise ValueError("ALERT_DEDUP_SECONDS must be >= 0")
@@ -213,8 +176,7 @@ class Settings(BaseSettings):
"STRATEGY_STAT_ARB_ENTRY_ZSCORE must be greater than STRATEGY_STAT_ARB_EXIT_ZSCORE" "STRATEGY_STAT_ARB_ENTRY_ZSCORE must be greater than STRATEGY_STAT_ARB_EXIT_ZSCORE"
) )
if self.strategy_stat_arb_max_holding_seconds <= 0.0: if self.strategy_stat_arb_max_holding_seconds <= 0.0:
raise ValueError( raise ValueError("STRATEGY_STAT_ARB_MAX_HOLDING_SECONDS must be > 0")
"STRATEGY_STAT_ARB_MAX_HOLDING_SECONDS must be > 0")
return self return self
+1
View File
@@ -0,0 +1 @@
"""Dashboard module for monitoring and controlling the arbitrage bot."""
+63
View File
@@ -0,0 +1,63 @@
from __future__ import annotations
from fastapi import Request
from arbitrade.storage.repositories import (
BacktestJobRepository,
)
async def _recent_backtest_reports(request: Request) -> list[dict[str, object]]:
repo = BacktestJobRepository(request.app.state.store)
jobs = await repo.list_jobs(limit=5)
reports = []
for job in jobs:
report: dict[str, object] = {
"id": str(job.id),
"status": job.status,
}
if job.created_at is not None:
report["created_at"] = job.created_at.isoformat()
if job.finished_at is not None:
report["finished_at"] = job.finished_at.isoformat()
reports.append(report)
return reports
async def _backtesting_panel_context(
request: Request,
*,
status: str = "idle",
message: str = "Configure a replay run and execute backtest.",
latest_report: dict[str, object] | None = None,
defaults: dict[str, str] | None = None,
) -> dict[str, object]:
default_values = {
"symbols": "",
"start_time": "",
"end_time": "",
"starting_balances": "USD=1000.0",
"trade_capital": "100.0",
"min_profit_threshold": "0.0005",
"fee_profile": "api",
"custom_fee_rate": "",
"slippage_bps": "4.0",
"execution_latency_ms": "20.0",
}
if defaults is not None:
default_values.update(defaults)
reports = await _recent_backtest_reports(request)
latest = latest_report or (reports[0] if reports else None)
return {
"status": status,
"message": message,
"flash_message": "",
"no_enabled_pairings": False,
"latest_report": latest,
"recent_reports": reports,
"run_endpoint": "/dashboard/backtesting/run",
"reports_endpoint": "/dashboard/api/backtesting/reports",
**default_values,
}
File diff suppressed because it is too large Load Diff
+7 -14
View File
@@ -42,12 +42,9 @@ async def fetch_and_store_account_snapshot(
_LOG.exception("trade_balance_fetch_failed") _LOG.exception("trade_balance_fetch_failed")
return None return None
fee_tier = volume_data.get("fee_tier") if isinstance( fee_tier = volume_data.get("fee_tier") if isinstance(volume_data, dict) else None
volume_data, dict) else None fees_dict = volume_data.get("fees") if isinstance(volume_data, dict) else None
fees_dict = volume_data.get("fees") if isinstance( fees_maker = volume_data.get("fees_maker") if isinstance(volume_data, dict) else None
volume_data, dict) else None
fees_maker = volume_data.get("fees_maker") if isinstance(
volume_data, dict) else None
currency = volume_data.get("currency") currency = volume_data.get("currency")
thirty_day_volume_str = volume_data.get("volume") thirty_day_volume_str = volume_data.get("volume")
@@ -73,8 +70,7 @@ async def fetch_and_store_account_snapshot(
if currency is not None: if currency is not None:
fee_schedule["currency"] = currency fee_schedule["currency"] = currency
thirty_day_volume = float( thirty_day_volume = float(thirty_day_volume_str) if thirty_day_volume_str is not None else None
thirty_day_volume_str) if thirty_day_volume_str is not None else None
snapshot = KrakenAccountSnapshot( snapshot = KrakenAccountSnapshot(
snapshot_at=datetime.now(UTC), snapshot_at=datetime.now(UTC),
@@ -82,8 +78,7 @@ async def fetch_and_store_account_snapshot(
maker_fee=maker_fee, maker_fee=maker_fee,
taker_fee=taker_fee, taker_fee=taker_fee,
thirty_day_volume=thirty_day_volume, thirty_day_volume=thirty_day_volume,
trade_balance_raw=balance_data if isinstance( trade_balance_raw=balance_data if isinstance(balance_data, dict) else None,
balance_data, dict) else None,
fee_schedule_raw=fee_schedule if fee_schedule else None, fee_schedule_raw=fee_schedule if fee_schedule else None,
) )
@@ -107,8 +102,7 @@ async def fetch_and_store_account_snapshot(
"INSERT INTO portfolio_snapshots" "INSERT INTO portfolio_snapshots"
" (snapshot_at, balances, total_value_usd) VALUES ($1, $2, $3)", " (snapshot_at, balances, total_value_usd) VALUES ($1, $2, $3)",
datetime.now(UTC), datetime.now(UTC),
orjson.dumps(wallet_balances).decode( orjson.dumps(wallet_balances).decode("utf-8") if wallet_balances else None,
"utf-8") if wallet_balances else None,
total_value, total_value,
) )
_LOG.info("portfolio_snapshot_stored", total_value_usd=total_value) _LOG.info("portfolio_snapshot_stored", total_value_usd=total_value)
@@ -127,8 +121,7 @@ async def run_fee_sync_loop(
Runs until stop_event is set. Runs until stop_event is set.
""" """
_LOG.info("fee_sync_loop_started", _LOG.info("fee_sync_loop_started", interval_s=_FEE_REFRESH_INTERVAL_SECONDS)
interval_s=_FEE_REFRESH_INTERVAL_SECONDS)
while not stop_event.is_set(): while not stop_event.is_set():
try: try:
+7 -14
View File
@@ -47,15 +47,13 @@ class TriangularExecutionSequencer:
rest_client: SupportsOrderPlacement, rest_client: SupportsOrderPlacement,
*, *,
available_pairs: Sequence[str], available_pairs: Sequence[str],
volume_for_leg: Callable[[OpportunityEvent, volume_for_leg: Callable[[OpportunityEvent, ExecutionLeg, int], float] | None = None,
ExecutionLeg, int], float] | None = None,
execution_writer: AsyncExecutionWriter | None = None, execution_writer: AsyncExecutionWriter | None = None,
alert_notifier: SupportsAlerts | None = None, alert_notifier: SupportsAlerts | None = None,
audit_repository: AuditRepository | None = None, audit_repository: AuditRepository | None = None,
) -> None: ) -> None:
self._rest_client = rest_client self._rest_client = rest_client
self._available_pairs = {self._normalize_pair( self._available_pairs = {self._normalize_pair(pair) for pair in available_pairs}
pair) for pair in available_pairs}
self._volume_for_leg = volume_for_leg or self._default_volume_for_leg self._volume_for_leg = volume_for_leg or self._default_volume_for_leg
self._execution_writer = execution_writer self._execution_writer = execution_writer
self._alert_notifier = alert_notifier self._alert_notifier = alert_notifier
@@ -102,15 +100,12 @@ class TriangularExecutionSequencer:
raise ValueError(f"No tradable pair for leg {from_cur}->{to_cur}") raise ValueError(f"No tradable pair for leg {from_cur}->{to_cur}")
def _build_legs(self, event: OpportunityEvent) -> tuple[ExecutionLeg, ...]: def _build_legs(self, event: OpportunityEvent) -> tuple[ExecutionLeg, ...]:
currencies = [part.strip().upper() currencies = [part.strip().upper() for part in event.cycle.split("->") if part.strip()]
for part in event.cycle.split("->") if part.strip()]
if len(currencies) < 4 or currencies[0] != currencies[-1]: if len(currencies) < 4 or currencies[0] != currencies[-1]:
raise ValueError( raise ValueError("cycle must be a closed triangular path like A->B->C->A")
"cycle must be a closed triangular path like A->B->C->A")
if len(currencies) != 4: if len(currencies) != 4:
raise ValueError( raise ValueError("cycle must contain exactly three unique currencies")
"cycle must contain exactly three unique currencies")
legs: list[ExecutionLeg] = [] legs: list[ExecutionLeg] = []
for idx in range(3): for idx in range(3):
@@ -125,8 +120,7 @@ class TriangularExecutionSequencer:
) )
volume = self._volume_for_leg(event, placeholder_leg, idx) volume = self._volume_for_leg(event, placeholder_leg, idx)
if volume <= 0.0: if volume <= 0.0:
raise ValueError( raise ValueError("volume_for_leg must return a positive volume")
"volume_for_leg must return a positive volume")
legs.append(self._resolve_leg(from_currency, to_currency, volume)) legs.append(self._resolve_leg(from_currency, to_currency, volume))
return tuple(legs) return tuple(legs)
@@ -215,8 +209,7 @@ class TriangularExecutionSequencer:
responses.append(response) responses.append(response)
if self._execution_writer is not None: if self._execution_writer is not None:
order_ref = self._order_ref_from_response( order_ref = self._order_ref_from_response(response, f"leg-{idx}")
response, f"leg-{idx}")
await self._execution_writer.enqueue( await self._execution_writer.enqueue(
OrderRecord( OrderRecord(
trade_ref=trade_ref, trade_ref=trade_ref,
+1
View File
@@ -0,0 +1 @@
"""Logging package — DB sink, maintenance tasks."""
+119
View File
@@ -0,0 +1,119 @@
"""DB sink — writes structlog events to app_logs table via background queue."""
from __future__ import annotations
import asyncio
from datetime import UTC, datetime
from typing import Any
import structlog
from arbitrade.storage.pg_store import PgStore
from arbitrade.storage.repositories import LogRecord, LogRepository
_LOG = structlog.get_logger(__name__)
class DbSinkProcessor:
"""structlog processor that queues log events for DB writes.
Must be registered in the structlog processor chain. The consumer
task must be started on app init via ``start_consumer(store)``.
"""
def __init__(self) -> None:
self._queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue(maxsize=2000)
self._consumer_task: asyncio.Task[None] | None = None
def __call__(self, logger: Any, method_name: str, event_dict: dict[str, Any]) -> dict[str, Any]:
"""Processor — called for every structlog event. Non-blocking."""
try:
self._queue.put_nowait(dict(event_dict))
except asyncio.QueueFull:
pass # drop event if queue full, avoid backpressure
return event_dict
def start_consumer(self, store: PgStore) -> None:
"""Start background consumer task."""
if self._consumer_task is not None and not self._consumer_task.done():
return
self._consumer_task = asyncio.create_task(self._consume(store), name="log_db_sink")
async def stop_consumer(self) -> None:
"""Drain queue and cancel consumer."""
if self._consumer_task is None:
return
self._consumer_task.cancel()
try:
await self._consumer_task
except asyncio.CancelledError:
pass
self._consumer_task = None
# Flush remaining
await self._flush(store=None) # type: ignore[call-arg]
async def _consume(self, store: PgStore) -> None:
repo = LogRepository(store)
while True:
try:
event = await self._queue.get()
await self._write_one(repo, event)
except asyncio.CancelledError:
break
except Exception:
pass # swallow consumer errors, never crash
# Final flush
await self._flush(repo)
async def _write_one(self, repo: LogRepository, event: dict[str, Any]) -> None:
recorded_at = event.pop("timestamp", None)
if isinstance(recorded_at, str):
try:
recorded_at = datetime.fromisoformat(recorded_at)
except ValueError:
recorded_at = datetime.now(UTC)
elif not isinstance(recorded_at, datetime):
recorded_at = datetime.now(UTC)
level = str(event.pop("level", "info")).upper()
logger = str(event.pop("logger", "root"))
message = str(event.pop("event", event.pop("message", "")))
context = {k: v for k, v in event.items() if not k.startswith("_")} if event else None
record = LogRecord(
recorded_at=recorded_at,
level=level,
logger=logger,
message=message,
context=context if context else None,
)
try:
await repo.insert(record)
except Exception:
pass # never crash from DB write failure
async def _flush(self, repo: LogRepository | None) -> None:
drained = 0
while not self._queue.empty() and drained < 500:
try:
event = self._queue.get_nowait()
if repo is not None:
await self._write_one(repo, event)
drained += 1
except asyncio.QueueEmpty:
break
except Exception:
pass
# Module-level singleton
_db_sink = DbSinkProcessor()
def get_db_sink() -> DbSinkProcessor:
return _db_sink
def db_sink_processor(logger: Any, method_name: str, event_dict: dict[str, Any]) -> dict[str, Any]:
"""Standalone processor function wrapping the singleton."""
return _db_sink(logger, method_name, event_dict)
+60
View File
@@ -0,0 +1,60 @@
"""Log maintenance — aggregation and archiving tasks."""
from __future__ import annotations
import asyncio
from datetime import UTC, datetime, timedelta
import structlog
from arbitrade.storage.pg_store import PgStore
from arbitrade.storage.repositories import LogAggregationRepository, LogArchiveRepository
_LOG = structlog.get_logger(__name__)
_AGGREGATE_INTERVAL = 3600 # 1 hour
_ARCHIVE_INTERVAL = 86400 # 1 day
_RETENTION_DAYS = 30
async def run_log_aggregation(store: PgStore) -> None:
"""Aggregate log counts for the last 2 hours across all periods."""
repo = LogAggregationRepository(store)
since = datetime.now(UTC) - timedelta(hours=2)
periods = ["1h", "1d", "1w", "1mo"]
for period in periods:
try:
await repo.aggregate_since(since, period)
except Exception:
_LOG.exception("log_aggregation_failed", period=period)
_LOG.info("log_aggregation_complete", since=since.isoformat())
async def run_log_archive(store: PgStore, retention_days: int = _RETENTION_DAYS) -> int:
"""Archive log entries older than retention_days."""
cutoff = datetime.now(UTC) - timedelta(days=retention_days)
repo = LogArchiveRepository(store)
count = await repo.archive_before(cutoff)
if count > 0:
_LOG.info("log_archive_complete", cutoff=cutoff.isoformat(), archived=count)
return count
async def run_log_aggregation_loop(store: PgStore) -> None:
"""Periodic aggregation loop."""
while True:
try:
await run_log_aggregation(store)
except Exception:
_LOG.exception("log_aggregation_loop_error")
await asyncio.sleep(_AGGREGATE_INTERVAL)
async def run_log_archive_loop(store: PgStore) -> None:
"""Periodic archive loop."""
while True:
try:
await run_log_archive(store)
except Exception:
_LOG.exception("log_archive_loop_error")
await asyncio.sleep(_ARCHIVE_INTERVAL)
+3
View File
@@ -6,6 +6,8 @@ from typing import Any
import structlog import structlog
from arbitrade.logging.db_sink import db_sink_processor
def configure_logging(log_level: str = "INFO", json_logs: bool = True) -> None: def configure_logging(log_level: str = "INFO", json_logs: bool = True) -> None:
level = getattr(logging, log_level.upper(), logging.INFO) level = getattr(logging, log_level.upper(), logging.INFO)
@@ -17,6 +19,7 @@ def configure_logging(log_level: str = "INFO", json_logs: bool = True) -> None:
structlog.stdlib.add_log_level, structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name, structlog.stdlib.add_logger_name,
timestamper, timestamper,
db_sink_processor,
] ]
if json_logs: if json_logs:
+5 -10
View File
@@ -38,8 +38,7 @@ class MarketDataFeed:
opportunity_writer: AsyncOpportunityWriter | None = None, opportunity_writer: AsyncOpportunityWriter | None = None,
paper_trading_mode: bool = True, paper_trading_mode: bool = True,
opportunity_executor: ( opportunity_executor: (
Callable[[OpportunityEvent], Callable[[OpportunityEvent], Awaitable[ExecutionOutcome | float | None]] | None
Awaitable[ExecutionOutcome | float | None]] | None
) = None, ) = None,
trade_capital: float = 1.0, trade_capital: float = 1.0,
max_trade_capital: float | None = None, max_trade_capital: float | None = None,
@@ -93,8 +92,7 @@ class MarketDataFeed:
return {} return {}
start = currencies[0] start = currencies[0]
exposure_assets = { exposure_assets = {currency for currency in currencies[1:] if currency != start}
currency for currency in currencies[1:] if currency != start}
return {asset: event.allocated_capital for asset in exposure_assets} return {asset: event.allocated_capital for asset in exposure_assets}
async def run(self) -> None: async def run(self) -> None:
@@ -315,8 +313,7 @@ class MarketDataFeed:
continue continue
if self._pre_trade_validator is not None and self._balance_provider is not None: if self._pre_trade_validator is not None and self._balance_provider is not None:
required_balances = { required_balances = {self._quote_balance_asset: event.allocated_capital}
self._quote_balance_asset: event.allocated_capital}
balances = { balances = {
asset.upper(): amount asset.upper(): amount
for asset, amount in self._balance_provider().items() for asset, amount in self._balance_provider().items()
@@ -384,8 +381,7 @@ class MarketDataFeed:
outcome = await self._opportunity_executor(event) outcome = await self._opportunity_executor(event)
except Exception as exc: except Exception as exc:
if self._trade_limits_guard is not None: if self._trade_limits_guard is not None:
self._trade_limits_guard.close_trade( self._trade_limits_guard.close_trade(exposure_by_asset)
exposure_by_asset)
dispatch_alert_nowait( dispatch_alert_nowait(
self._alert_notifier, self._alert_notifier,
@@ -451,8 +447,7 @@ class MarketDataFeed:
realized_pnl = outcome realized_pnl = outcome
if realized_pnl is not None and self._loss_limit_guard is not None: if realized_pnl is not None and self._loss_limit_guard is not None:
self._loss_limit_guard.register_realized_pnl( self._loss_limit_guard.register_realized_pnl(realized_pnl)
realized_pnl)
if self._loss_limit_guard.is_halted: if self._loss_limit_guard.is_halted:
_LOG.warning( _LOG.warning(
"loss_limit_halt_triggered", "loss_limit_halt_triggered",
+41 -22
View File
@@ -51,23 +51,34 @@ class MetricsCalculator:
WHERE volume > 0 AND filled_volume IS NOT NULL WHERE volume > 0 AND filled_volume IS NOT NULL
""") """)
r_pnl_usd = float( r_pnl_usd = (
tm["realized_pnl_usd"]) if tm and tm["realized_pnl_usd"] is not None else 0.0 float(tm["realized_pnl_usd"]) if tm and tm["realized_pnl_usd"] is not None else 0.0
tt = int(tm["total_trades"] )
) if tm and tm["total_trades"] is not None else 0 tt = int(tm["total_trades"]) if tm and tm["total_trades"] is not None else 0
wt = int(tm["winning_trades"] wt = int(tm["winning_trades"]) if tm and tm["winning_trades"] is not None else 0
) if tm and tm["winning_trades"] is not None else 0
wr = wt / tt if tt > 0 else None wr = wt / tt if tt > 0 else None
atd = float(tm["avg_trade_duration_seconds"] atd = (
) if tm and tm["avg_trade_duration_seconds"] is not None else None float(tm["avg_trade_duration_seconds"])
if tm and tm["avg_trade_duration_seconds"] is not None
else None
)
oc = int(om["opportunity_count"] oc = (
) if om is not None and om["opportunity_count"] is not None else 0 int(om["opportunity_count"])
fo = om["first_detected_at"] if om is not None and isinstance( if om is not None and om["opportunity_count"] is not None
om["first_detected_at"], datetime) else None else 0
lo = om["last_detected_at"] if om is not None and isinstance( )
om["last_detected_at"], datetime) else None fo = (
om["first_detected_at"]
if om is not None and isinstance(om["first_detected_at"], datetime)
else None
)
lo = (
om["last_detected_at"]
if om is not None and isinstance(om["last_detected_at"], datetime)
else None
)
opportunities_per_minute: float | None opportunities_per_minute: float | None
if oc >= 2 and fo is not None and lo is not None: if oc >= 2 and fo is not None and lo is not None:
@@ -80,15 +91,23 @@ class MetricsCalculator:
else: else:
opportunities_per_minute = None opportunities_per_minute = None
fill_rate = float( fill_rate = float(fm["fill_rate"]) if fm and fm["fill_rate"] is not None else None
fm["fill_rate"]) if fm and fm["fill_rate"] is not None else None
lp50 = float(tm["latency_p50_seconds"] lp50 = (
) if tm and tm["latency_p50_seconds"] is not None else None float(tm["latency_p50_seconds"])
lp95 = float(tm["latency_p95_seconds"] if tm and tm["latency_p50_seconds"] is not None
) if tm and tm["latency_p95_seconds"] is not None else None else None
lp99 = float(tm["latency_p99_seconds"] )
) if tm and tm["latency_p99_seconds"] is not None else None lp95 = (
float(tm["latency_p95_seconds"])
if tm and tm["latency_p95_seconds"] is not None
else None
)
lp99 = (
float(tm["latency_p99_seconds"])
if tm and tm["latency_p99_seconds"] is not None
else None
)
return PerformanceMetrics( return PerformanceMetrics(
realized_pnl_usd=r_pnl_usd, realized_pnl_usd=r_pnl_usd,
+3 -1
View File
@@ -106,7 +106,9 @@ async def _run_startup_reconciler(app: FastAPI) -> None:
await result await result
async def persist_runtime_snapshot(app: FastAPI, *, note: str | None = None) -> RuntimeStateRecord | None: async def persist_runtime_snapshot(
app: FastAPI, *, note: str | None = None
) -> RuntimeStateRecord | None:
repository = _runtime_repository(app) repository = _runtime_repository(app)
if repository is None: if repository is None:
return None return None
+1 -2
View File
@@ -36,8 +36,7 @@ class AsyncExecutionWriter:
async def start(self) -> None: async def start(self) -> None:
if self._task is None or self._task.done(): if self._task is None or self._task.done():
self._stop.clear() self._stop.clear()
self._task = asyncio.create_task( self._task = asyncio.create_task(self._run(), name="execution-writer")
self._run(), name="execution-writer")
async def stop(self) -> None: async def stop(self) -> None:
self._stop.set() self._stop.set()
+3 -6
View File
@@ -24,16 +24,14 @@ class MarketSnapshot:
class AsyncMarketSnapshotWriter: class AsyncMarketSnapshotWriter:
def __init__(self, repository: MarketSnapshotRepository, max_queue_size: int = 50_000) -> None: def __init__(self, repository: MarketSnapshotRepository, max_queue_size: int = 50_000) -> None:
self._repository = repository self._repository = repository
self._queue: asyncio.Queue[MarketSnapshot] = asyncio.Queue( self._queue: asyncio.Queue[MarketSnapshot] = asyncio.Queue(maxsize=max_queue_size)
maxsize=max_queue_size)
self._task: asyncio.Task[None] | None = None self._task: asyncio.Task[None] | None = None
self._stop = asyncio.Event() self._stop = asyncio.Event()
async def start(self) -> None: async def start(self) -> None:
if self._task is None or self._task.done(): if self._task is None or self._task.done():
self._stop.clear() self._stop.clear()
self._task = asyncio.create_task( self._task = asyncio.create_task(self._run(), name="market-snapshot-writer")
self._run(), name="market-snapshot-writer")
async def stop(self) -> None: async def stop(self) -> None:
self._stop.set() self._stop.set()
@@ -61,7 +59,6 @@ class AsyncMarketSnapshotWriter:
) )
) )
except Exception as exc: except Exception as exc:
_LOG.error("market_snapshot_write_failed", _LOG.error("market_snapshot_write_failed", error=str(exc), symbol=item.symbol)
error=str(exc), symbol=item.symbol)
finally: finally:
self._queue.task_done() self._queue.task_done()
+2 -4
View File
@@ -13,16 +13,14 @@ _LOG = structlog.get_logger(__name__)
class AsyncOpportunityWriter: class AsyncOpportunityWriter:
def __init__(self, repository: OpportunityRepository, max_queue_size: int = 50_000) -> None: def __init__(self, repository: OpportunityRepository, max_queue_size: int = 50_000) -> None:
self._repository = repository self._repository = repository
self._queue: asyncio.Queue[OpportunityEvent] = asyncio.Queue( self._queue: asyncio.Queue[OpportunityEvent] = asyncio.Queue(maxsize=max_queue_size)
maxsize=max_queue_size)
self._task: asyncio.Task[None] | None = None self._task: asyncio.Task[None] | None = None
self._stop = asyncio.Event() self._stop = asyncio.Event()
async def start(self) -> None: async def start(self) -> None:
if self._task is None or self._task.done(): if self._task is None or self._task.done():
self._stop.clear() self._stop.clear()
self._task = asyncio.create_task( self._task = asyncio.create_task(self._run(), name="opportunity-writer")
self._run(), name="opportunity-writer")
async def stop(self) -> None: async def stop(self) -> None:
self._stop.set() self._stop.set()
+1 -3
View File
@@ -128,7 +128,5 @@ class PgStore:
col_name = column_def.split()[0] col_name = column_def.split()[0]
if col_name not in existing: if col_name not in existing:
async with self.pool.acquire() as conn: async with self.pool.acquire() as conn:
await conn.execute( await conn.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_def}")
f"ALTER TABLE {table_name} ADD COLUMN {column_def}"
)
_LOG.info("pg_column_added", table=table_name, column=col_name) _LOG.info("pg_column_added", table=table_name, column=col_name)
+319 -33
View File
@@ -242,6 +242,12 @@ class AuditRepository:
async def insert(self, record: AuditRecord) -> None: async def insert(self, record: AuditRecord) -> None:
async with self._store.pool.acquire() as conn: async with self._store.pool.acquire() as conn:
payload = None
if record.payload is not None:
try:
payload = orjson.dumps(record.payload).decode("utf-8")
except Exception:
payload = None
await conn.execute( await conn.execute(
""" """
INSERT INTO audit_events ( INSERT INTO audit_events (
@@ -258,11 +264,7 @@ class AuditRepository:
record.actor, record.actor,
record.event_type, record.event_type,
record.decision, record.decision,
( payload,
None
if record.payload is None
else orjson.dumps(record.payload).decode("utf-8")
),
record.correlation_id, record.correlation_id,
) )
@@ -282,6 +284,9 @@ class AuditRepository:
for row in rows: for row in rows:
payload: dict[str, Any] | None = None payload: dict[str, Any] | None = None
raw_payload = row["payload"] raw_payload = row["payload"]
correlation_id = None
if row["correlation_id"] is not None:
correlation_id = str(row["correlation_id"])
if isinstance(raw_payload, str) and raw_payload: if isinstance(raw_payload, str) and raw_payload:
decoded = orjson.loads(raw_payload) decoded = orjson.loads(raw_payload)
if isinstance(decoded, dict): if isinstance(decoded, dict):
@@ -294,8 +299,7 @@ class AuditRepository:
event_type=str(row["event_type"]), event_type=str(row["event_type"]),
decision=str(row["decision"]), decision=str(row["decision"]),
payload=payload, payload=payload,
correlation_id=str( correlation_id=correlation_id,
row["correlation_id"]) if row["correlation_id"] is not None else None,
) )
) )
@@ -355,6 +359,9 @@ class RuntimeStateRepository:
balances: dict[str, Any] | None = None balances: dict[str, Any] | None = None
raw_balances = row["last_known_balances"] raw_balances = row["last_known_balances"]
kill_switch_reason = None
if row["kill_switch_reason"] is not None:
kill_switch_reason = str(row["kill_switch_reason"])
if isinstance(raw_balances, str) and raw_balances: if isinstance(raw_balances, str) and raw_balances:
decoded = orjson.loads(raw_balances) decoded = orjson.loads(raw_balances)
if isinstance(decoded, dict): if isinstance(decoded, dict):
@@ -364,8 +371,7 @@ class RuntimeStateRepository:
snapshot_at=row["snapshot_at"], snapshot_at=row["snapshot_at"],
is_running=bool(row["is_running"]), is_running=bool(row["is_running"]),
kill_switch_active=bool(row["kill_switch_active"]), kill_switch_active=bool(row["kill_switch_active"]),
kill_switch_reason=str( kill_switch_reason=kill_switch_reason,
row["kill_switch_reason"]) if row["kill_switch_reason"] is not None else None,
open_trade_count=int(row["open_trade_count"]), open_trade_count=int(row["open_trade_count"]),
last_known_balances=balances, last_known_balances=balances,
note=str(row["note"]) if row["note"] is not None else None, note=str(row["note"]) if row["note"] is not None else None,
@@ -386,10 +392,16 @@ class ConfigSectionRepository:
VALUES ($1, $2) VALUES ($1, $2)
RETURNING id, name, description, updated_at RETURNING id, name, description, updated_at
""", """,
section.name, section.description, section.name,
section.description,
) )
if row: if row:
return ConfigSection(id=row["id"], name=row["name"], description=row["description"], updated_at=row["updated_at"]) return ConfigSection(
id=row["id"],
name=row["name"],
description=row["description"],
updated_at=row["updated_at"],
)
raise ValueError("Failed to create section") raise ValueError("Failed to create section")
async def get_section(self, name: str) -> ConfigSection | None: async def get_section(self, name: str) -> ConfigSection | None:
@@ -404,7 +416,12 @@ class ConfigSectionRepository:
name, name,
) )
if row: if row:
return ConfigSection(id=row["id"], name=row["name"], description=row["description"], updated_at=row["updated_at"]) return ConfigSection(
id=row["id"],
name=row["name"],
description=row["description"],
updated_at=row["updated_at"],
)
return None return None
async def list_sections(self) -> list[ConfigSection]: async def list_sections(self) -> list[ConfigSection]:
@@ -417,7 +434,11 @@ class ConfigSectionRepository:
""") """)
return [ return [
ConfigSection( ConfigSection(
id=r["id"], name=r["name"], description=r["description"], updated_at=r["updated_at"]) id=r["id"],
name=r["name"],
description=r["description"],
updated_at=r["updated_at"],
)
for r in rows for r in rows
] ]
@@ -571,7 +592,7 @@ class ConfigSettingRepository:
ts = row["latest_updated_at"] ts = row["latest_updated_at"]
if isinstance(ts, str): if isinstance(ts, str):
return datetime.fromisoformat(ts.replace("Z", "+00:00")) return datetime.fromisoformat(ts.replace("Z", "+00:00"))
return ts # type: ignore[no-any-return] return ts # type: ignore[no-any-return]
return None return None
@@ -614,7 +635,8 @@ class ConfigPairingRepository:
FROM config_pairings FROM config_pairings
WHERE base_asset = $1 AND quote_asset = $2 WHERE base_asset = $1 AND quote_asset = $2
""", """,
base_asset, quote_asset, base_asset,
quote_asset,
) )
if row: if row:
return ConfigPairing( return ConfigPairing(
@@ -665,7 +687,8 @@ class ConfigPairingRepository:
DELETE FROM config_pairings DELETE FROM config_pairings
WHERE base_asset = $1 AND quote_asset = $2 WHERE base_asset = $1 AND quote_asset = $2
""", """,
base_asset, quote_asset, base_asset,
quote_asset,
) )
if result is None: if result is None:
return False return False
@@ -732,7 +755,9 @@ class ConfigBacktestingDefaultsRepository:
def __init__(self, store: PgStore) -> None: def __init__(self, store: PgStore) -> None:
self._store = store self._store = store
async def create_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults: async def create_defaults(
self, defaults: ConfigBacktestingDefaults
) -> ConfigBacktestingDefaults:
"""Create new backtesting defaults.""" """Create new backtesting defaults."""
async with self._store.pool.acquire() as conn: async with self._store.pool.acquire() as conn:
balances_json = ( balances_json = (
@@ -753,9 +778,12 @@ class ConfigBacktestingDefaultsRepository:
defaults.execution_latency_ms, defaults.execution_latency_ms,
) )
if row: if row:
starting_balances = None
if row["starting_balances"] is not None:
starting_balances = orjson.loads(row["starting_balances"])
return ConfigBacktestingDefaults( return ConfigBacktestingDefaults(
starting_balances=orjson.loads( starting_balances=starting_balances,
row["starting_balances"]) if row["starting_balances"] else None,
trade_capital=row["trade_capital"], trade_capital=row["trade_capital"],
min_profit_threshold=row["min_profit_threshold"], min_profit_threshold=row["min_profit_threshold"],
slippage_bps=row["slippage_bps"], slippage_bps=row["slippage_bps"],
@@ -773,9 +801,11 @@ class ConfigBacktestingDefaultsRepository:
LIMIT 1 LIMIT 1
""") """)
if row: if row:
starting_balances = None
if row["starting_balances"] is not None:
starting_balances = orjson.loads(row["starting_balances"])
return ConfigBacktestingDefaults( return ConfigBacktestingDefaults(
starting_balances=orjson.loads( starting_balances=starting_balances,
row["starting_balances"]) if row["starting_balances"] else None,
trade_capital=row["trade_capital"], trade_capital=row["trade_capital"],
min_profit_threshold=row["min_profit_threshold"], min_profit_threshold=row["min_profit_threshold"],
slippage_bps=row["slippage_bps"], slippage_bps=row["slippage_bps"],
@@ -783,7 +813,9 @@ class ConfigBacktestingDefaultsRepository:
) )
return None return None
async def update_defaults(self, defaults: ConfigBacktestingDefaults) -> ConfigBacktestingDefaults: async def update_defaults(
self, defaults: ConfigBacktestingDefaults
) -> ConfigBacktestingDefaults:
"""Update the backtesting defaults.""" """Update the backtesting defaults."""
async with self._store.pool.acquire() as conn: async with self._store.pool.acquire() as conn:
starting_balances_json = ( starting_balances_json = (
@@ -807,9 +839,11 @@ class ConfigBacktestingDefaultsRepository:
defaults.execution_latency_ms, defaults.execution_latency_ms,
) )
if row: if row:
starting_balances = None
if row["starting_balances"] is not None:
starting_balances = orjson.loads(row["starting_balances"])
return ConfigBacktestingDefaults( return ConfigBacktestingDefaults(
starting_balances=orjson.loads( starting_balances=starting_balances,
row["starting_balances"]) if row["starting_balances"] else None,
trade_capital=row["trade_capital"], trade_capital=row["trade_capital"],
min_profit_threshold=row["min_profit_threshold"], min_profit_threshold=row["min_profit_threshold"],
slippage_bps=row["slippage_bps"], slippage_bps=row["slippage_bps"],
@@ -872,16 +906,20 @@ class KrakenAccountSnapshotRepository:
""") """)
if row is None: if row is None:
return None return None
trade_balance_raw = None
fee_schedule_raw = None
if row["trade_balance_raw"] is not None:
trade_balance_raw = orjson.loads(row["trade_balance_raw"])
if row["fee_schedule_raw"] is not None:
fee_schedule_raw = orjson.loads(row["fee_schedule_raw"])
return KrakenAccountSnapshot( return KrakenAccountSnapshot(
snapshot_at=row["snapshot_at"], snapshot_at=row["snapshot_at"],
fee_tier=row["fee_tier"], fee_tier=row["fee_tier"],
maker_fee=row["maker_fee"], maker_fee=row["maker_fee"],
taker_fee=row["taker_fee"], taker_fee=row["taker_fee"],
thirty_day_volume=row["thirty_day_volume"], thirty_day_volume=row["thirty_day_volume"],
trade_balance_raw=orjson.loads( trade_balance_raw=trade_balance_raw,
row["trade_balance_raw"]) if row["trade_balance_raw"] else None, fee_schedule_raw=fee_schedule_raw,
fee_schedule_raw=orjson.loads(
row["fee_schedule_raw"]) if row["fee_schedule_raw"] else None,
) )
@@ -916,7 +954,8 @@ class BacktestJobRepository:
VALUES ($1, $2) VALUES ($1, $2)
RETURNING id, status, events_path, config, created_at RETURNING id, status, events_path, config, created_at
""", """,
events_path, job_config_json, events_path,
job_config_json,
) )
if row is None: if row is None:
raise ValueError("Failed to create backtest job") raise ValueError("Failed to create backtest job")
@@ -933,24 +972,30 @@ class BacktestJobRepository:
if status == "running": if status == "running":
await conn.execute( await conn.execute(
"UPDATE backtest_jobs SET status = $1, started_at = CURRENT_TIMESTAMP WHERE id = $2", "UPDATE backtest_jobs SET status = $1, started_at = CURRENT_TIMESTAMP WHERE id = $2",
status, job_id, status,
job_id,
) )
elif status in ("completed", "failed"): elif status in ("completed", "failed"):
await conn.execute( await conn.execute(
"UPDATE backtest_jobs SET status = $1, finished_at = CURRENT_TIMESTAMP, error = $2 WHERE id = $3", "UPDATE backtest_jobs SET status = $1, finished_at = CURRENT_TIMESTAMP, error = $2 WHERE id = $3",
status, error, job_id, status,
error,
job_id,
) )
else: else:
await conn.execute( await conn.execute(
"UPDATE backtest_jobs SET status = $1, error = $2 WHERE id = $3", "UPDATE backtest_jobs SET status = $1, error = $2 WHERE id = $3",
status, error, job_id, status,
error,
job_id,
) )
async def store_report(self, job_id: str, report: dict[str, Any]) -> None: async def store_report(self, job_id: str, report: dict[str, Any]) -> None:
async with self._store.pool.acquire() as conn: async with self._store.pool.acquire() as conn:
await conn.execute( await conn.execute(
"UPDATE backtest_jobs SET report = $1 WHERE id = $2", "UPDATE backtest_jobs SET report = $1 WHERE id = $2",
orjson.dumps(report).decode("utf-8"), job_id, orjson.dumps(report).decode("utf-8"),
job_id,
) )
async def get_job(self, job_id: str) -> BacktestJobRecord | None: async def get_job(self, job_id: str) -> BacktestJobRecord | None:
@@ -1009,3 +1054,244 @@ class BacktestJobRepository:
elif isinstance(result, str): elif isinstance(result, str):
return result != "DELETE 0" return result != "DELETE 0"
return False return False
@dataclass(slots=True)
class LogRecord:
recorded_at: datetime
level: str
logger: str
message: str
context: dict[str, Any] | None = None
@dataclass(slots=True)
class LogAggregateRecord:
bucket_start: datetime
period: str
level: str
count: int
class LogRepository:
def __init__(self, store: PgStore) -> None:
self._store = store
async def insert(self, record: LogRecord) -> None:
async with self._store.pool.acquire() as conn:
context = None
if record.context:
try:
context = orjson.dumps(record.context).decode("utf-8")
except Exception:
context = None
await conn.execute(
"""
INSERT INTO app_logs (recorded_at, level, logger, message, context)
VALUES ($1, $2, $3, $4, $5)
""",
record.recorded_at,
record.level,
record.logger,
record.message,
context,
)
async def query(
self,
*,
level: str | None = None,
before: datetime | None = None,
after: datetime | None = None,
limit: int = 50,
offset: int = 0,
) -> list[LogRecord]:
async with self._store.pool.acquire() as conn:
conditions: list[str] = []
params: list[Any] = []
idx = 1
if level:
conditions.append(f"level = ${idx}")
params.append(level.upper())
idx += 1
if before:
conditions.append(f"recorded_at < ${idx}")
params.append(before)
idx += 1
if after:
conditions.append(f"recorded_at >= ${idx}")
params.append(after)
idx += 1
where = ""
if conditions:
where = "WHERE " + " AND ".join(conditions)
rows = await conn.fetch(
f"""
SELECT recorded_at, level, logger, message, context
FROM app_logs
{where}
ORDER BY recorded_at DESC
LIMIT ${idx} OFFSET ${idx + 1}
""",
*params,
limit,
offset,
)
return [
LogRecord(
recorded_at=r["recorded_at"],
level=r["level"],
logger=r["logger"],
message=r["message"],
context=r["context"],
)
for r in rows
]
async def count(self, level: str | None = None) -> int:
async with self._store.pool.acquire() as conn:
if level:
row = await conn.fetchrow(
"SELECT COUNT(*) as cnt FROM app_logs WHERE level = $1", level.upper()
)
else:
row = await conn.fetchrow("SELECT COUNT(*) as cnt FROM app_logs")
return row["cnt"] if row else 0
async def count_filtered(
self,
*,
level: str | None = None,
before: datetime | None = None,
after: datetime | None = None,
) -> int:
async with self._store.pool.acquire() as conn:
conditions: list[str] = []
params: list[Any] = []
idx = 1
if level:
conditions.append(f"level = ${idx}")
params.append(level.upper())
idx += 1
if before:
conditions.append(f"recorded_at < ${idx}")
params.append(before)
idx += 1
if after:
conditions.append(f"recorded_at >= ${idx}")
params.append(after)
idx += 1
where = ""
if conditions:
where = "WHERE " + " AND ".join(conditions)
row = await conn.fetchrow(f"SELECT COUNT(*) as cnt FROM app_logs {where}", *params)
return row["cnt"] if row else 0
class LogArchiveRepository:
def __init__(self, store: PgStore) -> None:
self._store = store
async def archive_before(self, cutoff: datetime) -> int:
"""Move rows older than cutoff from app_logs to app_log_archives."""
async with self._store.pool.acquire() as conn:
# Insert into archive
result = await conn.execute(
"""
INSERT INTO app_log_archives (id, recorded_at, level, logger, message, context)
SELECT id, recorded_at, level, logger, message, context
FROM app_logs
WHERE recorded_at < $1
""",
cutoff,
)
# Delete originals
await conn.execute("DELETE FROM app_logs WHERE recorded_at < $1", cutoff)
if isinstance(result, str):
parts = result.split()
if len(parts) == 2 and parts[0] == "INSERT":
return int(parts[1])
return 0
class LogAggregationRepository:
def __init__(self, store: PgStore) -> None:
self._store = store
async def aggregate_since(self, since: datetime, period: str) -> None:
"""Aggregate log counts per level for entries >= since, grouped by period."""
period_map = {
"1h": "date_trunc('hour', recorded_at)",
"1d": "date_trunc('day', recorded_at)",
"1w": "date_trunc('week', recorded_at)",
"1mo": "date_trunc('month', recorded_at)",
}
bucket_expr = period_map.get(period)
if bucket_expr is None:
raise ValueError(f"Unknown period: {period}")
async with self._store.pool.acquire() as conn:
rows = await conn.fetch(
f"""
SELECT {bucket_expr} AS bucket_start, level, COUNT(*) AS cnt
FROM app_logs
WHERE recorded_at >= $1
GROUP BY bucket_start, level
""",
since,
)
for row in rows:
await conn.execute(
"""
INSERT INTO app_log_aggregates (bucket_start, period, level, count)
VALUES ($1, $2, $3, $4)
ON CONFLICT (bucket_start, period, level)
DO UPDATE SET count = EXCLUDED.count
""",
row["bucket_start"],
period,
str(row["level"]),
row["cnt"],
)
async def query_aggregates(
self,
period: str,
level: str | None = None,
limit: int = 50,
) -> list[LogAggregateRecord]:
async with self._store.pool.acquire() as conn:
if level:
rows = await conn.fetch(
"""
SELECT bucket_start, period, level, count
FROM app_log_aggregates
WHERE period = $1 AND level = $2
ORDER BY bucket_start DESC
LIMIT $3
""",
period,
level.upper(),
limit,
)
else:
rows = await conn.fetch(
"""
SELECT bucket_start, period, level, count
FROM app_log_aggregates
WHERE period = $1
ORDER BY bucket_start DESC
LIMIT $2
""",
period,
limit,
)
return [
LogAggregateRecord(
bucket_start=r["bucket_start"],
period=r["period"],
level=r["level"],
count=r["count"],
)
for r in rows
]
+36 -1
View File
@@ -188,4 +188,39 @@ ALTER TABLE market_snapshots ALTER COLUMN snapshot_at TYPE TIMESTAMPT
ALTER TABLE kraken_account_snapshots ALTER COLUMN snapshot_at TYPE TIMESTAMPTZ USING snapshot_at AT TIME ZONE 'UTC'; ALTER TABLE kraken_account_snapshots ALTER COLUMN snapshot_at TYPE TIMESTAMPTZ USING snapshot_at AT TIME ZONE 'UTC';
ALTER TABLE backtest_jobs ALTER COLUMN created_at TYPE TIMESTAMPTZ USING created_at AT TIME ZONE 'UTC'; ALTER TABLE backtest_jobs ALTER COLUMN created_at TYPE TIMESTAMPTZ USING created_at AT TIME ZONE 'UTC';
ALTER TABLE backtest_jobs ALTER COLUMN started_at TYPE TIMESTAMPTZ USING started_at AT TIME ZONE 'UTC'; ALTER TABLE backtest_jobs ALTER COLUMN started_at TYPE TIMESTAMPTZ USING started_at AT TIME ZONE 'UTC';
ALTER TABLE backtest_jobs ALTER COLUMN finished_at TYPE TIMESTAMPTZ USING finished_at AT TIME ZONE 'UTC'; ALTER TABLE backtest_jobs ALTER COLUMN finished_at TYPE TIMESTAMPTZ USING finished_at AT TIME ZONE 'UTC';
-- ========================================
-- Logging tables
-- ========================================
CREATE TABLE IF NOT EXISTS app_logs (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
recorded_at TIMESTAMPTZ NOT NULL,
level VARCHAR NOT NULL,
logger VARCHAR NOT NULL,
message TEXT NOT NULL,
context JSONB
);
CREATE INDEX IF NOT EXISTS idx_app_logs_recorded_at ON app_logs (recorded_at DESC);
CREATE INDEX IF NOT EXISTS idx_app_logs_level ON app_logs (level);
CREATE TABLE IF NOT EXISTS app_log_archives (
id UUID PRIMARY KEY,
recorded_at TIMESTAMPTZ NOT NULL,
level VARCHAR NOT NULL,
logger VARCHAR NOT NULL,
message TEXT NOT NULL,
context JSONB,
archived_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_app_log_archives_recorded_at ON app_log_archives (recorded_at DESC);
CREATE TABLE IF NOT EXISTS app_log_aggregates (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
bucket_start TIMESTAMPTZ NOT NULL,
period VARCHAR NOT NULL,
level VARCHAR NOT NULL,
count INTEGER NOT NULL DEFAULT 0,
UNIQUE (bucket_start, period, level)
);
CREATE INDEX IF NOT EXISTS idx_app_log_aggregates_bucket ON app_log_aggregates (bucket_start DESC, period);
+4 -3
View File
@@ -5,9 +5,10 @@
</div> </div>
{% set nav_links = [ {"url": "/dashboard", "label": "Dashboard", "class": {% set nav_links = [ {"url": "/dashboard", "label": "Dashboard", "class":
"secondary"}, {"url": "/dashboard/config", "label": "Config", "class": "secondary"}, {"url": "/dashboard/config", "label": "Config", "class":
"secondary"}, {"url": "/dashboard/backtesting", "label": "Backtesting", "secondary"}, {"url": "/dashboard/config/pairings", "label": "Pairings",
"class": "secondary"}, {"url": "/dashboard/health", "label": "Health", "class": "secondary"}, {"url": "/dashboard/backtesting", "label":
"class": "secondary"}, ] %} "Backtesting", "class": "secondary"}, {"url": "/dashboard/health", "label":
"Health", "class": "secondary"}, ] %}
<div class="toolbar"> <div class="toolbar">
{% for link in nav_links %} {% for link in nav_links %}
<a <a
@@ -0,0 +1,192 @@
<div class="card">
<div class="label">Alerting</div>
<label class="field checkbox">
<input name="alerts_enabled" type="checkbox" {{ alerts_enabled }} />
<span>Alerts enabled</span>
</label>
<label class="field">
<span>Min severity</span>
<select name="alert_min_severity">
{% for sev in ["info", "warning", "error", "critical"] %} {% set sel =
"selected" if alert_min_severity == sev else "" %}
<option value="{{ sev }}" {{ sel }}>{{ sev }}</option>
{% endfor %}
</select>
</label>
<label class="field">
<span>Dedup seconds</span>
<input
name="alert_dedup_seconds"
type="number"
min="0"
step="1"
value="{{ alert_dedup_seconds }}"
/>
</label>
<label class="field checkbox">
<input
name="alert_on_trade_events"
type="checkbox"
{{
alert_on_trade_events
}}
/>
<span>Trade events</span>
</label>
<label class="field checkbox">
<input
name="alert_on_error_events"
type="checkbox"
{{
alert_on_error_events
}}
/>
<span>Error events</span>
</label>
<label class="field checkbox">
<input
name="alert_on_threshold_events"
type="checkbox"
{{
alert_on_threshold_events
}}
/>
<span>Threshold events</span>
</label>
<label class="field checkbox">
<input
name="alert_on_system_events"
type="checkbox"
{{
alert_on_system_events
}}
/>
<span>System events</span>
</label>
<hr
style="
border: none;
border-top: 1px solid rgba(255, 255, 255, 0.1);
margin: 12px 0;
"
/>
<label class="field checkbox">
<input
name="telegram_alerts_enabled"
type="checkbox"
{{
telegram_alerts_enabled
}}
/>
<span>Telegram</span>
</label>
<label class="field">
<span>Telegram bot token</span>
<input
name="telegram_bot_token"
type="password"
value="{{ telegram_bot_token }}"
placeholder="Bot token"
/>
</label>
<label class="field">
<span>Telegram chat ID</span>
<input
name="telegram_chat_id"
type="text"
value="{{ telegram_chat_id }}"
placeholder="Chat ID"
/>
</label>
<hr
style="
border: none;
border-top: 1px solid rgba(255, 255, 255, 0.1);
margin: 12px 0;
"
/>
<label class="field checkbox">
<input
name="discord_alerts_enabled"
type="checkbox"
{{
discord_alerts_enabled
}}
/>
<span>Discord</span>
</label>
<label class="field">
<span>Discord webhook URL</span>
<input
name="discord_webhook_url"
type="password"
value="{{ discord_webhook_url }}"
placeholder="Webhook URL"
/>
</label>
<hr
style="
border: none;
border-top: 1px solid rgba(255, 255, 255, 0.1);
margin: 12px 0;
"
/>
<label class="field checkbox">
<input
name="email_alerts_enabled"
type="checkbox"
{{
email_alerts_enabled
}}
/>
<span>Email</span>
</label>
<label class="field">
<span>SMTP host</span>
<input
name="email_smtp_host"
type="text"
value="{{ email_smtp_host }}"
placeholder="smtp.example.com"
/>
</label>
<label class="field">
<span>SMTP port</span>
<input
name="email_smtp_port"
type="number"
min="1"
max="65535"
value="{{ email_smtp_port }}"
/>
</label>
<label class="field">
<span>SMTP username</span>
<input
name="email_smtp_username"
type="text"
value="{{ email_smtp_username }}"
/>
</label>
<label class="field">
<span>SMTP password</span>
<input
name="email_smtp_password"
type="password"
value=""
placeholder="Leave blank to keep existing"
/>
</label>
<label class="field">
<span>From address</span>
<input name="email_alert_from" type="text" value="{{ email_alert_from }}" />
</label>
<label class="field">
<span>To address</span>
<input name="email_alert_to" type="text" value="{{ email_alert_to }}" />
</label>
<label class="field checkbox">
<input name="email_smtp_use_tls" type="checkbox" {{ email_smtp_use_tls }} />
<span>Use TLS</span>
</label>
</div>
@@ -0,0 +1,93 @@
<div class="card">
<div class="label">Kraken Exchange</div>
<label class="field">
<span>REST URL</span>
<input name="kraken_rest_url" type="text" value="{{ kraken_rest_url }}" />
</label>
<label class="field">
<span>WebSocket URL</span>
<input name="kraken_ws_url" type="text" value="{{ kraken_ws_url }}" />
</label>
<label class="field">
<span>Private rate limit (s)</span>
<input
name="kraken_private_rate_limit_seconds"
type="number"
min="0"
step="0.01"
value="{{ kraken_private_rate_limit_seconds }}"
/>
</label>
<label class="field">
<span>HTTP timeout (s)</span>
<input
name="kraken_http_timeout_seconds"
type="number"
min="1"
step="0.5"
value="{{ kraken_http_timeout_seconds }}"
/>
</label>
<label class="field">
<span>Retry attempts</span>
<input
name="kraken_retry_attempts"
type="number"
min="0"
step="1"
value="{{ kraken_retry_attempts }}"
/>
</label>
<label class="field">
<span>Retry base delay (s)</span>
<input
name="kraken_retry_base_delay_seconds"
type="number"
min="0"
step="0.01"
value="{{ kraken_retry_base_delay_seconds }}"
/>
</label>
<label class="field">
<span>API key</span>
<input name="kraken_api_key" type="text" value="{{ kraken_api_key }}" />
</label>
<label class="field">
<span>API secret</span>
<input
name="kraken_api_secret"
type="password"
value=""
placeholder="Leave blank to keep existing"
/>
</label>
<label class="field">
<span>API key permissions</span>
<input
name="kraken_api_key_permissions"
type="text"
value="{{ kraken_api_key_permissions }}"
disabled
/>
</label>
<label class="field">
<span>WS heartbeat timeout (s)</span>
<input
name="ws_heartbeat_timeout_seconds"
type="number"
min="1"
step="1"
value="{{ ws_heartbeat_timeout_seconds }}"
/>
</label>
<label class="field">
<span>WS max staleness (s)</span>
<input
name="ws_max_staleness_seconds"
type="number"
min="1"
step="1"
value="{{ ws_max_staleness_seconds }}"
/>
</label>
</div>
@@ -0,0 +1,57 @@
<div class="card">
<div class="label">Risk & Guardrails</div>
<label class="field">
<span>Daily loss limit USD</span>
<input
name="daily_loss_limit_usd"
type="number"
min="0"
step="0.01"
value="{{ daily_loss_limit_value }}"
/>
</label>
<label class="field">
<span>Cumulative loss limit USD</span>
<input
name="cumulative_loss_limit_usd"
type="number"
min="0"
step="0.01"
value="{{ cumulative_loss_limit_value }}"
/>
</label>
<label class="field">
<span>Max source latency (ms)</span>
<input
name="max_source_latency_ms"
type="number"
min="0"
step="1"
value="{{ max_source_latency_value }}"
/>
</label>
<label class="field">
<span>Max apply latency (ms)</span>
<input
name="max_apply_latency_ms"
type="number"
min="0"
step="1"
value="{{ max_apply_latency_value }}"
/>
</label>
<label class="field">
<span>Max consecutive failures</span>
<input
name="max_consecutive_failures"
type="number"
min="0"
step="1"
value="{{ max_consecutive_failures_value }}"
/>
</label>
<label class="field checkbox">
<input name="kill_switch_active" type="checkbox" {{ kill_switch_active }} />
<span>Kill switch active</span>
</label>
</div>
@@ -0,0 +1,140 @@
<div class="card">
<div class="label">Runtime</div>
<label class="field">
<span>App env</span>
<input type="text" value="{{ app_env }}" disabled />
</label>
<label class="field">
<span>App host</span>
<input name="app_host" type="text" value="{{ app_host }}" />
</label>
<label class="field">
<span>App port</span>
<input
name="app_port"
type="number"
min="1"
max="65535"
value="{{ app_port }}"
/>
</label>
<label class="field">
<span>Log level</span>
<select name="log_level">
{% for lvl in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] %} {% set
sel = "selected" if log_level == lvl else "" %}
<option value="{{ lvl }}" {{ sel }}>{{ lvl }}</option>
{% endfor %}
</select>
</label>
<label class="field checkbox">
<input name="log_json" type="checkbox" {{ log_json }} />
<span>JSON logs</span>
</label>
<label class="field checkbox">
<input name="paper_trading_mode" type="checkbox" {{ paper_trading_mode }} />
<span>Paper trading mode</span>
</label>
<label class="field">
<span>Trade capital USD</span>
<input
name="trade_capital_usd"
type="number"
min="0"
step="0.01"
value="{{ trade_capital_usd_value }}"
/>
</label>
<label class="field">
<span>Max trade capital USD</span>
<input
name="max_trade_capital_usd"
type="number"
min="0"
step="0.01"
value="{{ max_trade_capital_usd_value }}"
/>
</label>
<label class="field">
<span>Max concurrent trades</span>
<input
name="max_concurrent_trades"
type="number"
min="1"
step="1"
value="{{ max_concurrent_trades_value }}"
/>
</label>
<label class="field">
<span>Max exposure per asset USD</span>
<input
name="max_exposure_per_asset_usd"
type="number"
min="0"
step="0.01"
value="{{ max_exposure_per_asset_value }}"
/>
</label>
<label class="field">
<span>Quote balance asset</span>
<input
name="quote_balance_asset"
type="text"
value="{{ quote_balance_asset }}"
/>
</label>
<label class="field">
<span>Min order size USD</span>
<input
name="min_order_size_usd"
type="number"
min="0"
step="0.01"
value="{{ min_order_size_usd_value }}"
/>
</label>
<label class="field">
<span>Tradable pairs (comma-separated)</span>
<input
name="tradable_pairs"
type="text"
placeholder="BTC/USD, ETH/BTC"
value="{{ tradable_pairs_value }}"
/>
</label>
<label class="field">
<span>Strategy mode</span>
<select name="strategy_mode">
{% set sel = "selected" if strategy_mode == "incremental" else "" %}
<option value="incremental" {{ sel }}>incremental</option>
{% set sel = "selected" if strategy_mode == "paper" else "" %}
<option value="paper" {{ sel }}>paper</option>
{% set sel = "selected" if strategy_mode == "live" else "" %}
<option value="live" {{ sel }}>live</option>
{% if strategy_stat_arb_enabled %} {% set sel = "selected" if
strategy_mode == "stat_arb_experiment" else "" %}
<option value="stat_arb_experiment" {{ sel }}>stat_arb_experiment</option>
{% endif %}
</select>
</label>
<label class="field">
<span>Strategy profit threshold</span>
<input
name="strategy_profit_threshold"
type="number"
min="0"
step="0.0001"
value="{{ strategy_profit_threshold }}"
/>
</label>
<label class="field">
<span>Max depth levels</span>
<input
name="strategy_max_depth_levels"
type="number"
min="1"
step="1"
value="{{ strategy_max_depth_levels }}"
/>
</label>
</div>
+44 -3
View File
@@ -1,8 +1,10 @@
{% extends "_base.html" %} {% block title %}Arbitrade Health Check{% endblock %} {% extends "_base.html" %} {% block title %}Arbitrade Health Check{% endblock %}
{% block header %} {% with page_title="Arbitrade Health Check", {% block header %} {% with page_title="Arbitrade Health Check",
page_subtitle="Live system state." %} {% include "_header.html" %} {% endwith %} page_subtitle="Live system state and logs." %} {% include "_header.html" %} {%
{% endblock %} {% block main_class %}shell{% endblock %} {% block content %} endwith %} {% endblock %} {% block main_class %}shell{% endblock %} {% block
<section class="card"> content %}
<section class="card" style="margin-bottom: 24px">
<h1>Arbitrade Bootstrap Complete</h1> <h1>Arbitrade Bootstrap Complete</h1>
<p><span class="badge">Status: {{ status }}</span></p> <p><span class="badge">Status: {{ status }}</span></p>
<p>UTC: {{ time }}</p> <p>UTC: {{ time }}</p>
@@ -18,4 +20,43 @@ page_subtitle="Live system state." %} {% include "_header.html" %} {% endwith %}
</p> </p>
<pre id="health-json">{"status":"ok","service":"arbitrade"}</pre> <pre id="health-json">{"status":"ok","service":"arbitrade"}</pre>
</section> </section>
<section class="card">
<h2>System Logs</h2>
<div class="toolbar" style="display: flex; gap: 8px; margin-bottom: 12px">
<form
hx-post="/dashboard/api/logging/aggregate"
hx-target="#aggregate-result"
hx-swap="innerHTML"
style="display: inline"
>
<button type="submit" class="button secondary" style="font-size: 0.85rem">
Aggregate Now
</button>
</form>
<form
hx-post="/dashboard/api/logging/archive"
hx-target="#archive-result"
hx-swap="innerHTML"
style="display: inline"
>
<button type="submit" class="button secondary" style="font-size: 0.85rem">
Archive Old Logs
</button>
</form>
<span id="aggregate-result" style="font-size: 0.85rem; opacity: 0.6"></span>
<span id="archive-result" style="font-size: 0.85rem; opacity: 0.6"></span>
</div>
<div
id="log-table-container"
hx-get="/dashboard/fragment/logs"
hx-trigger="load, every 30s"
hx-swap="outerHTML"
>
<div style="text-align: center; padding: 20px; opacity: 0.5">
Loading logs...
</div>
</div>
</section>
{% endblock %} {% block scripts %}{% endblock %} {% endblock %} {% block scripts %}{% endblock %}
+52
View File
@@ -0,0 +1,52 @@
{% extends "_base.html" %} {% block title %}{{ title }}{% endblock %} {% block
main_class %}shell{% endblock %} {% block header %} {% with
page_title="Currency Pairings",
page_subtitle="Enable/disable pairings, search, and sync from Kraken." %}
{% include "_header.html" %} {% endwith %} {% endblock %} {% block content %}
<div class="toolbar" style="margin-bottom: 16px; display: flex; gap: 8px">
<input
id="pairing-search"
type="text"
placeholder="Search pairings…"
value="{{ search or '' }}"
style="flex: 1; max-width: 300px"
hx-get="/dashboard/fragment/pairings"
hx-target="#pairings-table-container"
hx-trigger="keyup changed delay:300ms"
hx-include="#pairing-enabled-filter"
name="search"
/>
<select
id="pairing-enabled-filter"
name="enabled"
hx-get="/dashboard/fragment/pairings"
hx-target="#pairings-table-container"
hx-trigger="change"
hx-include="#pairing-search"
>
<option value="all">All</option>
<option value="true" {{ 'selected' if enabled == 'true' else '' }}>
Enabled
</option>
<option value="false" {{ 'selected' if enabled == 'false' else '' }}>
Disabled
</option>
</select>
<button
class="button"
hx-post="/dashboard/api/pairings/sync"
hx-target="#pairings-table-container"
hx-swap="innerHTML"
>
Sync from Kraken
</button>
</div>
<div id="pairings-table-container">
{% include "partials/pairings_table.html" %}
</div>
{% endblock %}
@@ -45,6 +45,46 @@
<article class="card" style="margin-top: 16px"> <article class="card" style="margin-top: 16px">
<div class="label">Run Backtest</div> <div class="label">Run Backtest</div>
{% if no_enabled_pairings %}
<div
class="flash"
style="
background: rgba(255, 193, 7, 0.15);
border: 1px solid rgba(255, 193, 7, 0.3);
border-radius: 8px;
padding: 10px 16px;
margin-bottom: 16px;
color: #ffe58f;
font-size: 0.9rem;
"
>
No enabled pairings found. Enable at least one pairing on the
<a href="/dashboard/config/pairings" style="color: #ffe58f"
>Pairings page</a
>
before running a backtest.
</div>
{% endif %} {% if flash_message %}
<div
class="flash"
style="
background: rgba(82, 196, 26, 0.15);
border: 1px solid rgba(82, 196, 26, 0.3);
border-radius: 8px;
padding: 10px 16px;
margin-bottom: 16px;
color: #b7eb8f;
font-size: 0.9rem;
"
hx-trigger="load delay:5s"
hx-target="this"
hx-swap="delete"
>
{{ flash_message }}
</div>
{% endif %}
<form <form
class="form-grid" class="form-grid"
hx-post="{{ run_endpoint }}" hx-post="{{ run_endpoint }}"
@@ -52,37 +92,16 @@
hx-swap="outerHTML" hx-swap="outerHTML"
> >
<input type="hidden" name="source" value="db" /> <input type="hidden" name="source" value="db" />
<input type="hidden" name="symbols" value="" />
<div class="meta" style="margin-bottom: 12px">
Pairings managed in
<a href="/dashboard/config/pairings">Configuration → Pairings</a>. Only
enabled pairings are backtested.
</div>
<!-- Required fields -->
<label class="field"> <label class="field">
<span>Pairings</span> <span>Starting balances <span style="color: #ff4d4f">*</span></span>
<div
id="pairing-checkboxes"
hx-get="/dashboard/fragment/backtesting-pairings"
hx-trigger="load"
style="display: flex; flex-wrap: wrap; gap: 6px; margin-top: 4px"
>
<span style="opacity: 0.5">Loading pairings...</span>
</div>
</label>
<label class="field">
<span>Start time (ISO datetime, optional)</span>
<input
name="start_time"
type="text"
value="{{ start_time | default('') }}"
placeholder="2025-01-01T00:00:00"
/>
</label>
<label class="field">
<span>End time (ISO datetime, optional)</span>
<input
name="end_time"
type="text"
value="{{ end_time | default('') }}"
placeholder="2025-01-02T00:00:00"
/>
</label>
<label class="field">
<span>Starting balances</span>
<input <input
name="starting_balances" name="starting_balances"
type="text" type="text"
@@ -91,17 +110,25 @@
/> />
</label> </label>
<label class="field"> <label class="field">
<span>Trade capital</span> <span>Start time <span style="color: #ff4d4f">*</span></span>
<input <input
name="trade_capital" name="start_time"
type="number" type="text"
min="0" value="{{ start_time | default('') }}"
step="0.01" placeholder="2025-01-01T00:00:00"
value="{{ trade_capital }}"
/> />
</label> </label>
<label class="field"> <label class="field">
<span>Min profit threshold</span> <span>End time <span style="color: #ff4d4f">*</span></span>
<input
name="end_time"
type="text"
value="{{ end_time | default('') }}"
placeholder="2025-01-02T00:00:00"
/>
</label>
<label class="field">
<span>Min profit threshold <span style="color: #ff4d4f">*</span></span>
<input <input
name="min_profit_threshold" name="min_profit_threshold"
type="number" type="number"
@@ -110,51 +137,67 @@
value="{{ min_profit_threshold }}" value="{{ min_profit_threshold }}"
/> />
</label> </label>
<label class="field">
<span>Fee profile</span> <!-- Advanced -->
<select name="fee_profile"> <details style="grid-column: 1 / -1; margin-top: 8px">
{% set sel = "selected" if fee_profile == "api" else "" %} <summary style="cursor: pointer; opacity: 0.7; font-size: 0.85rem">
<option value="api" {{ sel }}>api (from Kraken)</option> Advanced options (fee profile, slippage, latency)
{% set sel = "selected" if fee_profile == "standard" else "" %} </summary>
<option value="standard" {{ sel }}>standard</option> <div
{% set sel = "selected" if fee_profile == "maker_heavy" else "" %} class="form-grid"
<option value="maker_heavy" {{ sel }}>maker_heavy</option> style="
{% set sel = "selected" if fee_profile == "taker_heavy" else "" %} margin-top: 12px;
<option value="taker_heavy" {{ sel }}>taker_heavy</option> grid-template-columns: repeat(auto-fit, minmax(240px, 1fr));
{% set sel = "selected" if fee_profile == "custom" else "" %} "
<option value="custom" {{ sel }}>custom</option> >
</select> <label class="field">
</label> <span>Fee profile</span>
<label class="field"> <select name="fee_profile">
<span>Custom fee rate (if fee profile = custom)</span> {% set sel = "selected" if fee_profile == "api" else "" %}
<input <option value="api" {{ sel }}>api (from Kraken)</option>
name="custom_fee_rate" {% set sel = "selected" if fee_profile == "standard" else "" %}
type="number" <option value="standard" {{ sel }}>standard</option>
min="0" {% set sel = "selected" if fee_profile == "maker_heavy" else "" %}
step="0.0001" <option value="maker_heavy" {{ sel }}>maker_heavy</option>
value="{{ custom_fee_rate }}" {% set sel = "selected" if fee_profile == "taker_heavy" else "" %}
/> <option value="taker_heavy" {{ sel }}>taker_heavy</option>
</label> {% set sel = "selected" if fee_profile == "custom" else "" %}
<label class="field"> <option value="custom" {{ sel }}>custom</option>
<span>Slippage (bps)</span> </select>
<input </label>
name="slippage_bps" <label class="field">
type="number" <span>Custom fee rate (if custom profile)</span>
min="0" <input
step="0.1" name="custom_fee_rate"
value="{{ slippage_bps }}" type="number"
/> min="0"
</label> step="0.0001"
<label class="field"> value="{{ custom_fee_rate }}"
<span>Execution latency (ms)</span> />
<input </label>
name="execution_latency_ms" <label class="field">
type="number" <span>Slippage (bps)</span>
min="0" <input
step="0.1" name="slippage_bps"
value="{{ execution_latency_ms }}" type="number"
/> min="0"
</label> step="0.1"
value="{{ slippage_bps }}"
/>
</label>
<label class="field">
<span>Execution latency (ms)</span>
<input
name="execution_latency_ms"
type="number"
min="0"
step="0.1"
value="{{ execution_latency_ms }}"
/>
</label>
</div>
</details>
<button type="submit" class="button">Submit Job</button> <button type="submit" class="button">Submit Job</button>
</form> </form>
</article> </article>
+23 -646
View File
@@ -1,4 +1,23 @@
<div id="config-panel" class="panel" style="margin-top: 16px"> <div id="config-panel" class="panel" style="margin-top: 16px">
{% if flash_message %}
<div
class="flash"
style="
background: rgba(82, 196, 26, 0.15);
border: 1px solid rgba(82, 196, 26, 0.3);
border-radius: 8px;
padding: 10px 16px;
margin-bottom: 16px;
color: #b7eb8f;
font-size: 0.9rem;
"
hx-trigger="load delay:3s"
hx-target="this"
hx-swap="delete"
>
{{ flash_message }}
</div>
{% endif %}
<form <form
class="form-grid" class="form-grid"
hx-post="{{ config_endpoint }}" hx-post="{{ config_endpoint }}"
@@ -9,652 +28,10 @@
gap: 20px; gap: 20px;
" "
> >
<!-- Runtime --> {% include "config/runtime.html" %} {% include "config/alerts.html" %} {%
<div class="card"> include "config/kraken.html" %} {% include "config/risk.html" %}
<div class="label">Runtime</div> <div style="grid-column: 1 / -1">
<label class="field"> <button type="submit" class="button">Save Settings</button>
<span>App env</span>
<input type="text" value="{{ app_env }}" disabled />
</label>
<label class="field">
<span>App host</span>
<input name="app_host" type="text" value="{{ app_host }}" />
</label>
<label class="field">
<span>App port</span>
<input
name="app_port"
type="number"
min="1"
max="65535"
value="{{ app_port }}"
/>
</label>
<label class="field">
<span>Log level</span>
<select name="log_level">
{% for lvl in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] %} {%
set sel = "selected" if log_level == lvl else "" %}
<option value="{{ lvl }}" {{ sel }}>{{ lvl }}</option>
{% endfor %}
</select>
</label>
<label class="field checkbox">
<input name="log_json" type="checkbox" {{ log_json }} />
<span>JSON logs</span>
</label>
<label class="field checkbox">
<input
name="paper_trading_mode"
type="checkbox"
{{
paper_trading_mode
}}
/>
<span>Paper trading mode</span>
</label>
<label class="field">
<span>Trade capital USD</span>
<input
name="trade_capital_usd"
type="number"
min="0"
step="0.01"
value="{{ trade_capital_usd_value }}"
/>
</label>
<label class="field">
<span>Max trade capital USD</span>
<input
name="max_trade_capital_usd"
type="number"
min="0"
step="0.01"
value="{{ max_trade_capital_usd_value }}"
/>
</label>
<label class="field">
<span>Max concurrent trades</span>
<input
name="max_concurrent_trades"
type="number"
min="1"
step="1"
value="{{ max_concurrent_trades_value }}"
/>
</label>
<label class="field">
<span>Max exposure per asset USD</span>
<input
name="max_exposure_per_asset_usd"
type="number"
min="0"
step="0.01"
value="{{ max_exposure_per_asset_value }}"
/>
</label>
<label class="field">
<span>Quote balance asset</span>
<input
name="quote_balance_asset"
type="text"
value="{{ quote_balance_asset }}"
/>
</label>
<label class="field">
<span>Min order size USD</span>
<input
name="min_order_size_usd"
type="number"
min="0"
step="0.01"
value="{{ min_order_size_usd_value }}"
/>
</label>
<label class="field">
<span>Tradable pairs (comma-separated)</span>
<input
name="tradable_pairs"
type="text"
placeholder="BTC/USD, ETH/BTC"
value="{{ tradable_pairs_value }}"
/>
</label>
<label class="field">
<span>Strategy mode</span>
<select name="strategy_mode">
{% set sel = "selected" if strategy_mode == "incremental" else "" %}
<option value="incremental" {{ sel }}>incremental</option>
{% set sel = "selected" if strategy_mode == "paper" else "" %}
<option value="paper" {{ sel }}>paper</option>
{% set sel = "selected" if strategy_mode == "live" else "" %}
<option value="live" {{ sel }}>live</option>
{% if strategy_stat_arb_enabled %} {% set sel = "selected" if
strategy_mode == "stat_arb_experiment" else "" %}
<option value="stat_arb_experiment" {{ sel }}>
stat_arb_experiment
</option>
{% endif %}
</select>
</label>
<label class="field">
<span>Strategy profit threshold</span>
<input
name="strategy_profit_threshold"
type="number"
min="0"
step="0.0001"
value="{{ strategy_profit_threshold }}"
/>
</label>
<label class="field">
<span>Max depth levels</span>
<input
name="strategy_max_depth_levels"
type="number"
min="1"
step="1"
value="{{ strategy_max_depth_levels }}"
/>
</label>
</div>
<!-- Alerts -->
<div class="card">
<div class="label">Alerting</div>
<label class="field checkbox">
<input name="alerts_enabled" type="checkbox" {{ alerts_enabled }} />
<span>Alerts enabled</span>
</label>
<label class="field">
<span>Min severity</span>
<select name="alert_min_severity">
{% for sev in ["info", "warning", "error", "critical"] %} {% set sel =
"selected" if alert_min_severity == sev else "" %}
<option value="{{ sev }}" {{ sel }}>{{ sev }}</option>
{% endfor %}
</select>
</label>
<label class="field">
<span>Dedup seconds</span>
<input
name="alert_dedup_seconds"
type="number"
min="0"
step="1"
value="{{ alert_dedup_seconds }}"
/>
</label>
<label class="field checkbox">
<input
name="alert_on_trade_events"
type="checkbox"
{{
alert_on_trade_events
}}
/>
<span>Trade events</span>
</label>
<label class="field checkbox">
<input
name="alert_on_error_events"
type="checkbox"
{{
alert_on_error_events
}}
/>
<span>Error events</span>
</label>
<label class="field checkbox">
<input
name="alert_on_threshold_events"
type="checkbox"
{{
alert_on_threshold_events
}}
/>
<span>Threshold events</span>
</label>
<label class="field checkbox">
<input
name="alert_on_system_events"
type="checkbox"
{{
alert_on_system_events
}}
/>
<span>System events</span>
</label>
<hr
style="
border: none;
border-top: 1px solid rgba(255, 255, 255, 0.1);
margin: 12px 0;
"
/>
<label class="field checkbox">
<input
name="telegram_alerts_enabled"
type="checkbox"
{{
telegram_alerts_enabled
}}
/>
<span>Telegram</span>
</label>
<label class="field">
<span>Telegram bot token</span>
<input
name="telegram_bot_token"
type="password"
value="{{ telegram_bot_token }}"
placeholder="Bot token"
/>
</label>
<label class="field">
<span>Telegram chat ID</span>
<input
name="telegram_chat_id"
type="text"
value="{{ telegram_chat_id }}"
placeholder="Chat ID"
/>
</label>
<hr
style="
border: none;
border-top: 1px solid rgba(255, 255, 255, 0.1);
margin: 12px 0;
"
/>
<label class="field checkbox">
<input
name="discord_alerts_enabled"
type="checkbox"
{{
discord_alerts_enabled
}}
/>
<span>Discord</span>
</label>
<label class="field">
<span>Discord webhook URL</span>
<input
name="discord_webhook_url"
type="password"
value="{{ discord_webhook_url }}"
placeholder="Webhook URL"
/>
</label>
<hr
style="
border: none;
border-top: 1px solid rgba(255, 255, 255, 0.1);
margin: 12px 0;
"
/>
<label class="field checkbox">
<input
name="email_alerts_enabled"
type="checkbox"
{{
email_alerts_enabled
}}
/>
<span>Email</span>
</label>
<label class="field">
<span>SMTP host</span>
<input
name="email_smtp_host"
type="text"
value="{{ email_smtp_host }}"
placeholder="smtp.example.com"
/>
</label>
<label class="field">
<span>SMTP port</span>
<input
name="email_smtp_port"
type="number"
min="1"
max="65535"
value="{{ email_smtp_port }}"
/>
</label>
<label class="field">
<span>SMTP username</span>
<input
name="email_smtp_username"
type="text"
value="{{ email_smtp_username }}"
/>
</label>
<label class="field">
<span>SMTP password</span>
<input
name="email_smtp_password"
type="password"
value="{{ email_smtp_password }}"
placeholder="Leave blank to keep existing"
/>
</label>
<label class="field">
<span>From address</span>
<input
name="email_alert_from"
type="text"
value="{{ email_alert_from }}"
/>
</label>
<label class="field">
<span>To address</span>
<input name="email_alert_to" type="text" value="{{ email_alert_to }}" />
</label>
<label class="field checkbox">
<input
name="email_smtp_use_tls"
type="checkbox"
{{
email_smtp_use_tls
}}
/>
<span>Use TLS</span>
</label>
</div>
<!-- Kraken -->
<div class="card">
<div class="label">Kraken Exchange</div>
<label class="field">
<span>REST URL</span>
<input
name="kraken_rest_url"
type="text"
value="{{ kraken_rest_url }}"
/>
</label>
<label class="field">
<span>WebSocket URL</span>
<input name="kraken_ws_url" type="text" value="{{ kraken_ws_url }}" />
</label>
<label class="field">
<span>Private rate limit (s)</span>
<input
name="kraken_private_rate_limit_seconds"
type="number"
min="0"
step="0.01"
value="{{ kraken_private_rate_limit_seconds }}"
/>
</label>
<label class="field">
<span>HTTP timeout (s)</span>
<input
name="kraken_http_timeout_seconds"
type="number"
min="1"
step="0.5"
value="{{ kraken_http_timeout_seconds }}"
/>
</label>
<label class="field">
<span>Retry attempts</span>
<input
name="kraken_retry_attempts"
type="number"
min="0"
step="1"
value="{{ kraken_retry_attempts }}"
/>
</label>
<label class="field">
<span>Retry base delay (s)</span>
<input
name="kraken_retry_base_delay_seconds"
type="number"
min="0"
step="0.01"
value="{{ kraken_retry_base_delay_seconds }}"
/>
</label>
<label class="field">
<span>API key</span>
<input
name="kraken_api_key"
type="text"
value="{{ kraken_api_key }}"
placeholder="API key"
/>
</label>
<label class="field">
<span>API secret</span>
<input
name="kraken_api_secret"
type="password"
value="{{ kraken_api_secret }}"
placeholder="API secret"
/>
</label>
<label class="field">
<span>Key permissions</span>
<input
name="kraken_api_key_permissions"
type="text"
value="{{ kraken_api_key_permissions }}"
placeholder="query,trade"
/>
</label>
<label class="field">
<span>Heartbeat timeout (s)</span>
<input
name="ws_heartbeat_timeout_seconds"
type="number"
min="1"
step="1"
value="{{ ws_heartbeat_timeout_seconds }}"
/>
</label>
<label class="field">
<span>Max staleness (s)</span>
<input
name="ws_max_staleness_seconds"
type="number"
min="0"
step="0.5"
value="{{ ws_max_staleness_seconds }}"
/>
</label>
</div>
<!-- Pairings -->
<div class="card" id="pairings-card">
<div class="label">Currency Pairings</div>
<div style="display: flex; gap: 8px; margin-bottom: 12px">
<input
id="pairing-search"
type="text"
placeholder="Search pairings..."
hx-get="/dashboard/fragment/pairings"
hx-target="#pairings-table-container"
hx-trigger="keyup changed delay:300ms"
hx-swap="innerHTML"
name="search"
style="
flex: 1;
padding: 6px 10px;
border-radius: 6px;
border: 1px solid rgba(255, 255, 255, 0.15);
background: rgba(0, 0, 0, 0.3);
color: inherit;
"
/>
<button
type="button"
class="button"
id="pairing-sync-btn"
hx-post="/dashboard/api/pairings/sync"
hx-target="#pairings-table-container"
hx-swap="innerHTML"
hx-trigger="click"
style="white-space: nowrap"
>
Sync from Kraken
</button>
</div>
<div
id="pairings-table-container"
hx-get="/dashboard/fragment/pairings"
hx-trigger="load"
>
<div style="text-align: center; padding: 20px; opacity: 0.5">
Loading pairings...
</div>
</div>
</div>
<!-- Risk -->
<div class="card">
<div class="label">Risk Limits</div>
<label class="field">
<span>Daily loss limit USD</span>
<input
name="daily_loss_limit_usd"
type="number"
min="0"
step="0.01"
value="{{ daily_loss_limit_value }}"
placeholder="None"
/>
</label>
<label class="field">
<span>Cumulative loss limit USD</span>
<input
name="cumulative_loss_limit_usd"
type="number"
min="0"
step="0.01"
value="{{ cumulative_loss_limit_value }}"
placeholder="None"
/>
</label>
<label class="field">
<span>Max source latency (ms)</span>
<input
name="max_source_latency_ms"
type="number"
min="0"
step="1"
value="{{ max_source_latency_value }}"
placeholder="None"
/>
</label>
<label class="field">
<span>Max apply latency (ms)</span>
<input
name="max_apply_latency_ms"
type="number"
min="0"
step="1"
value="{{ max_apply_latency_value }}"
placeholder="None"
/>
</label>
<label class="field">
<span>Max consecutive failures</span>
<input
name="max_consecutive_failures"
type="number"
min="1"
step="1"
value="{{ max_consecutive_failures_value }}"
placeholder="None"
/>
</label>
<label class="field checkbox">
<input
name="kill_switch_active"
type="checkbox"
{{
kill_switch_active
}}
/>
<span>Kill switch active</span>
</label>
</div>
<!-- Strategy Stat-Arb -->
<div class="card">
<div class="label">Stat-Arb Strategy</div>
<label class="field checkbox">
<input
name="strategy_enable_stat_arb_experiment"
type="checkbox"
{%
if
strategy_stat_arb_enabled
%}checked{%
endif
%}
/>
<span>Enable stat-arb experiment</span>
</label>
{% if strategy_stat_arb_enabled %}
<label class="field">
<span>Lookback window</span>
<input
name="strategy_stat_arb_lookback_window"
type="number"
min="2"
step="1"
value="{{ strategy_stat_arb_lookback_window }}"
/>
</label>
<label class="field">
<span>Entry z-score</span>
<input
name="strategy_stat_arb_entry_zscore"
type="number"
min="0"
step="0.1"
value="{{ strategy_stat_arb_entry_zscore }}"
/>
</label>
<label class="field">
<span>Exit z-score</span>
<input
name="strategy_stat_arb_exit_zscore"
type="number"
min="0"
step="0.1"
value="{{ strategy_stat_arb_exit_zscore }}"
/>
</label>
<label class="field">
<span>Max holding seconds</span>
<input
name="strategy_stat_arb_max_holding_seconds"
type="number"
min="1"
step="1"
value="{{ strategy_stat_arb_max_holding_seconds }}"
/>
</label>
{% endif %}
</div>
<!-- Submit -->
<div
class="card"
style="display: flex; align-items: center; justify-content: center"
>
<button
type="submit"
class="button"
style="padding: 14px 32px; font-size: 1.1rem"
>
Save configuration
</button>
</div> </div>
</form> </form>
</div> </div>
@@ -0,0 +1,72 @@
<div id="log-table-container">
<div class="toolbar" style="display: flex; gap: 8px; margin-bottom: 12px">
<select
name="level"
hx-get="/dashboard/fragment/logs"
hx-target="#log-table-container"
hx-trigger="change"
hx-swap="outerHTML"
>
<option value="" {{ 'selected' if current_level == 'all' else '' }}>All</option>
<option value="INFO" {{ 'selected' if current_level == 'INFO' else '' }}>INFO</option>
<option value="WARNING" {{ 'selected' if current_level == 'WARNING' else '' }}>WARNING</option>
<option value="ERROR" {{ 'selected' if current_level == 'ERROR' else '' }}>ERROR</option>
<option value="CRITICAL" {{ 'selected' if current_level == 'CRITICAL' else '' }}>CRITICAL</option>
</select>
<span style="opacity: 0.6; font-size: 0.85rem">{{ total }} entries</span>
</div>
<table style="width: 100%; border-collapse: collapse; font-size: 0.82rem">
<thead>
<tr style="border-bottom: 1px solid rgba(255,255,255,0.1); text-align: left">
<th style="padding: 6px 8px">Time</th>
<th style="padding: 6px 8px">Level</th>
<th style="padding: 6px 8px">Logger</th>
<th style="padding: 6px 8px">Message</th>
</tr>
</thead>
<tbody>
{% for r in records %}
<tr style="border-bottom: 1px solid rgba(255,255,255,0.04)">
<td style="padding: 4px 8px; white-space: nowrap">
{{ r.recorded_at.strftime('%H:%M:%S') if r.recorded_at else '—' }}
</td>
<td style="padding: 4px 8px">
<span class="badge level-{{ r.level.lower() }}">{{ r.level }}</span>
</td>
<td style="padding: 4px 8px; opacity: 0.7; max-width: 200px; overflow: hidden; text-overflow: ellipsis; white-space: nowrap">
{{ r.logger }}
</td>
<td style="padding: 4px 8px; max-width: 400px; overflow: hidden; text-overflow: ellipsis; white-space: nowrap">
{{ r.message }}
</td>
</tr>
{% endfor %}
{% if not records %}
<tr>
<td colspan="4" style="padding: 20px; text-align: center; opacity: 0.5">No log entries found.</td>
</tr>
{% endif %}
</tbody>
</table>
<div class="toolbar" style="display: flex; gap: 8px; justify-content: center; margin-top: 12px">
{% if page > 1 %}
<button
class="button secondary"
hx-get="/dashboard/fragment/logs?level={{ current_level }}&page={{ page - 1 }}"
hx-target="#log-table-container"
hx-swap="outerHTML"
>Previous</button>
{% endif %}
<span style="opacity: 0.6; font-size: 0.85rem; padding: 0 8px">Page {{ page }} / {{ total_pages }}</span>
{% if page < total_pages %}
<button
class="button secondary"
hx-get="/dashboard/fragment/logs?level={{ current_level }}&page={{ page + 1 }}"
hx-target="#log-table-container"
hx-swap="outerHTML"
>Next</button>
{% endif %}
</div>
</div>
+1 -1
View File
@@ -1 +1 @@
"""Integration tests for PostgreSQL schema and connectivity.""" """Integration tests for PostgreSQL schema and connectivity."""
+2 -6
View File
@@ -11,13 +11,9 @@ import pathlib
import pytest import pytest
def pytest_ignore_collect( def pytest_ignore_collect(collection_path: pathlib.Path, config: pytest.Config) -> bool:
collection_path: pathlib.Path, config: pytest.Config
) -> bool:
"""Skip integration tests unless --integration is passed.""" """Skip integration tests unless --integration is passed."""
if "integration" in str(collection_path) and not config.getoption( if "integration" in str(collection_path) and not config.getoption("--integration", False):
"--integration", False
):
return True return True
return False return False
+60 -10
View File
@@ -42,9 +42,24 @@ async def test_metrics_calculator_summarizes_execution_data() -> None:
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9), ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9),
($10, $11, $12, $13, $14, $15, $16, $17, $18) ($10, $11, $12, $13, $14, $15, $16, $17, $18)
""", """,
"trade-1", started, finished, "filled", 12.5, 10.0, 100.0, "USD->BTC->ETH->USD", 3, "trade-1",
"trade-2", started_two, finished_two, "filled", - started,
4.5, -2.0, 200.0, "USD->ETH->BTC->USD", 3, finished,
"filled",
12.5,
10.0,
100.0,
"USD->BTC->ETH->USD",
3,
"trade-2",
started_two,
finished_two,
"filled",
-4.5,
-2.0,
200.0,
"USD->ETH->BTC->USD",
3,
) )
await conn.execute( await conn.execute(
""" """
@@ -53,11 +68,24 @@ async def test_metrics_calculator_summarizes_execution_data() -> None:
($7, $8, $9, $10, $11, $12), ($7, $8, $9, $10, $11, $12),
($13, $14, $15, $16, $17, $18) ($13, $14, $15, $16, $17, $18)
""", """,
started, "USD->BTC->ETH->USD", 4.0, 3.0, 0.03, True, started,
started_two, "USD->ETH->BTC->USD", 2.0, 1.0, 0.01, False, "USD->BTC->ETH->USD",
started_two + 4.0,
timedelta( 3.0,
seconds=30), "USD->BTC->ETH->USD", 5.0, 4.0, 0.04, True, 0.03,
True,
started_two,
"USD->ETH->BTC->USD",
2.0,
1.0,
0.01,
False,
started_two + timedelta(seconds=30),
"USD->BTC->ETH->USD",
5.0,
4.0,
0.04,
True,
) )
await conn.execute( await conn.execute(
""" """
@@ -67,8 +95,30 @@ async def test_metrics_calculator_summarizes_execution_data() -> None:
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12), ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12),
($13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24) ($13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24)
""", """,
"trade-1", "order-1", 0, "BTC/USD", "buy", 2.0, 101, "closed", 2.0, 100.0, "{}", started, "trade-1",
"trade-2", "order-2", 0, "ETH/USD", "sell", 4.0, 202, "closed", 3.0, 200.0, "{}", started_two, "order-1",
0,
"BTC/USD",
"buy",
2.0,
101,
"closed",
2.0,
100.0,
"{}",
started,
"trade-2",
"order-2",
0,
"ETH/USD",
"sell",
4.0,
202,
"closed",
3.0,
200.0,
"{}",
started_two,
) )
metrics = await MetricsCalculator(store).compute() metrics = await MetricsCalculator(store).compute()
+99 -35
View File
@@ -25,50 +25,116 @@ EXPECTED_TABLES: dict[str, list[str]] = {
"schema_migrations": ["version", "applied_at"], "schema_migrations": ["version", "applied_at"],
"config_sections": ["id", "name", "description", "updated_at"], "config_sections": ["id", "name", "description", "updated_at"],
"config_settings": [ "config_settings": [
"key", "section", "value_json", "value_type", "is_secret", "key",
"is_runtime_reloadable", "updated_at", "updated_by", "section",
"value_json",
"value_type",
"is_secret",
"is_runtime_reloadable",
"updated_at",
"updated_by",
], ],
"config_pairings": [ "config_pairings": [
"id", "base_asset", "quote_asset", "enabled", "source", "id",
"created_at", "updated_at", "base_asset",
"quote_asset",
"enabled",
"source",
"created_at",
"updated_at",
], ],
"config_backtesting_defaults": [ "config_backtesting_defaults": [
"id", "starting_balances", "trade_capital", "min_profit_threshold", "id",
"slippage_bps", "execution_latency_ms", "fee_source", "starting_balances",
"trade_capital",
"min_profit_threshold",
"slippage_bps",
"execution_latency_ms",
"fee_source",
], ],
"opportunities": [ "opportunities": [
"id", "detected_at", "cycle", "gross_pct", "net_pct", "id",
"est_profit", "executed", "detected_at",
"cycle",
"gross_pct",
"net_pct",
"est_profit",
"executed",
], ],
"trades": [ "trades": [
"id", "trade_ref", "started_at", "finished_at", "status", "id",
"realized_pnl", "estimated_pnl", "capital_used", "cycle", "leg_count", "trade_ref",
"started_at",
"finished_at",
"status",
"realized_pnl",
"estimated_pnl",
"capital_used",
"cycle",
"leg_count",
], ],
"orders": [ "orders": [
"id", "trade_ref", "order_ref", "leg_index", "pair", "side", "id",
"volume", "user_ref", "status", "filled_volume", "avg_price", "trade_ref",
"raw_response", "recorded_at", "order_ref",
"leg_index",
"pair",
"side",
"volume",
"user_ref",
"status",
"filled_volume",
"avg_price",
"raw_response",
"recorded_at",
], ],
"pnl_events": [ "pnl_events": [
"id", "trade_ref", "recorded_at", "kind", "pnl_usd", "source", "id",
"trade_ref",
"recorded_at",
"kind",
"pnl_usd",
"source",
], ],
"portfolio_snapshots": ["snapshot_at", "balances", "total_value_usd"], "portfolio_snapshots": ["snapshot_at", "balances", "total_value_usd"],
"market_snapshots": ["snapshot_at", "symbol", "source", "payload", "latency_ms"], "market_snapshots": ["snapshot_at", "symbol", "source", "payload", "latency_ms"],
"audit_events": [ "audit_events": [
"id", "occurred_at", "actor", "event_type", "decision", "id",
"payload", "correlation_id", "occurred_at",
"actor",
"event_type",
"decision",
"payload",
"correlation_id",
], ],
"runtime_state_snapshots": [ "runtime_state_snapshots": [
"snapshot_at", "is_running", "kill_switch_active", "kill_switch_reason", "snapshot_at",
"open_trade_count", "last_known_balances", "note", "is_running",
"kill_switch_active",
"kill_switch_reason",
"open_trade_count",
"last_known_balances",
"note",
], ],
"kraken_account_snapshots": [ "kraken_account_snapshots": [
"snapshot_at", "fee_tier", "maker_fee", "taker_fee", "snapshot_at",
"thirty_day_volume", "trade_balance_raw", "fee_schedule_raw", "fee_tier",
"maker_fee",
"taker_fee",
"thirty_day_volume",
"trade_balance_raw",
"fee_schedule_raw",
], ],
"backtest_jobs": [ "backtest_jobs": [
"id", "status", "events_path", "config", "report", "error", "id",
"created_at", "started_at", "finished_at", "status",
"events_path",
"config",
"report",
"error",
"created_at",
"started_at",
"finished_at",
], ],
} }
@@ -96,6 +162,7 @@ TABLES_WITH_UNIQUE_CONSTRAINTS: dict[str, list[str]] = {
# ── fixtures ──────────────────────────────────────────────────────────────── # ── fixtures ────────────────────────────────────────────────────────────────
@asynccontextmanager @asynccontextmanager
async def _pg_lifecycle() -> AsyncIterator[PgStore]: async def _pg_lifecycle() -> AsyncIterator[PgStore]:
"""Connect, yield store, then disconnect.""" """Connect, yield store, then disconnect."""
@@ -116,6 +183,7 @@ async def pg_fixture() -> AsyncIterator[PgStore]:
# ── helpers ───────────────────────────────────────────────────────────────── # ── helpers ─────────────────────────────────────────────────────────────────
async def _get_actual_tables(store: PgStore) -> dict[str, list[str]]: async def _get_actual_tables(store: PgStore) -> dict[str, list[str]]:
"""Return {table_name: [column_name, ...]} for the public schema.""" """Return {table_name: [column_name, ...]} for the public schema."""
actual: dict[str, list[str]] = {} actual: dict[str, list[str]] = {}
@@ -139,6 +207,7 @@ async def _table_row_count(store: PgStore, table: str) -> int:
# ── tests ─────────────────────────────────────────────────────────────────── # ── tests ───────────────────────────────────────────────────────────────────
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_pg_connect(pg: PgStore) -> None: async def test_pg_connect(pg: PgStore) -> None:
"""Can connect to PostgreSQL and ping the server.""" """Can connect to PostgreSQL and ping the server."""
@@ -165,8 +234,7 @@ async def test_schema_migration_applies(pg: PgStore) -> None:
for table in EXPECTED_TABLES: for table in EXPECTED_TABLES:
assert table in actual, ( assert table in actual, (
f"Table '{table}' missing after migration. " f"Table '{table}' missing after migration. " f"Found tables: {sorted(actual)}"
f"Found tables: {sorted(actual)}"
) )
@@ -190,8 +258,7 @@ async def test_table_columns(pg: PgStore) -> None:
actual_cols = actual.get(table, []) actual_cols = actual.get(table, [])
for col in expected_cols: for col in expected_cols:
assert col in actual_cols, ( assert col in actual_cols, (
f"Column '{col}' missing from table '{table}'. " f"Column '{col}' missing from table '{table}'. " f"Actual columns: {actual_cols}"
f"Actual columns: {actual_cols}"
) )
@@ -250,8 +317,7 @@ async def test_table_row_count_is_zero(pg: PgStore) -> None:
for table in EXPECTED_TABLES: for table in EXPECTED_TABLES:
count = await _table_row_count(pg, table) count = await _table_row_count(pg, table)
assert count == 0, ( assert count == 0, (
f"Table '{table}' should be empty after migration, " f"Table '{table}' should be empty after migration, " f"but has {count} rows"
f"but has {count} rows"
) )
@@ -262,13 +328,10 @@ async def test_schema_migration_version_recorded(pg: PgStore) -> None:
await pg.migrate() await pg.migrate()
async with pg.pool.acquire() as conn: async with pg.pool.acquire() as conn:
row = await conn.fetchrow( row = await conn.fetchrow("SELECT MAX(version) AS v FROM schema_migrations")
"SELECT MAX(version) AS v FROM schema_migrations"
)
assert row is not None assert row is not None
assert row["v"] == SCHEMA_VERSION, ( assert row["v"] == SCHEMA_VERSION, (
f"Expected schema version {SCHEMA_VERSION}, " f"Expected schema version {SCHEMA_VERSION}, " f"got {row['v']}"
f"got {row['v']}"
) )
@@ -280,7 +343,8 @@ async def test_create_and_query_row(pg: PgStore) -> None:
# ConfigSections round-trip # ConfigSections round-trip
await conn.execute( await conn.execute(
"INSERT INTO config_sections (name, description) VALUES ($1, $2)", "INSERT INTO config_sections (name, description) VALUES ($1, $2)",
"test_section", "A test section for integration test", "test_section",
"A test section for integration test",
) )
row = await conn.fetchrow( row = await conn.fetchrow(
"SELECT name, description FROM config_sections WHERE name = $1", "SELECT name, description FROM config_sections WHERE name = $1",
@@ -357,4 +421,4 @@ async def test_audit_list_recent(pg: PgStore) -> None:
# Verify payload serialization worked # Verify payload serialization worked
first = recent[0] first = recent[0]
if first.payload: if first.payload:
assert "index" in first.payload assert "index" in first.payload
+1 -2
View File
@@ -39,8 +39,7 @@ async def test_end_to_end_config_workflow():
# Mock the setting creation # Mock the setting creation
mock_created_setting = Mock() mock_created_setting = Mock()
mock_created_setting.updated_at = "2023-01-01T00:00:00" mock_created_setting.updated_at = "2023-01-01T00:00:00"
mock_repo_instance.create_setting = AsyncMock( mock_repo_instance.create_setting = AsyncMock(return_value=mock_created_setting)
return_value=mock_created_setting)
mock_repo_instance.get_setting = AsyncMock(return_value=None) mock_repo_instance.get_setting = AsyncMock(return_value=None)
mock_repo_instance.get_latest_updated_at = AsyncMock(return_value=None) mock_repo_instance.get_latest_updated_at = AsyncMock(return_value=None)
mock_repo_instance.list_settings = AsyncMock(return_value=[]) mock_repo_instance.list_settings = AsyncMock(return_value=[])
+3 -7
View File
@@ -136,10 +136,8 @@ async def test_config_setting_repository_list_settings(mock_store):
repo = ConfigSettingRepository(mock_store) repo = ConfigSettingRepository(mock_store)
conn = await mock_store.pool.acquire().__aenter__() conn = await mock_store.pool.acquire().__aenter__()
row1 = _make_row({**SETTING_ROW, "key": "test_key1", row1 = _make_row({**SETTING_ROW, "key": "test_key1", "value_json": "test_value1"})
"value_json": "test_value1"}) row2 = _make_row({**SETTING_ROW, "key": "test_key2", "value_json": "test_value2"})
row2 = _make_row({**SETTING_ROW, "key": "test_key2",
"value_json": "test_value2"})
conn.fetch = AsyncMock(return_value=[row1, row2]) conn.fetch = AsyncMock(return_value=[row1, row2])
result = await repo.list_settings() result = await repo.list_settings()
@@ -176,9 +174,7 @@ async def test_config_pairing_repository_create_pairing(mock_store):
conn = await mock_store.pool.acquire().__aenter__() conn = await mock_store.pool.acquire().__aenter__()
conn.fetchrow = AsyncMock(return_value=_make_row(PAIRING_ROW)) conn.fetchrow = AsyncMock(return_value=_make_row(PAIRING_ROW))
pairing = ConfigPairing( pairing = ConfigPairing(base_asset="BTC", quote_asset="USD", enabled=True, source="Kraken")
base_asset="BTC", quote_asset="USD", enabled=True, source="Kraken"
)
result = await repo.create_pairing(pairing) result = await repo.create_pairing(pairing)
+11 -12
View File
@@ -63,8 +63,7 @@ async def test_configuration_service_set_setting(mock_settings, mock_store, mock
mock_created_setting = Mock() mock_created_setting = Mock()
mock_created_setting.updated_at = "2023-01-01T00:00:00" mock_created_setting.updated_at = "2023-01-01T00:00:00"
mock_repo_instance.create_setting = AsyncMock( mock_repo_instance.create_setting = AsyncMock(return_value=mock_created_setting)
return_value=mock_created_setting)
mock_repo_instance.get_setting = AsyncMock(return_value=None) mock_repo_instance.get_setting = AsyncMock(return_value=None)
await service.set_setting("test_key", "test_value", "test_user") await service.set_setting("test_key", "test_value", "test_user")
@@ -73,7 +72,9 @@ async def test_configuration_service_set_setting(mock_settings, mock_store, mock
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_configuration_service_hot_reload_detection(mock_settings, mock_store, mock_audit_repo): async def test_configuration_service_hot_reload_detection(
mock_settings, mock_store, mock_audit_repo
):
"""Test hot-reload detection functionality.""" """Test hot-reload detection functionality."""
service = ConfigurationService(mock_settings, mock_store, mock_audit_repo) service = ConfigurationService(mock_settings, mock_store, mock_audit_repo)
@@ -86,8 +87,7 @@ async def test_configuration_service_hot_reload_detection(mock_settings, mock_st
from datetime import datetime from datetime import datetime
mock_repo_instance.get_latest_updated_at = AsyncMock( mock_repo_instance.get_latest_updated_at = AsyncMock(return_value=datetime.now())
return_value=datetime.now())
assert await service.is_config_outdated() is True assert await service.is_config_outdated() is True
@@ -105,8 +105,7 @@ async def test_configuration_service_reload_if_changed(mock_settings, mock_store
from datetime import datetime from datetime import datetime
mock_repo_instance.get_latest_updated_at = AsyncMock( mock_repo_instance.get_latest_updated_at = AsyncMock(return_value=datetime.now())
return_value=datetime.now())
result = await service.reload_if_changed() result = await service.reload_if_changed()
assert result is True assert result is True
@@ -125,8 +124,7 @@ async def test_configuration_service_get_config_version(mock_settings, mock_stor
mock_created_setting = Mock() mock_created_setting = Mock()
mock_created_setting.updated_at = "2023-01-01T00:00:00" mock_created_setting.updated_at = "2023-01-01T00:00:00"
mock_repo_instance.create_setting = AsyncMock( mock_repo_instance.create_setting = AsyncMock(return_value=mock_created_setting)
return_value=mock_created_setting)
mock_repo_instance.get_setting = AsyncMock(return_value=None) mock_repo_instance.get_setting = AsyncMock(return_value=None)
await service.set_setting("test_key", "test_value", "test_user") await service.set_setting("test_key", "test_value", "test_user")
@@ -134,7 +132,9 @@ async def test_configuration_service_get_config_version(mock_settings, mock_stor
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_configuration_service_get_last_updated_at(mock_settings, mock_store, mock_audit_repo): async def test_configuration_service_get_last_updated_at(
mock_settings, mock_store, mock_audit_repo
):
"""Test getting last updated timestamp.""" """Test getting last updated timestamp."""
service = ConfigurationService(mock_settings, mock_store, mock_audit_repo) service = ConfigurationService(mock_settings, mock_store, mock_audit_repo)
assert service.get_last_updated_at() is None assert service.get_last_updated_at() is None
@@ -145,8 +145,7 @@ async def test_configuration_service_get_last_updated_at(mock_settings, mock_sto
mock_created_setting = Mock() mock_created_setting = Mock()
mock_created_setting.updated_at = "2023-01-01T00:00:00" mock_created_setting.updated_at = "2023-01-01T00:00:00"
mock_repo_instance.create_setting = AsyncMock( mock_repo_instance.create_setting = AsyncMock(return_value=mock_created_setting)
return_value=mock_created_setting)
mock_repo_instance.get_setting = AsyncMock(return_value=None) mock_repo_instance.get_setting = AsyncMock(return_value=None)
await service.set_setting("test_key", "test_value", "test_user") await service.set_setting("test_key", "test_value", "test_user")
+3 -7
View File
@@ -49,9 +49,7 @@ def _mock_pg_store():
@pytest.fixture @pytest.fixture
def app(): def app():
"""Create a test app with a mocked PgStore and audit repository.""" """Create a test app with a mocked PgStore and audit repository."""
a = create_app( a = create_app(Settings(_env_file=None, APP_MODE="paper", paper_trading_mode=True))
Settings(_env_file=None, APP_MODE="paper", paper_trading_mode=True)
)
a.state.store = _mock_pg_store() a.state.store = _mock_pg_store()
a.state.runtime_state_repository.insert = AsyncMock() a.state.runtime_state_repository.insert = AsyncMock()
a.state.runtime_state_repository.latest = AsyncMock(return_value=None) a.state.runtime_state_repository.latest = AsyncMock(return_value=None)
@@ -69,16 +67,14 @@ async def test_persist_runtime_snapshot_writes_record(app) -> None:
# Mock _open_trade_count → 0, _latest_balances → None # Mock _open_trade_count → 0, _latest_balances → None
conn = await app.state.store.pool.acquire().__aenter__() conn = await app.state.store.pool.acquire().__aenter__()
conn.fetchrow = AsyncMock(return_value=MagicMock( conn.fetchrow = AsyncMock(return_value=MagicMock(**{"__getitem__": lambda s, k: 0}))
**{"__getitem__": lambda s, k: 0}))
snapshot = await persist_runtime_snapshot(app, note="unit-test") snapshot = await persist_runtime_snapshot(app, note="unit-test")
assert snapshot is not None assert snapshot is not None
assert snapshot.note == "unit-test" assert snapshot.note == "unit-test"
app.state.runtime_state_repository.latest = AsyncMock( app.state.runtime_state_repository.latest = AsyncMock(return_value=snapshot)
return_value=snapshot)
latest = await app.state.runtime_state_repository.latest() latest = await app.state.runtime_state_repository.latest()
assert latest is not None assert latest is not None
assert latest.note == "unit-test" assert latest.note == "unit-test"