Add integration tests for execution persistence, metrics, and opportunity writing
CI / lint-test-build (push) Failing after 1m23s

- Implemented integration tests for the execution writer to ensure trade orders and PnL are persisted correctly.
- Created integration tests for the metrics calculator to summarize execution data accurately.
- Added integration tests for the opportunity writer to verify event persistence.
- Established PostgreSQL schema validation tests to ensure all expected tables, columns, and constraints exist.
- Removed outdated unit tests that relied on DuckDB and replaced them with tests using PgStore.
This commit is contained in:
2026-06-07 14:37:53 +02:00
parent 54feb2ecd4
commit 529ff967cc
44 changed files with 1955 additions and 1386 deletions
+1
View File
@@ -38,6 +38,7 @@ dist/
# Local database / runtime data
data/*.duckdb.wal
data/*.duckdb.tmp
data/arbitrade.duckdb
logs/
ops/performance/latest_profile.json
+22 -10
View File
@@ -6,7 +6,7 @@ Current stack:
- Python 3.12+
- FastAPI + HTMX/Jinja2
- DuckDB for dev/test/prod
- PostgreSQL for all environments (via asyncpg)
- Native Kraken WebSocket planned for market-data hot path
- Gitea Actions + Gitea container registry
@@ -22,7 +22,7 @@ Bootstrap complete for foundation layer:
- typed settings and env loading
- structured logging
- encrypted secret helpers
- DuckDB connection + base schema
- PostgreSQL connection + full schema migration
- FastAPI app with health endpoint
- Gitea Actions CI scaffold
- Docker / docker-compose scaffold
@@ -152,7 +152,11 @@ APP_HOST=0.0.0.0
APP_PORT=9090
LOG_LEVEL=INFO
LOG_JSON=true
DUCKDB_PATH=./data/arbitrade.duckdb
PG_HOST=192.168.88.35
PG_PORT=5432
PG_DATABASE=arbitrade
PG_USER=arbitrade
PG_PASSWORD=arbitrade
FERNET_KEY=
KRAKEN_API_KEY=
KRAKEN_API_SECRET=
@@ -182,15 +186,19 @@ Health endpoints:
## Database
DuckDB used everywhere: local dev, tests, production.
PostgreSQL used everywhere: local dev, tests, production.
Default database file:
Default connection:
```text
./data/arbitrade.duckdb
PG_HOST=192.168.88.35
PG_PORT=5432
PG_DATABASE=arbitrade
PG_USER=arbitrade
PG_PASSWORD=arbitrade
```
Schema bootstrap runs automatically on app startup.
Schema bootstrap runs automatically on app startup via `PgStore.migrate()`.
Current tables:
@@ -220,7 +228,7 @@ DELETE FROM audit_events
WHERE occurred_at < NOW() - INTERVAL 30 DAY;
```
- Back up archive files and the main DuckDB file together.
- Back up archive files and the PostgreSQL database together.
- For production, run archive + backup as scheduled maintenance (cron/task scheduler).
## Quality Checks
@@ -342,7 +350,7 @@ Add a persistent volume in Coolify:
- Mount Path: `/app/data`
This preserves DuckDB and other runtime artifacts across restarts/redeploys.
This preserves PostgreSQL data and other runtime artifacts across restarts/redeploys.
### 5) Configure environment variables
@@ -351,7 +359,11 @@ Add runtime environment variables in Coolify (UI: Environment Variables):
- `APP_ENV=prod`
- `APP_HOST=0.0.0.0`
- `APP_PORT=9090`
- `DUCKDB_PATH=/app/data/arbitrade.duckdb`
- `PG_HOST=postgres`
`PG_PORT=5432`
`PG_DATABASE=arbitrade`
`PG_USER=arbitrade`
`PG_PASSWORD=arbitrade`
- `LOG_LEVEL=INFO`
- `LOG_JSON=true`
- `KRAKEN_API_KEY=...`
Binary file not shown.
+13 -13
View File
@@ -7,16 +7,16 @@ This guide provides two supported deployment paths for Arbitrade on Coolify:
Reference docs:
- Coolify Applications: https://coolify.io/docs/applications
- Coolify Build Packs: https://coolify.io/docs/applications/build-packs
- Coolify Dockerfile Build Pack: https://coolify.io/docs/applications/build-packs/dockerfile
- Coolify Nixpacks Build Pack: https://coolify.io/docs/applications/build-packs/nixpacks
- Coolify CI/CD (Git providers): https://coolify.io/docs/applications/ci-cd
- Coolify Gitea integration: https://coolify.io/docs/applications/ci-cd/gitea/integration
- Coolify environment variables: https://coolify.io/docs/knowledge-base/environment-variables
- Coolify persistent storage: https://coolify.io/docs/knowledge-base/persistent-storage
- Coolify health checks: https://coolify.io/docs/knowledge-base/health-checks
- Coolify Docker registry credentials: https://coolify.io/docs/knowledge-base/docker/registry
- [Coolify Applications](https://coolify.io/docs/applications)
- [Coolify Build Packs](https://coolify.io/docs/applications/build-packs)
- [Coolify Dockerfile Build Pack](https://coolify.io/docs/applications/build-packs/dockerfile)
- [Coolify Nixpacks Build Pack](https://coolify.io/docs/applications/build-packs/nixpacks)
- [Coolify CI/CD (Git providers)](https://coolify.io/docs/applications/ci-cd)
- [Coolify Gitea integration](https://coolify.io/docs/applications/ci-cd/gitea/integration)
- [Coolify environment variables](https://coolify.io/docs/knowledge-base/environment-variables)
- [Coolify persistent storage](https://coolify.io/docs/knowledge-base/persistent-storage)
- [Coolify health checks](https://coolify.io/docs/knowledge-base/health-checks)
- [Coolify Docker registry credentials](https://coolify.io/docs/knowledge-base/docker/registry)
## Common Runtime Configuration
@@ -32,14 +32,14 @@ Use these values in both deployment modes.
- Add a persistent volume
- Mount path: `/app/data`
- Set DB path to: `DUCKDB_PATH=/app/data/arbitrade.duckdb`
- Set PG connection: `PG_HOST=postgres`, `PG_PORT=5432`, `PG_DATABASE=arbitrade`, `PG_USER=arbitrade`, `PG_PASSWORD=arbitrade`
### Required environment variables
- `APP_ENV=prod`
- `APP_HOST=0.0.0.0`
- `APP_PORT=9090`
- `DUCKDB_PATH=/app/data/arbitrade.duckdb`
- `PG_DATABASE=arbitrade`
- `LOG_LEVEL=INFO`
- `LOG_JSON=true`
- `KRAKEN_API_KEY=<set-in-coolify-secret>`
@@ -135,7 +135,7 @@ Update flow for new releases:
- Ensure the deployed image/wheel includes package templates under `arbitrade/web/templates/*`.
- If you build from source, do not remove packaged template files under `src/arbitrade/web/templates`.
- DB resets after deploy:
- Confirm persistent mount exists at `/app/data` and `DUCKDB_PATH` points there.
- Confirm PostgreSQL is reachable at `PG_HOST`.
- Registry pull fails:
- Re-check Docker registry credentials in Coolify.
- App starts but unavailable externally:
+9 -9
View File
@@ -8,7 +8,7 @@ Primary goals:
- Detect and execute triangular opportunities on Kraken with fee/slippage-aware math.
- Keep hot-path latency low with incremental order-book updates and event-driven scoring.
- Persist operational data in DuckDB for dev, test, and prod.
- Persist operational data in PostgreSQL for all environments.
- Provide operator controls, audit trail, and alerting through a server-rendered dashboard.
- Support backtesting, parameter sweeps, and deferred experimental strategy work behind feature flags.
@@ -17,7 +17,7 @@ Primary goals:
- Python 3.12+ runtime.
- Native Kraken WebSocket on the hot path.
- HTMX + Jinja2 UI, no SPA build step.
- DuckDB everywhere.
- PostgreSQL everywhere.
- Self-hosted Gitea Actions CI and Gitea registry.
- Windows development support.
- Secrets must stay out of the repository.
@@ -32,7 +32,7 @@ The bot consumes Kraken market data, detects opportunities, and executes trades
- Kraken REST + WebSocket provide market data and execution.
- FastAPI serves HTML fragments, JSON endpoints, and SSE streams.
- DuckDB stores trades, opportunities, snapshots, audit events, and runtime state.
- PostgreSQL stores trades, opportunities, snapshots, audit events, and runtime state.
- Coolify can deploy the published image using environment variables and persistent storage.
## 4. Solution Strategy
@@ -55,7 +55,7 @@ The bot consumes Kraken market data, detects opportunities, and executes trades
- `execution/` - multi-leg trade sequencing.
- `backtesting/` - replay engine, parameter sweep, experiment scaffolds.
- `strategy/` - experimental strategy modules such as stat-arb.
- `storage/` - DuckDB schema and repositories.
- `storage/` - PostgreSQL schema and repositories.
- `alerting/` - multi-channel notifications.
- `runtime/` - startup recovery and graceful shutdown.
@@ -64,7 +64,7 @@ The bot consumes Kraken market data, detects opportunities, and executes trades
- `fastapi`, `uvicorn`, `jinja2`, `htmx`-driven templates.
- `orjson` for low-alloc parsing.
- `sortedcontainers` for book state.
- `duckdb` for persistence and analytics.
- `asyncpg` for PostgreSQL persistence.
- `pydantic` / `pydantic-settings` for typed configuration.
- `cryptography` / keyring for secret handling.
@@ -77,7 +77,7 @@ The bot consumes Kraken market data, detects opportunities, and executes trades
3. Incremental detector scores impacted cycles.
4. Risk manager validates the opportunity.
5. Execution sequencer places legs if approved.
6. Trades and snapshots persist to DuckDB.
7. Trades and snapshots persist to PostgreSQL.
7. Dashboard and alerts reflect state changes.
### 6.2 Dashboard Control Flow
@@ -112,7 +112,7 @@ The bot consumes Kraken market data, detects opportunities, and executes trades
- Deploy from the published image.
- Configure runtime via environment variables.
- Mount persistent storage at `/app/data` for DuckDB.
- Connect to PostgreSQL at configured `PG_HOST`.
## 8. Cross-Cutting Concepts
@@ -126,7 +126,7 @@ The bot consumes Kraken market data, detects opportunities, and executes trades
## 9. Architecture Decisions
- Native Kraken WS instead of a generic exchange abstraction on the hot path.
- DuckDB as the single database engine.
- PostgreSQL as the single database engine.
- HTMX + Jinja2 instead of SPA frontend.
- Backtesting reuses production detector/risk/execution logic.
- Experimental stat-arb stays behind a feature flag.
@@ -152,5 +152,5 @@ The bot consumes Kraken market data, detects opportunities, and executes trades
- WS: WebSocket.
- HTMX: HTML-over-the-wire UI library.
- SSE: Server-Sent Events.
- DUCKDB: Embedded analytical database used for all environments.
- PGSQL: PostgreSQL database used for all environments.
- Stat arb: Statistical arbitrage, currently experimental and feature-flagged.
+88
View File
@@ -0,0 +1,88 @@
# Database Layer: Schema & Repositories
> **Database engine**: PostgreSQL 15+ on `192.168.88.35`
> **Driver**: `asyncpg` (async connection pool)
> **Store class**: `PgStore` in `src/arbitrade/storage/pg_store.py`
## Connection Lifecycle
```txt
FastAPI lifespan (create_app)
└─ PgStore.start() # creates asyncpg connection pool
└─ PgStore.migrate() # reads schema_pg.sql, creates tables
└─ ... application runs ...
└─ PgStore.stop() # closes the pool
```
All repository classes accept a `PgStore` instance and acquire connections
via `async with self._store.pool.acquire() as conn:`.
## Schema
Defined in `src/arbitrade/storage/schema_pg.sql`. 15 tables:
| Table | Purpose | PK | Notes |
| ----------------------------- | -------------------------- | --------------- | ---------------------------------------- |
| `schema_migrations` | Version tracking | `version` | Single-row per version |
| `config_sections` | Config section metadata | `id` (SERIAL) | `name` UNIQUE |
| `config_settings` | Key-value config store | `key` (VARCHAR) | JSON-serialized values |
| `config_pairings` | Currency pairs to monitor | `id` (SERIAL) | `(base_asset, quote_asset)` UNIQUE |
| `config_backtesting_defaults` | Default backtest params | `id` (SERIAL) | Singleton via `ORDER BY id DESC LIMIT 1` |
| `opportunities` | Detected arb opportunities | `id` (UUID) | |
| `trades` | Executed trades | `id` (UUID) | |
| `orders` | Individual leg orders | `id` (UUID) | |
| `pnl_events` | P&L event stream | `id` (UUID) | |
| `portfolio_snapshots` | Balance snapshots | — | Append-only |
| `market_snapshots` | Raw order-book snapshots | — | Append-only |
| `audit_events` | Audit trail | `id` (UUID) | |
| `runtime_state_snapshots` | Runtime state history | — | Append-only |
| `kraken_account_snapshots` | Fee tier + account data | — | Append-only |
| `backtest_jobs` | Backtest job records | `id` (UUID) | |
JSON columns use `JSONB` for indexability. UUID primary keys use
`gen_random_uuid()` (requires `pgcrypto` extension).
## Repository Classes
All in `src/arbitrade/storage/repositories.py`. Every method is `async def`.
| Class | Key Methods | Used By |
| ------------------------------------- | ---------------------------------------------------------- | --------------------------- |
| `MarketSnapshotRepository` | `insert()` | `AsyncMarketSnapshotWriter` |
| `OpportunityRepository` | `insert()` | `AsyncOpportunityWriter` |
| `TradeRepository` | `insert()` | `AsyncExecutionWriter` |
| `OrderRepository` | `insert()` | `AsyncExecutionWriter` |
| `PnLRepository` | `insert()` | `AsyncExecutionWriter` |
| `AuditRepository` | `insert()`, `list_recent()` | API routes, lifecycle |
| `RuntimeStateRepository` | `insert()`, `latest()` | Lifecycle, API |
| `ConfigSectionRepository` | `create_section()`, `get_section()`, `list_sections()` | Config service |
| `ConfigSettingRepository` | Full CRUD + `get_latest_updated_at()` | Config service |
| `ConfigPairingRepository` | Full CRUD + `upsert_pairing()`, `list_pairings()` | Feeds, pairing sync |
| `ConfigBacktestingDefaultsRepository` | `create_defaults()`, `get_defaults()`, `update_defaults()` | Config service |
| `KrakenAccountSnapshotRepository` | `insert_snapshot()`, `latest_snapshot()` | Fee sync loop |
| `BacktestJobRepository` | Full CRUD | Backtesting UI + worker |
## Async Writers
Three background writer tasks buffer high-frequency writes:
- **`AsyncExecutionWriter`** — trades/orders/P&L queue
- **`AsyncMarketSnapshotWriter`** — order-book snapshot queue
- **`AsyncOpportunityWriter`** — opportunity event queue
Each uses an `asyncio.Queue` and drains it in a background task with
`await repo.insert(...)`.
## Integration Tests
`tests/integration/test_postgresql_schema.py` verifies:
- Connection to PostgreSQL server
- `pgcrypto` extension availability
- All 15 tables exist after migration
- Migration is idempotent
- Correct columns per table
- Primary keys and unique constraints
- Tables start empty
- Simple INSERT/SELECT round-trip
- `ON CONFLICT ... DO UPDATE` on config_pairings
+1 -1
View File
@@ -39,7 +39,7 @@ Key end-to-end latency baselines from `latency_baseline.json`:
## Optimization Note
`MetricsCalculator.compute()` was optimized to use DuckDB SQL aggregations and quantiles, reducing Python-side row scans.
`MetricsCalculator.compute()` uses PostgreSQL SQL aggregations and percentiles, reducing Python-side row scans.
Measured benchmark (`scripts/benchmark_metrics_compute.py`):
+1 -1
View File
@@ -54,7 +54,7 @@ pretty = true
mypy_path = "src"
[[tool.mypy.overrides]]
module = ["duckdb", "keyring", "sortedcontainers"]
module = ["asyncpg", "keyring", "sortedcontainers"]
ignore_missing_imports = true
[tool.pytest.ini_options]
+1
View File
@@ -1,4 +1,5 @@
# Unpinned dev dependencies (latest available)
asyncpg-stubs
black
mypy
pre-commit
+1 -1
View File
@@ -1,6 +1,6 @@
# Unpinned runtime dependencies (latest available)
asyncpg
cryptography
duckdb
fastapi
httptools
httpx
+14 -10
View File
@@ -25,15 +25,15 @@ from arbitrade.market_data.feed_builder import (
)
from arbitrade.metrics import MetricsCalculator
from arbitrade.runtime.lifecycle import graceful_shutdown, restore_runtime_state
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.pg_store import PgStore
from arbitrade.storage.market_snapshots import AsyncMarketSnapshotWriter
from arbitrade.storage.opportunities import AsyncOpportunityWriter
from arbitrade.storage.repositories import AuditRepository, RuntimeStateRepository
from arbitrade.storage.repositories import AuditRepository, RuntimeStateRepository, MarketSnapshotRepository, OpportunityRepository
_LOG = structlog.get_logger(__name__)
def _start_feed(app: FastAPI, *, kill_switch_only: bool = False) -> asyncio.Task[None] | None:
async def _start_feed(app: FastAPI, *, kill_switch_only: bool = False) -> asyncio.Task[None] | None:
"""Create and start a MarketDataFeed task from enabled pairings.
If kill_switch_only=True, only create a kill-switch-bound stub (no detector/feed).
@@ -45,14 +45,14 @@ def _start_feed(app: FastAPI, *, kill_switch_only: bool = False) -> asyncio.Task
controls = app.state.dashboard_controls
# Build detector from enabled pairings
detector = build_detector_from_enabled_pairings(
detector = await build_detector_from_enabled_pairings(
db,
fee_rate=0.0, # will be overridden by fee sync
max_depth_levels=controls.strategy_max_depth_levels,
min_profit_threshold=controls.strategy_profit_threshold,
)
symbols = get_enabled_pair_symbols(db)
symbols = await get_enabled_pair_symbols(db)
if not symbols and not kill_switch_only:
_LOG.warning("no_enabled_pair_symbols_feed_not_started")
return None
@@ -64,8 +64,8 @@ def _start_feed(app: FastAPI, *, kill_switch_only: bool = False) -> asyncio.Task
ws_client.set_subscribed_symbols(symbols)
snapshot_writer = AsyncMarketSnapshotWriter(db)
opportunity_writer = AsyncOpportunityWriter(db)
snapshot_writer = AsyncMarketSnapshotWriter(MarketSnapshotRepository(db))
opportunity_writer = AsyncOpportunityWriter(OpportunityRepository(db))
feed = MarketDataFeed(
ws_client=ws_client,
@@ -90,8 +90,8 @@ def _start_feed(app: FastAPI, *, kill_switch_only: bool = False) -> asyncio.Task
def create_app(settings: Settings) -> FastAPI:
configure_logging(settings.log_level, settings.log_json)
db = DuckDBStore(settings)
db.migrate()
db = PgStore(settings)
kraken_client = KrakenRestClient(settings)
fee_sync_stop_event = asyncio.Event()
pairing_sync_stop_event = asyncio.Event()
@@ -101,6 +101,9 @@ def create_app(settings: Settings) -> FastAPI:
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
await app.state.store.start()
await app.state.store.migrate()
await app.state.configuration_service.load_database_settings()
await restore_runtime_state(app)
fee_sync_task = asyncio.create_task(
run_fee_sync_loop(
@@ -123,7 +126,7 @@ def create_app(settings: Settings) -> FastAPI:
name="backtest_worker",
)
# Start market data feed from enabled pairings
_start_feed(app)
await _start_feed(app)
app.state.fee_sync_task = fee_sync_task
app.state.pairing_sync_task = pairing_sync_task
app.state.backtest_task = backtest_task
@@ -161,6 +164,7 @@ def create_app(settings: Settings) -> FastAPI:
pass
await kraken_client.close()
await graceful_shutdown(app)
await app.state.store.stop()
app = FastAPI(title="arbitrade", version="0.1.0", lifespan=lifespan)
app.state.settings = settings
+153 -147
View File
@@ -9,7 +9,6 @@ from pathlib import Path
from typing import cast
from urllib.parse import parse_qs
import duckdb
import orjson
from fastapi import APIRouter, Depends, Request, Response
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
@@ -21,6 +20,7 @@ from arbitrade.api.control_state import DashboardControlState
from arbitrade.config.pairing_sync import sync_pairings_from_kraken
from arbitrade.config.service import ConfigPairing
from arbitrade.detection.graph import CurrencyGraph, TriangularCycle
from arbitrade.storage.pg_store import PgStore
from arbitrade.storage.repositories import (
AuditRecord,
AuditRepository,
@@ -35,7 +35,8 @@ public_router = APIRouter()
def _resolve_templates_directory() -> str:
# Support source layout, Docker runtime (/app), and installed package data.
source_layout_path = Path(__file__).resolve().parents[3] / "web" / "templates"
source_layout_path = Path(
__file__).resolve().parents[3] / "web" / "templates"
if source_layout_path.is_dir():
return str(source_layout_path)
@@ -44,7 +45,8 @@ def _resolve_templates_directory() -> str:
return str(docker_runtime_path)
try:
package_path = resources.files("arbitrade").joinpath("web", "templates")
package_path = resources.files(
"arbitrade").joinpath("web", "templates")
if package_path.is_dir():
return str(package_path)
except (ModuleNotFoundError, AttributeError):
@@ -64,8 +66,8 @@ def _format_metric(value: float | None, *, precision: int = 2, suffix: str = "")
return f"{value:.{precision}f}{suffix}"
def _dashboard_metrics(request: Request) -> dict[str, str]:
metrics = request.app.state.metrics.compute()
async def _dashboard_metrics(request: Request) -> dict[str, str]:
metrics = await request.app.state.metrics.compute()
return {
"realized_pnl": _format_metric(metrics.realized_pnl_usd, precision=2, suffix=" USD"),
"win_rate": _format_metric(
@@ -91,57 +93,85 @@ def _dashboard_metrics(request: Request) -> dict[str, str]:
}
def _table_columns(conn: duckdb.DuckDBPyConnection, table_name: str) -> set[str]:
rows = conn.execute(f"PRAGMA table_info('{table_name}')").fetchall()
return {str(row[1]) for row in rows}
def _dashboard_overview(request: Request) -> dict[str, object]:
store = request.app.state.store
with store.connect() as conn:
trade_columns = _table_columns(conn, "trades")
trade_ref_expr = "trade_ref" if "trade_ref" in trade_columns else "CAST(id AS VARCHAR)"
cycle_expr = "cycle" if "cycle" in trade_columns else "NULL"
if "finished_at" in trade_columns:
open_trade_filter = "finished_at IS NULL"
else:
open_trade_filter = "LOWER(status) NOT IN ('filled', 'closed', 'cancelled', 'canceled')"
portfolio_row = conn.execute("""
async def _dashboard_overview(request: Request) -> dict[str, object]:
store: PgStore = request.app.state.store
async with store.pool.acquire() as conn:
portfolio_row = await conn.fetchrow("""
SELECT balances, total_value_usd
FROM portfolio_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
""").fetchone()
open_trades = conn.execute(f"""
SELECT {trade_ref_expr}, status, started_at, {cycle_expr}
""")
open_trades = await conn.fetch("""
SELECT trade_ref, status, started_at, cycle
FROM trades
WHERE {open_trade_filter}
WHERE finished_at IS NULL
ORDER BY started_at DESC
LIMIT 5
""").fetchall()
rpnl = conn.execute("""
""")
rpnl = await conn.fetchrow("""
SELECT COALESCE(SUM(COALESCE(realized_pnl, 0)), 0)
FROM trades
""").fetchone()
latest_opportunities = conn.execute("""
""")
latest_opportunities = await conn.fetch("""
SELECT cycle, net_pct, est_profit, detected_at
FROM opportunities
ORDER BY detected_at DESC
LIMIT 5
""").fetchall()
""")
# Query equity from kraken_account_snapshots
equity_value = ""
try:
equity_row = await conn.fetchrow("""
SELECT trade_balance_raw
FROM kraken_account_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
""")
if equity_row is not None and equity_row["trade_balance_raw"] is not None:
tb_raw = equity_row["trade_balance_raw"]
if isinstance(tb_raw, str):
tb_raw = json.loads(tb_raw)
if isinstance(tb_raw, dict):
eb = tb_raw.get("eb")
equity_value = f"{float(eb):.2f} USD" if eb is not None else ""
except Exception:
pass
# Query latest Kraken account snapshot for fee info
fee_tier = ""
maker_fee = ""
taker_fee = ""
thirty_day_volume = ""
try:
acct_row = await conn.fetchrow("""
SELECT fee_tier, maker_fee, taker_fee, thirty_day_volume
FROM kraken_account_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
""")
if acct_row is not None:
fee_tier = str(
acct_row["fee_tier"]) if acct_row["fee_tier"] is not None else ""
maker_fee = f"{float(acct_row['maker_fee']):.4%}" if acct_row["maker_fee"] is not None else ""
taker_fee = f"{float(acct_row['taker_fee']):.4%}" if acct_row["taker_fee"] is not None else ""
thirty_day_volume = f"{float(acct_row['thirty_day_volume']):.2f}" if acct_row[
"thirty_day_volume"] is not None else ""
except Exception:
pass
balances_value = ""
total_value = ""
equity_value = ""
if portfolio_row is not None:
balances_raw, total_value_raw = portfolio_row
balances_raw = portfolio_row["balances"]
total_value_raw = portfolio_row["total_value_usd"]
if isinstance(balances_raw, str) and balances_raw:
try:
parsed = json.loads(balances_raw)
if isinstance(parsed, dict):
# Filter out zero balances, show non-zero as "AMT ASSET"
non_zero = {k: float(v) for k, v in parsed.items() if float(v) > 0.0}
non_zero = {k: float(v)
for k, v in parsed.items() if float(v) > 0.0}
if non_zero:
balances_value = "<br>".join(
f"{v:.6g} {k}" for k, v in sorted(non_zero.items())
@@ -157,63 +187,25 @@ def _dashboard_overview(request: Request) -> dict[str, object]:
if total_value_raw is not None:
total_value = f"{float(total_value_raw):.2f} USD"
# Query equity from kraken_account_snapshots
try:
equity_row = conn.execute("""
SELECT trade_balance_raw
FROM kraken_account_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
""").fetchone()
if equity_row is not None and equity_row[0] is not None:
tb_raw = equity_row[0]
if isinstance(tb_raw, str):
tb_raw = json.loads(tb_raw)
if isinstance(tb_raw, dict):
eb = tb_raw.get("eb")
equity_value = f"{float(eb):.2f} USD" if eb is not None else ""
except Exception:
pass
open_trade_rows = [
{
"trade_ref": str(row[0]),
"status": str(row[1]),
"started_at": row[2].isoformat() if isinstance(row[2], datetime) else "",
"cycle": str(row[3]) if row[3] is not None else "",
"trade_ref": str(r["trade_ref"]),
"status": str(r["status"]),
"started_at": r["started_at"].isoformat() if isinstance(r["started_at"], datetime) else "",
"cycle": str(r["cycle"]) if r["cycle"] is not None else "",
}
for row in open_trades
for r in open_trades
]
opportunity_rows = [
{
"cycle": str(row[0]),
"net_pct": f"{float(row[1]):.2f}%" if row[1] is not None else "",
"est_profit": f"{float(row[2]):.2f} USD" if row[2] is not None else "",
"detected_at": row[3].isoformat() if isinstance(row[3], datetime) else "",
"cycle": str(r["cycle"]),
"net_pct": f"{float(r['net_pct']):.2f}%" if r["net_pct"] is not None else "",
"est_profit": f"{float(r['est_profit']):.2f} USD" if r["est_profit"] is not None else "",
"detected_at": r["detected_at"].isoformat() if isinstance(r["detected_at"], datetime) else "",
}
for row in latest_opportunities
for r in latest_opportunities
]
# Query latest Kraken account snapshot for fee info
fee_tier = ""
maker_fee = ""
taker_fee = ""
thirty_day_volume = ""
try:
acct_row = conn.execute("""
SELECT fee_tier, maker_fee, taker_fee, thirty_day_volume
FROM kraken_account_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
""").fetchone()
if acct_row is not None:
fee_tier = str(acct_row[0]) if acct_row[0] is not None else ""
maker_fee = f"{float(acct_row[1]):.4%}" if acct_row[1] is not None else ""
taker_fee = f"{float(acct_row[2]):.4%}" if acct_row[2] is not None else ""
thirty_day_volume = f"{float(acct_row[3]):.2f}" if acct_row[3] is not None else ""
except Exception:
pass
return {
"status": "live",
"generated_at": datetime.now(UTC).isoformat(),
@@ -232,26 +224,28 @@ def _dashboard_overview(request: Request) -> dict[str, object]:
}
def _dashboard_charts(request: Request) -> dict[str, object]:
store = request.app.state.store
with store.connect() as conn:
opportunity_rows = conn.execute("""
async def _dashboard_charts(request: Request) -> dict[str, object]:
store: PgStore = request.app.state.store
async with store.pool.acquire() as conn:
rows = await conn.fetch("""
SELECT detected_at, cycle, net_pct, est_profit
FROM opportunities
ORDER BY detected_at DESC
LIMIT 10
""").fetchall()
""")
cr = list(reversed(opportunity_rows))
cr = list(reversed(rows))
labels = []
for index, row in enumerate(cr):
if isinstance(row[0], datetime):
labels.append(row[0].isoformat())
if isinstance(row["detected_at"], datetime):
labels.append(row["detected_at"].isoformat())
else:
labels.append(f"opportunity-{index + 1}")
np = [float(row[2]) if row[2] is not None else 0.0 for row in cr]
ep = [float(row[3]) if row[3] is not None else 0.0 for row in cr]
cycles = [str(row[1]) for row in cr]
np = [float(row["net_pct"]) if row["net_pct"]
is not None else 0.0 for row in cr]
ep = [float(row["est_profit"]) if row["est_profit"]
is not None else 0.0 for row in cr]
cycles = [str(row["cycle"]) for row in cr]
return {
"labels": labels,
@@ -272,7 +266,7 @@ def _audit_repository(request: Request) -> AuditRepository | None:
return cast(AuditRepository | None, repository)
def _record_audit(
async def _record_audit(
request: Request,
*,
actor: str,
@@ -288,7 +282,7 @@ def _record_audit(
ret_pl = {str(key): payload[key] for key in payload}
else:
ret_pl = None
repository.insert(
await repository.insert(
AuditRecord(
occurred_at=datetime.now(UTC),
actor=actor,
@@ -300,7 +294,7 @@ def _record_audit(
)
def _dashboard_audit(request: Request, *, limit: int = 15) -> dict[str, object]:
async def _dashboard_audit(request: Request, *, limit: int = 15) -> dict[str, object]:
repository = _audit_repository(request)
if repository is None:
return {
@@ -308,7 +302,7 @@ def _dashboard_audit(request: Request, *, limit: int = 15) -> dict[str, object]:
"generated_at": datetime.now(UTC).isoformat(),
}
records = repository.list_recent(limit=limit)
records = await repository.list_recent(limit=limit)
entries: list[dict[str, str]] = []
for record in records:
payload_text = ""
@@ -355,7 +349,7 @@ def _alert_status_snapshot(request: Request) -> dict[str, object]:
}
def _dashboard_config_context(request: Request) -> dict[str, object]:
async def _dashboard_config_context(request: Request) -> dict[str, object]:
ctl = _dashboard_controls_state(request)
rs = request.app.state.settings
max_trade_capital_usd = (
@@ -416,7 +410,8 @@ def _dashboard_config_context(request: Request) -> dict[str, object]:
max_consecutive_failures_value = (
str(rs.max_consecutive_failures) if rs.max_consecutive_failures is not None else ""
)
strategy_stat_arb_enabled = bool(getattr(rs, "strategy_enable_stat_arb_experiment", False))
strategy_stat_arb_enabled = bool(
getattr(rs, "strategy_enable_stat_arb_experiment", False))
return {
# Runtime
@@ -537,7 +532,8 @@ def _dashboard_controls(request: Request) -> dict[str, object]:
alerts_last_channel_results = [
str(item) for item in cast(list[object], alert_status.get("last_channel_results", []))
]
strategy_stat_arb_enabled = bool(getattr(rs, "strategy_enable_stat_arb_experiment", False))
strategy_stat_arb_enabled = bool(
getattr(rs, "strategy_enable_stat_arb_experiment", False))
return {
"execution_status": "running" if ctl.is_running else "stopped",
@@ -611,7 +607,7 @@ def _normalize_fee_profile(profile: str) -> str:
return profile.strip().lower().replace("-", "_")
def _fee_rate_for_profile(
async def _fee_rate_for_profile(
profile: str,
custom_fee_rate: float | None,
request: Request | None = None,
@@ -628,9 +624,9 @@ def _fee_rate_for_profile(
if normalized == "api":
if request is None:
raise ValueError("api fee profile requires request context")
store = request.app.state.store
store: PgStore = request.app.state.store
repo = KrakenAccountSnapshotRepository(store)
latest = repo.latest_snapshot()
latest = await repo.latest_snapshot()
if latest is not None and latest.maker_fee is not None:
return latest.maker_fee
# Fallback to standard if no snapshot yet
@@ -700,11 +696,11 @@ def _build_cycles_from_events(
return graph.index_cycles_by_pair(cycles), sorted(symbols)
def _recent_backtest_reports(request: Request) -> list[dict[str, object]]:
async def _recent_backtest_reports(request: Request) -> list[dict[str, object]]:
"""Fetch recent backtest jobs from DB."""
store = request.app.state.store
store: PgStore = request.app.state.store
repo = BacktestJobRepository(store)
jobs = repo.list_jobs(limit=20)
jobs = await repo.list_jobs(limit=20)
return [
{
"job_id": j.id or "",
@@ -719,7 +715,7 @@ def _recent_backtest_reports(request: Request) -> list[dict[str, object]]:
]
def _backtesting_panel_context(
async def _backtesting_panel_context(
request: Request,
*,
status: str = "idle",
@@ -742,7 +738,7 @@ def _backtesting_panel_context(
if defaults is not None:
default_values.update(defaults)
reports = _recent_backtest_reports(request)
reports = await _recent_backtest_reports(request)
latest = latest_report or (reports[0] if reports else None)
return {
@@ -814,10 +810,11 @@ async def dashboard_backtesting_page(request: Request) -> HTMLResponse:
@router.get("/dashboard/fragment/backtesting", response_class=HTMLResponse)
async def dashboard_backtesting_fragment(request: Request) -> HTMLResponse:
d_context = await _dashboard_config_context(request)
return templates.TemplateResponse(
request=request,
name="partials/backtesting_panel.html",
context={"request": request, **_backtesting_panel_context(request)},
context={"request": request, **d_context},
)
@@ -826,7 +823,7 @@ async def dashboard_backtesting_pairings_fragment(request: Request) -> HTMLRespo
"""HTMX fragment: pairing checkboxes for backtest form."""
store = request.app.state.store
repo = ConfigPairingRepository(store)
pairings = repo.list_pairings()
pairings = await repo.list_pairings()
pairings.sort(key=lambda p: (p.base_asset, p.quote_asset))
return templates.TemplateResponse(
request=request,
@@ -840,7 +837,7 @@ async def dashboard_metrics(request: Request) -> HTMLResponse:
return templates.TemplateResponse(
request=request,
name="partials/metrics.html",
context={"request": request, **_dashboard_metrics(request)},
context={"request": request, **(await _dashboard_metrics(request))},
)
@@ -849,7 +846,7 @@ async def dashboard_overview(request: Request) -> HTMLResponse:
return templates.TemplateResponse(
request=request,
name="partials/overview.html",
context={"request": request, **_dashboard_overview(request)},
context={"request": request, **(await _dashboard_overview(request))},
)
@@ -867,7 +864,7 @@ async def dashboard_charts(request: Request) -> HTMLResponse:
return templates.TemplateResponse(
request=request,
name="partials/charts.html",
context={"request": request, **_dashboard_charts(request)},
context={"request": request, **(await _dashboard_charts(request))},
)
@@ -879,7 +876,7 @@ async def dashboard_audit_page(request: Request) -> HTMLResponse:
context={
"title": "Arbitrade Audit Trail",
"request": request,
**_dashboard_audit(request),
**(await _dashboard_audit(request)),
},
)
@@ -889,7 +886,7 @@ async def dashboard_audit_fragment(request: Request) -> HTMLResponse:
return templates.TemplateResponse(
request=request,
name="partials/audit.html",
context={"request": request, **_dashboard_audit(request)},
context={"request": request, **(await _dashboard_audit(request))},
)
@@ -898,12 +895,13 @@ async def dashboard_audit(request: Request) -> HTMLResponse:
return templates.TemplateResponse(
request=request,
name="partials/audit.html",
context={"request": request, **_dashboard_audit(request)},
context={"request": request, **(await _dashboard_audit(request))},
)
@router.get("/dashboard/config", response_class=HTMLResponse)
async def dashboard_config_page(request: Request) -> HTMLResponse:
d_context = await _dashboard_config_context(request)
return templates.TemplateResponse(
request=request,
name="config.html",
@@ -911,17 +909,18 @@ async def dashboard_config_page(request: Request) -> HTMLResponse:
"title": "Arbitrade Configuration",
"request": request,
"config_endpoint": "/dashboard/control/config",
**_dashboard_config_context(request),
**d_context,
},
)
@router.get("/dashboard/fragment/config", response_class=HTMLResponse)
async def dashboard_config_fragment(request: Request) -> HTMLResponse:
d_context = await _dashboard_config_context(request)
return templates.TemplateResponse(
request=request,
name="partials/config.html",
context={"request": request, **_dashboard_config_context(request)},
context={"request": request, **d_context},
)
@@ -932,7 +931,7 @@ async def dashboard_alert_status(request: Request) -> JSONResponse:
@router.get("/dashboard/api/audit/recent", response_class=JSONResponse)
async def dashboard_audit_recent(request: Request) -> JSONResponse:
return JSONResponse(_dashboard_audit(request, limit=25))
return JSONResponse(await _dashboard_audit(request, limit=25))
@router.get("/dashboard/api/backtesting/reports", response_class=JSONResponse)
@@ -940,7 +939,7 @@ async def dashboard_backtesting_reports(request: Request) -> JSONResponse:
return JSONResponse(
{
"generated_at": datetime.now(UTC).isoformat(),
"reports": _recent_backtest_reports(request),
"reports": await _recent_backtest_reports(request),
}
)
@@ -965,9 +964,10 @@ async def dashboard_backtesting_run(request: Request) -> HTMLResponse:
try:
custom_fee_rate = (
float(defaults["custom_fee_rate"]) if defaults["custom_fee_rate"].strip() else None
float(defaults["custom_fee_rate"]
) if defaults["custom_fee_rate"].strip() else None
)
fee_rate = _fee_rate_for_profile(defaults["fee_profile"], custom_fee_rate, request=request)
fee_rate = await _fee_rate_for_profile(defaults["fee_profile"], custom_fee_rate, request=request)
config_dict: dict[str, object] = {
"source": defaults["source"],
@@ -986,13 +986,13 @@ async def dashboard_backtesting_run(request: Request) -> HTMLResponse:
store = request.app.state.store
repo = BacktestJobRepository(store)
events_label = defaults["symbols"] if defaults["symbols"] else "DB-sourced"
job = repo.create_job(events_label, config_dict)
job = await repo.create_job(events_label, config_dict)
msg_job = job.id[:8] if job.id else "unknown"
queue = request.app.state.backtest_queue
await queue.put((job.id or "", config_dict))
_record_audit(
await _record_audit(
request,
actor="dashboard_user",
event_type="dashboard.backtesting.submit",
@@ -1000,14 +1000,14 @@ async def dashboard_backtesting_run(request: Request) -> HTMLResponse:
payload={"job_id": job.id, "source": defaults["source"]},
)
context = _backtesting_panel_context(
context = await _backtesting_panel_context(
request,
status="submitted",
message=f"Job {msg_job}... queued. Refresh to see results.",
defaults=defaults,
)
except ValueError as exc:
context = _backtesting_panel_context(
context = await _backtesting_panel_context(
request,
status="failed",
message=str(exc),
@@ -1025,11 +1025,12 @@ async def dashboard_backtesting_run(request: Request) -> HTMLResponse:
async def dashboard_backtesting_delete(request: Request, job_id: str) -> HTMLResponse:
store = request.app.state.store
repo = BacktestJobRepository(store)
repo.delete_job(job_id)
context = await _backtesting_panel_context(request)
await repo.delete_job(job_id)
return templates.TemplateResponse(
request=request,
name="partials/backtesting_panel.html",
context={"request": request, **_backtesting_panel_context(request)},
context={"request": request, **context},
)
@@ -1037,7 +1038,7 @@ async def dashboard_backtesting_delete(request: Request, job_id: str) -> HTMLRes
async def dashboard_backtesting_job_detail(request: Request, job_id: str) -> HTMLResponse:
store = request.app.state.store
repo = BacktestJobRepository(store)
job = repo.get_job(job_id)
job = await repo.get_job(job_id)
if job is None:
return HTMLResponse("<p>Job not found</p>", status_code=404)
@@ -1069,7 +1070,7 @@ async def dashboard_backtesting_job_detail(request: Request, job_id: str) -> HTM
async def dashboard_backtesting_export(request: Request, job_id: str) -> Response:
store = request.app.state.store
repo = BacktestJobRepository(store)
job = repo.get_job(job_id)
job = await repo.get_job(job_id)
if job is None:
return Response("Job not found", status_code=404)
@@ -1087,7 +1088,8 @@ async def dashboard_backtesting_export(request: Request, job_id: str) -> Respons
return Response(
content=orjson.dumps(payload).decode("utf-8"),
media_type="application/x-jsonlines",
headers={"Content-Disposition": f"attachment; filename=backtest_{job_id[:8]}.jsonl"},
headers={
"Content-Disposition": f"attachment; filename=backtest_{job_id[:8]}.jsonl"},
)
@@ -1104,7 +1106,7 @@ async def dashboard_control_start(request: Request) -> HTMLResponse:
title="Execution started",
message="Dashboard control started execution.",
)
_record_audit(
await _record_audit(
request,
actor="dashboard_user",
event_type="dashboard.control.start",
@@ -1131,7 +1133,7 @@ async def dashboard_control_stop(request: Request) -> HTMLResponse:
title="Execution stopped",
message="Dashboard control stopped execution.",
)
_record_audit(
await _record_audit(
request,
actor="dashboard_user",
event_type="dashboard.control.stop",
@@ -1162,7 +1164,7 @@ async def dashboard_control_kill_switch(request: Request) -> HTMLResponse:
message="Kill switch triggered from dashboard control.",
details={"reason": reason},
)
_record_audit(
await _record_audit(
request,
actor="dashboard_user",
event_type="dashboard.control.kill_switch",
@@ -1232,7 +1234,7 @@ async def dashboard_control_config(request: Request) -> HTMLResponse:
"paper_trading_mode": "true" if rs.paper_trading_mode else "false",
},
)
_record_audit(
await _record_audit(
request,
actor="dashboard_user",
event_type="dashboard.control.config",
@@ -1258,11 +1260,12 @@ async def dashboard_control_config(request: Request) -> HTMLResponse:
@router.get("/dashboard/stream/metrics")
async def dashboard_metrics_stream(request: Request) -> StreamingResponse:
metrics = await _dashboard_metrics(request)
fragment = (
templates.get_template("partials/metrics.html")
.render(
request=request,
**_dashboard_metrics(request),
**metrics,
)
.strip()
.replace("\n", "")
@@ -1277,9 +1280,10 @@ async def dashboard_metrics_stream(request: Request) -> StreamingResponse:
@router.get("/dashboard/stream/overview")
async def dashboard_overview_stream(request: Request) -> StreamingResponse:
overview = await _dashboard_overview(request)
fragment = (
templates.get_template("partials/overview.html")
.render(request=request, **_dashboard_overview(request))
.render(request=request, **overview)
.strip()
.replace("\n", "")
)
@@ -1316,7 +1320,7 @@ async def dashboard_api_pairings(
) -> JSONResponse:
"""List pairings with optional filters."""
repo = _pairing_repo(request)
pairings = repo.list_pairings()
pairings = await repo.list_pairings()
# Apply filters
if search:
@@ -1332,9 +1336,11 @@ async def dashboard_api_pairings(
if source:
pairings = [p for p in pairings if p.source.lower() == source.lower()]
if base:
pairings = [p for p in pairings if p.base_asset.lower() == base.lower()]
pairings = [p for p in pairings if p.base_asset.lower() ==
base.lower()]
if quote:
pairings = [p for p in pairings if p.quote_asset.lower() == quote.lower()]
pairings = [p for p in pairings if p.quote_asset.lower() ==
quote.lower()]
# Sort
reverse = order.lower() == "desc"
@@ -1370,7 +1376,7 @@ async def dashboard_pairings_fragment(
) -> HTMLResponse:
"""HTMX fragment: pairing table for config page."""
repo = _pairing_repo(request)
pairings = repo.list_pairings()
pairings = await repo.list_pairings()
# Apply search filter
if search:
@@ -1406,7 +1412,7 @@ async def dashboard_api_pairings_toggle(request: Request) -> HTMLResponse:
return HTMLResponse("Missing base_asset or quote_asset", status_code=400)
repo = _pairing_repo(request)
existing = repo.get_pairing(base_asset, quote_asset)
existing = await repo.get_pairing(base_asset, quote_asset)
if existing is None:
return HTMLResponse("Pairing not found", status_code=404)
@@ -1416,9 +1422,9 @@ async def dashboard_api_pairings_toggle(request: Request) -> HTMLResponse:
enabled=not existing.enabled,
source=existing.source,
)
repo.update_pairing(base_asset, quote_asset, toggled)
await repo.update_pairing(base_asset, quote_asset, toggled)
_record_audit(
await _record_audit(
request,
actor="dashboard_user",
event_type="dashboard.pairings.toggle",
@@ -1432,7 +1438,7 @@ async def dashboard_api_pairings_toggle(request: Request) -> HTMLResponse:
# Return refreshed fragment
pairings_repo = _pairing_repo(request)
pairings = pairings_repo.list_pairings()
pairings = await pairings_repo.list_pairings()
pairings.sort(key=lambda p: (p.base_asset, p.quote_asset))
return templates.TemplateResponse(
request=request,
@@ -1448,7 +1454,7 @@ async def dashboard_api_pairings_sync(request: Request) -> JSONResponse:
store = request.app.state.store
summary = await sync_pairings_from_kraken(kraken_client, store)
_record_audit(
await _record_audit(
request,
actor="dashboard_user",
event_type="dashboard.pairings.sync",
+11 -10
View File
@@ -17,6 +17,7 @@ from arbitrade.execution.sequencer import TriangularExecutionSequencer
from arbitrade.market_data.order_book import OrderBook
from arbitrade.risk.pre_trade import PreTradeValidator
from arbitrade.risk.trade_limits import TradeLimitsGuard
from arbitrade.storage.pg_store import PgStore
@dataclass(slots=True)
@@ -185,8 +186,8 @@ def load_replay_events(path: Path) -> list[ReplayBookEvent]:
return sorted(events, key=lambda event: event.occurred_at)
def load_replay_events_from_db(
store: object,
async def load_replay_events_from_db(
store: PgStore,
*,
symbols: list[str] | None = None,
start: datetime | None = None,
@@ -197,32 +198,32 @@ def load_replay_events_from_db(
Each market_snapshots row has snapshot_at, symbol, payload (raw Kraken WS).
Payload format: {channel, symbol, data: [{bids: [{price, qty}], asks: [{price, qty}]}]}
"""
with store.connect() as conn: # type: ignore
async with store.pool.acquire() as conn:
query = "SELECT snapshot_at, symbol, payload FROM market_snapshots WHERE 1=1"
params: list[object] = []
if symbols:
placeholders = ",".join("?" for _ in symbols)
placeholders = ",".join(f"${i+1}" for i in range(len(symbols)))
query += f" AND symbol IN ({placeholders})"
params.extend(symbols)
if start is not None:
query += " AND snapshot_at >= ?"
params.append(start)
query += f" AND snapshot_at >= ${len(params)}"
if end is not None:
query += " AND snapshot_at <= ?"
params.append(end)
query += f" AND snapshot_at <= ${len(params)}"
query += " ORDER BY snapshot_at ASC"
rows = conn.execute(query, params).fetchall()
rows = await conn.fetch(query, *params)
events: list[ReplayBookEvent] = []
for row in rows:
snapshot_at: datetime = row[0]
symbol: str = row[1]
payload_raw = row[2]
snapshot_at: datetime = row["snapshot_at"]
symbol: str = row["symbol"]
payload_raw = row["payload"]
if isinstance(payload_raw, str):
payload = orjson.loads(payload_raw)
+8 -8
View File
@@ -15,7 +15,7 @@ from arbitrade.backtesting.replay import (
load_replay_events_from_db,
)
from arbitrade.detection.graph import CurrencyGraph, TriangularCycle
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.pg_store import PgStore
from arbitrade.storage.repositories import BacktestJobRepository
_LOG = structlog.get_logger(__name__)
@@ -50,11 +50,11 @@ def _parse_balances(raw: str) -> dict[str, float]:
async def run_backtest_job(
job_id: str,
config_dict: dict[str, object] | None,
store: DuckDBStore,
store: PgStore,
) -> None:
"""Execute a single backtest job: load events from DB or file, run engine, store report."""
repo = BacktestJobRepository(store)
repo.update_status(job_id, "running")
await repo.update_status(job_id, "running")
_LOG.info("backtest_job_started", job_id=job_id)
try:
@@ -79,7 +79,7 @@ async def run_backtest_job(
elif isinstance(symbols_raw, list):
symbols = [str(s).upper() for s in symbols_raw]
events = load_replay_events_from_db(
events = await load_replay_events_from_db(
store,
symbols=symbols,
start=start_dt,
@@ -141,18 +141,18 @@ async def run_backtest_job(
"finished_at": report.finished_at.isoformat(),
}
repo.store_report(job_id, report_dict)
repo.update_status(job_id, "completed")
await repo.store_report(job_id, report_dict)
await repo.update_status(job_id, "completed")
_LOG.info("backtest_job_completed", job_id=job_id, pnl=report.realized_pnl_usd)
except Exception as exc:
repo.update_status(job_id, "failed", error=str(exc))
await repo.update_status(job_id, "failed", error=str(exc))
_LOG.exception("backtest_job_failed", job_id=job_id, error=str(exc))
async def backtest_worker(
queue: asyncio.Queue[tuple[str, dict[str, object] | None] | None],
store: DuckDBStore,
store: PgStore,
) -> None:
"""Worker coroutine: pull jobs from queue and execute them one at a time."""
_LOG.info("backtest_worker_started")
+5 -5
View File
@@ -9,7 +9,7 @@ import structlog
from arbitrade.config.service import ConfigPairing
from arbitrade.detection.graph import CurrencyGraph
from arbitrade.exchange.kraken_rest import KrakenRestClient
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.pg_store import PgStore
from arbitrade.storage.repositories import ConfigPairingRepository
_LOG = structlog.get_logger(__name__)
@@ -17,7 +17,7 @@ _LOG = structlog.get_logger(__name__)
async def sync_pairings_from_kraken(
kraken_client: KrakenRestClient,
store: DuckDBStore,
store: PgStore,
) -> dict[str, int]:
"""Fetch all asset pairs from Kraken and upsert into config_pairings.
@@ -37,7 +37,7 @@ async def sync_pairings_from_kraken(
if symbol in seen_symbols:
continue
seen_symbols.add(symbol)
existing = repo.get_pairing(base, quote)
existing = await repo.get_pairing(base, quote)
pairing = ConfigPairing(
base_asset=base,
quote_asset=quote,
@@ -45,7 +45,7 @@ async def sync_pairings_from_kraken(
source="kraken",
)
try:
repo.upsert_pairing(pairing)
await repo.upsert_pairing(pairing)
total += 1
if existing:
updated += 1
@@ -65,7 +65,7 @@ async def sync_pairings_from_kraken(
async def run_pairing_sync_loop(
kraken_client: KrakenRestClient,
store: DuckDBStore,
store: PgStore,
stop_event: asyncio.Event,
interval_seconds: int = 86400,
) -> None:
+17 -18
View File
@@ -7,7 +7,7 @@ import orjson
from pydantic import BaseModel
from arbitrade.config.settings import Settings
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.pg_store import PgStore
class ConfigSection(BaseModel):
@@ -49,16 +49,15 @@ class ConfigBacktestingDefaults(BaseModel):
class ConfigurationService:
"""Manages application configuration from environment and database sources."""
def __init__(self, settings: Settings, store: DuckDBStore, audit_repo: Any) -> None:
def __init__(self, settings: Settings, store: PgStore, audit_repo: Any) -> None:
self._settings = settings
self._store = store
self._audit_repo = audit_repo
self._config_version = 0
self._loaded_settings: dict[str, Any] = {}
self._last_updated_at: datetime | None = None
self._load_database_settings()
def _load_database_settings(self) -> None:
async def load_database_settings(self) -> None:
"""Load user settings from database and merge with defaults."""
# Import here to avoid circular imports
from arbitrade.storage.repositories import ConfigSettingRepository
@@ -66,7 +65,7 @@ class ConfigurationService:
setting_repo = ConfigSettingRepository(self._store)
# Load all settings from database
db_settings = setting_repo.list_settings()
db_settings = await setting_repo.list_settings()
# Convert to dictionary for easy access
for setting in db_settings:
@@ -116,7 +115,7 @@ class ConfigurationService:
"""Get the timestamp of the last configuration update."""
return self._last_updated_at
def is_config_outdated(self) -> bool:
async def is_config_outdated(self) -> bool:
"""Check if configuration has been updated since last load."""
# Import here to avoid circular imports
from arbitrade.storage.repositories import ConfigSettingRepository
@@ -124,7 +123,7 @@ class ConfigurationService:
setting_repo = ConfigSettingRepository(self._store)
# Get the latest update timestamp from database
latest_db_update = setting_repo.get_latest_updated_at()
latest_db_update = await setting_repo.get_latest_updated_at()
# Compare with our last loaded timestamp
if latest_db_update and self._last_updated_at:
@@ -133,15 +132,15 @@ class ConfigurationService:
return True
return False
def reload_if_changed(self) -> bool:
async def reload_if_changed(self) -> bool:
"""Reload configuration if it has been updated in the database."""
if self.is_config_outdated():
self._load_database_settings()
if await self.is_config_outdated():
await self.load_database_settings()
self._config_version += 1
return True
return False
def set_setting(self, key: str, value: Any, updated_by: str | None = None) -> None:
async def set_setting(self, key: str, value: Any, updated_by: str | None = None) -> None:
"""Set a configuration setting value and persist to database."""
# Import here to avoid circular imports
from arbitrade.storage.repositories import ConfigSettingRepository
@@ -183,13 +182,13 @@ class ConfigurationService:
)
# Check if setting exists
existing_setting = setting_repo.get_setting(key)
existing_setting = await setting_repo.get_setting(key)
if existing_setting:
# Update existing setting
updated_setting = setting_repo.update_setting(key, setting)
updated_setting = await setting_repo.update_setting(key, setting)
else:
# Create new setting
updated_setting = setting_repo.create_setting(setting)
updated_setting = await setting_repo.create_setting(setting)
# Update in-memory cache
self._loaded_settings[key] = value
@@ -211,18 +210,18 @@ class ConfigurationService:
return ConfigPairingRepository(self._store)
def list_pairings(self) -> list[ConfigPairing]:
async def list_pairings(self) -> list[ConfigPairing]:
"""List all currency pairings."""
r = self._pairing_repo() # type: ignore[no-untyped-call]
p = r.list_pairings()
p = await r.list_pairings()
return p # type: ignore[no-any-return]
def create_pairing(
async def create_pairing(
self, base_asset: str, quote_asset: str, source: str = "manual"
) -> ConfigPairing:
"""Create a new currency pairing."""
r = self._pairing_repo() # type: ignore[no-untyped-call]
e = r.get_pairing(base_asset, quote_asset)
e = await r.get_pairing(base_asset, quote_asset)
if e:
return e # type: ignore[no-any-return]
pairing = ConfigPairing(
+84 -40
View File
@@ -1,7 +1,6 @@
from __future__ import annotations
from functools import lru_cache
from pathlib import Path
from pydantic import Field, field_validator, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
@@ -32,49 +31,78 @@ class Settings(BaseSettings):
)
alerts_enabled: bool = Field(default=True, alias="ALERTS_ENABLED")
alert_min_severity: str = Field(default="warning", alias="ALERT_MIN_SEVERITY")
alert_dedup_seconds: float = Field(default=30.0, alias="ALERT_DEDUP_SECONDS")
alert_on_trade_events: bool = Field(default=True, alias="ALERT_ON_TRADE_EVENTS")
alert_on_error_events: bool = Field(default=True, alias="ALERT_ON_ERROR_EVENTS")
alert_on_threshold_events: bool = Field(default=True, alias="ALERT_ON_THRESHOLD_EVENTS")
alert_on_system_events: bool = Field(default=True, alias="ALERT_ON_SYSTEM_EVENTS")
alert_min_severity: str = Field(
default="warning", alias="ALERT_MIN_SEVERITY")
alert_dedup_seconds: float = Field(
default=30.0, alias="ALERT_DEDUP_SECONDS")
alert_on_trade_events: bool = Field(
default=True, alias="ALERT_ON_TRADE_EVENTS")
alert_on_error_events: bool = Field(
default=True, alias="ALERT_ON_ERROR_EVENTS")
alert_on_threshold_events: bool = Field(
default=True, alias="ALERT_ON_THRESHOLD_EVENTS")
alert_on_system_events: bool = Field(
default=True, alias="ALERT_ON_SYSTEM_EVENTS")
telegram_alerts_enabled: bool = Field(default=False, alias="TELEGRAM_ALERTS_ENABLED")
telegram_bot_token: str | None = Field(default=None, alias="TELEGRAM_BOT_TOKEN")
telegram_chat_id: str | None = Field(default=None, alias="TELEGRAM_CHAT_ID")
telegram_alerts_enabled: bool = Field(
default=False, alias="TELEGRAM_ALERTS_ENABLED")
telegram_bot_token: str | None = Field(
default=None, alias="TELEGRAM_BOT_TOKEN")
telegram_chat_id: str | None = Field(
default=None, alias="TELEGRAM_CHAT_ID")
discord_alerts_enabled: bool = Field(default=False, alias="DISCORD_ALERTS_ENABLED")
discord_webhook_url: str | None = Field(default=None, alias="DISCORD_WEBHOOK_URL")
discord_alerts_enabled: bool = Field(
default=False, alias="DISCORD_ALERTS_ENABLED")
discord_webhook_url: str | None = Field(
default=None, alias="DISCORD_WEBHOOK_URL")
email_alerts_enabled: bool = Field(default=False, alias="EMAIL_ALERTS_ENABLED")
email_alerts_enabled: bool = Field(
default=False, alias="EMAIL_ALERTS_ENABLED")
email_smtp_host: str | None = Field(default=None, alias="EMAIL_SMTP_HOST")
email_smtp_port: int = Field(default=587, alias="EMAIL_SMTP_PORT")
email_smtp_username: str | None = Field(default=None, alias="EMAIL_SMTP_USERNAME")
email_smtp_password: str | None = Field(default=None, alias="EMAIL_SMTP_PASSWORD")
email_alert_from: str | None = Field(default=None, alias="EMAIL_ALERT_FROM")
email_smtp_username: str | None = Field(
default=None, alias="EMAIL_SMTP_USERNAME")
email_smtp_password: str | None = Field(
default=None, alias="EMAIL_SMTP_PASSWORD")
email_alert_from: str | None = Field(
default=None, alias="EMAIL_ALERT_FROM")
email_alert_to: str | None = Field(default=None, alias="EMAIL_ALERT_TO")
email_smtp_use_tls: bool = Field(default=True, alias="EMAIL_SMTP_USE_TLS")
duckdb_path: Path = Field(default=Path("./data/arbitrade.duckdb"), alias="DUCKDB_PATH")
# PostgreSQL connection settings
pg_host: str = Field(default="192.168.88.35", alias="PG_HOST")
pg_port: int = Field(default=5432, alias="PG_PORT")
pg_database: str = Field(default="arbitrade", alias="PG_DATABASE")
pg_user: str = Field(default="arbitrade", alias="PG_USER")
pg_password: str = Field(default="arbitrade", alias="PG_PASSWORD")
pg_min_connections: int = Field(default=2, alias="PG_MIN_CONNECTIONS")
pg_max_connections: int = Field(default=10, alias="PG_MAX_CONNECTIONS")
kraken_rest_url: str = Field(default="https://api.kraken.com", alias="KRAKEN_REST_URL")
kraken_ws_url: str = Field(default="wss://ws.kraken.com/v2", alias="KRAKEN_WS_URL")
kraken_rest_url: str = Field(
default="https://api.kraken.com", alias="KRAKEN_REST_URL")
kraken_ws_url: str = Field(
default="wss://ws.kraken.com/v2", alias="KRAKEN_WS_URL")
kraken_private_rate_limit_seconds: float = Field(
default=1.0, alias="KRAKEN_PRIVATE_RATE_LIMIT_SECONDS"
)
kraken_http_timeout_seconds: float = Field(default=10.0, alias="KRAKEN_HTTP_TIMEOUT_SECONDS")
kraken_retry_attempts: int = Field(default=3, alias="KRAKEN_RETRY_ATTEMPTS")
kraken_http_timeout_seconds: float = Field(
default=10.0, alias="KRAKEN_HTTP_TIMEOUT_SECONDS")
kraken_retry_attempts: int = Field(
default=3, alias="KRAKEN_RETRY_ATTEMPTS")
kraken_retry_base_delay_seconds: float = Field(
default=0.25, alias="KRAKEN_RETRY_BASE_DELAY_SECONDS"
)
kraken_api_key: str | None = Field(default=None, alias="KRAKEN_API_KEY")
kraken_api_secret: str | None = Field(default=None, alias="KRAKEN_API_SECRET")
kraken_api_secret: str | None = Field(
default=None, alias="KRAKEN_API_SECRET")
kraken_api_key_permissions: str = Field(
default="query,trade",
alias="KRAKEN_API_KEY_PERMISSIONS",
)
ws_heartbeat_timeout_seconds: float = Field(default=20.0, alias="WS_HEARTBEAT_TIMEOUT_SECONDS")
ws_max_staleness_seconds: float = Field(default=5.0, alias="WS_MAX_STALENESS_SECONDS")
ws_heartbeat_timeout_seconds: float = Field(
default=20.0, alias="WS_HEARTBEAT_TIMEOUT_SECONDS")
ws_max_staleness_seconds: float = Field(
default=5.0, alias="WS_MAX_STALENESS_SECONDS")
strategy_enable_stat_arb_experiment: bool = Field(
default=False,
alias="STRATEGY_ENABLE_STAT_ARB_EXPERIMENT",
@@ -97,20 +125,29 @@ class Settings(BaseSettings):
)
paper_trading_mode: bool = Field(default=True, alias="PAPER_TRADING_MODE")
trade_capital_usd: float = Field(default=100.0, alias="TRADE_CAPITAL_USD")
max_trade_capital_usd: float = Field(default=100.0, alias="MAX_TRADE_CAPITAL_USD")
max_concurrent_trades: int | None = Field(default=None, alias="MAX_CONCURRENT_TRADES")
max_trade_capital_usd: float = Field(
default=100.0, alias="MAX_TRADE_CAPITAL_USD")
max_concurrent_trades: int | None = Field(
default=None, alias="MAX_CONCURRENT_TRADES")
max_exposure_per_asset_usd: float | None = Field(
default=None,
alias="MAX_EXPOSURE_PER_ASSET_USD",
)
quote_balance_asset: str = Field(default="USD", alias="QUOTE_BALANCE_ASSET")
min_order_size_usd: float | None = Field(default=None, alias="MIN_ORDER_SIZE_USD")
quote_balance_asset: str = Field(
default="USD", alias="QUOTE_BALANCE_ASSET")
min_order_size_usd: float | None = Field(
default=None, alias="MIN_ORDER_SIZE_USD")
kill_switch_active: bool = Field(default=False, alias="KILL_SWITCH_ACTIVE")
daily_loss_limit_usd: float | None = Field(default=None, alias="DAILY_LOSS_LIMIT_USD")
cumulative_loss_limit_usd: float | None = Field(default=None, alias="CUMULATIVE_LOSS_LIMIT_USD")
max_source_latency_ms: float | None = Field(default=None, alias="MAX_SOURCE_LATENCY_MS")
max_apply_latency_ms: float | None = Field(default=None, alias="MAX_APPLY_LATENCY_MS")
max_consecutive_failures: int | None = Field(default=None, alias="MAX_CONSECUTIVE_FAILURES")
daily_loss_limit_usd: float | None = Field(
default=None, alias="DAILY_LOSS_LIMIT_USD")
cumulative_loss_limit_usd: float | None = Field(
default=None, alias="CUMULATIVE_LOSS_LIMIT_USD")
max_source_latency_ms: float | None = Field(
default=None, alias="MAX_SOURCE_LATENCY_MS")
max_apply_latency_ms: float | None = Field(
default=None, alias="MAX_APPLY_LATENCY_MS")
max_consecutive_failures: int | None = Field(
default=None, alias="MAX_CONSECUTIVE_FAILURES")
fernet_key: str | None = Field(default=None, alias="FERNET_KEY")
@@ -127,7 +164,8 @@ class Settings(BaseSettings):
def _validate_log_level(cls, value: str) -> str:
normalized = value.strip().upper()
if normalized not in {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}:
raise ValueError("LOG_LEVEL must be one of: DEBUG, INFO, WARNING, ERROR, CRITICAL")
raise ValueError(
"LOG_LEVEL must be one of: DEBUG, INFO, WARNING, ERROR, CRITICAL")
return normalized
@field_validator("alert_min_severity")
@@ -135,16 +173,19 @@ class Settings(BaseSettings):
def _validate_alert_severity(cls, value: str) -> str:
normalized = value.strip().lower()
if normalized not in {"info", "warning", "error", "critical"}:
raise ValueError("ALERT_MIN_SEVERITY must be one of: info, warning, error, critical")
raise ValueError(
"ALERT_MIN_SEVERITY must be one of: info, warning, error, critical")
return normalized
@model_validator(mode="after")
def _validate_security_constraints(self) -> Settings:
if bool(self.dashboard_auth_username) ^ bool(self.dashboard_auth_password):
raise ValueError("dashboard auth requires both username and password")
raise ValueError(
"dashboard auth requires both username and password")
if bool(self.kraken_api_key) ^ bool(self.kraken_api_secret):
raise ValueError("Kraken API auth requires both API key and secret")
raise ValueError(
"Kraken API auth requires both API key and secret")
permissions = {
token.strip().lower()
@@ -152,9 +193,11 @@ class Settings(BaseSettings):
if token.strip()
}
if permissions and ("query" not in permissions or "trade" not in permissions):
raise ValueError("KRAKEN_API_KEY_PERMISSIONS must include query and trade")
raise ValueError(
"KRAKEN_API_KEY_PERMISSIONS must include query and trade")
if "withdraw" in permissions or "withdrawals" in permissions:
raise ValueError("KRAKEN_API_KEY_PERMISSIONS must not include withdrawal scope")
raise ValueError(
"KRAKEN_API_KEY_PERMISSIONS must not include withdrawal scope")
if self.alert_dedup_seconds < 0.0:
raise ValueError("ALERT_DEDUP_SECONDS must be >= 0")
@@ -170,7 +213,8 @@ class Settings(BaseSettings):
"STRATEGY_STAT_ARB_ENTRY_ZSCORE must be greater than STRATEGY_STAT_ARB_EXIT_ZSCORE"
)
if self.strategy_stat_arb_max_holding_seconds <= 0.0:
raise ValueError("STRATEGY_STAT_ARB_MAX_HOLDING_SECONDS must be > 0")
raise ValueError(
"STRATEGY_STAT_ARB_MAX_HOLDING_SECONDS must be > 0")
return self
+23 -18
View File
@@ -9,7 +9,7 @@ import orjson
import structlog
from arbitrade.exchange.kraken_rest import KrakenRestClient
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.pg_store import PgStore
from arbitrade.storage.repositories import (
KrakenAccountSnapshot,
KrakenAccountSnapshotRepository,
@@ -22,7 +22,7 @@ _FEE_REFRESH_INTERVAL_SECONDS = 86400 # 1 day
async def fetch_and_store_account_snapshot(
client: KrakenRestClient,
store: DuckDBStore,
store: PgStore,
) -> KrakenAccountSnapshot | None:
"""Query TradeVolume + TradeBalance, persist as snapshot.
@@ -42,9 +42,12 @@ async def fetch_and_store_account_snapshot(
_LOG.exception("trade_balance_fetch_failed")
return None
fee_tier = volume_data.get("fee_tier") if isinstance(volume_data, dict) else None
fees_dict = volume_data.get("fees") if isinstance(volume_data, dict) else None
fees_maker = volume_data.get("fees_maker") if isinstance(volume_data, dict) else None
fee_tier = volume_data.get("fee_tier") if isinstance(
volume_data, dict) else None
fees_dict = volume_data.get("fees") if isinstance(
volume_data, dict) else None
fees_maker = volume_data.get("fees_maker") if isinstance(
volume_data, dict) else None
currency = volume_data.get("currency")
thirty_day_volume_str = volume_data.get("volume")
@@ -70,7 +73,8 @@ async def fetch_and_store_account_snapshot(
if currency is not None:
fee_schedule["currency"] = currency
thirty_day_volume = float(thirty_day_volume_str) if thirty_day_volume_str is not None else None
thirty_day_volume = float(
thirty_day_volume_str) if thirty_day_volume_str is not None else None
snapshot = KrakenAccountSnapshot(
snapshot_at=datetime.now(UTC),
@@ -78,11 +82,12 @@ async def fetch_and_store_account_snapshot(
maker_fee=maker_fee,
taker_fee=taker_fee,
thirty_day_volume=thirty_day_volume,
trade_balance_raw=balance_data if isinstance(balance_data, dict) else None,
trade_balance_raw=balance_data if isinstance(
balance_data, dict) else None,
fee_schedule_raw=fee_schedule if fee_schedule else None,
)
repo.insert_snapshot(snapshot)
await repo.insert_snapshot(snapshot)
_LOG.info(
"account_snapshot_stored",
fee_tier=fee_tier_str,
@@ -97,15 +102,14 @@ async def fetch_and_store_account_snapshot(
if isinstance(balance_data, dict):
eb = balance_data.get("eb")
total_value = float(eb) if eb is not None else 0.0
with store.connect() as conn:
conn.execute(
async with store.pool.acquire() as conn:
await conn.execute(
"INSERT INTO portfolio_snapshots"
" (snapshot_at, balances, total_value_usd) VALUES (?, ?, ?)",
(
datetime.now(UTC),
orjson.dumps(wallet_balances).decode("utf-8") if wallet_balances else None,
total_value,
),
" (snapshot_at, balances, total_value_usd) VALUES ($1, $2, $3)",
datetime.now(UTC),
orjson.dumps(wallet_balances).decode(
"utf-8") if wallet_balances else None,
total_value,
)
_LOG.info("portfolio_snapshot_stored", total_value_usd=total_value)
except Exception:
@@ -116,14 +120,15 @@ async def fetch_and_store_account_snapshot(
async def run_fee_sync_loop(
client: KrakenRestClient,
store: DuckDBStore,
store: PgStore,
stop_event: asyncio.Event,
) -> None:
"""Periodic loop: fetch account snapshot every hour.
Runs until stop_event is set.
"""
_LOG.info("fee_sync_loop_started", interval_s=_FEE_REFRESH_INTERVAL_SECONDS)
_LOG.info("fee_sync_loop_started",
interval_s=_FEE_REFRESH_INTERVAL_SECONDS)
while not stop_event.is_set():
try:
+6 -6
View File
@@ -6,14 +6,14 @@ import structlog
from arbitrade.detection.engine import IncrementalCycleDetector
from arbitrade.detection.graph import CurrencyGraph
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.pg_store import PgStore
from arbitrade.storage.repositories import ConfigPairingRepository
_LOG = structlog.get_logger(__name__)
def build_detector_from_enabled_pairings(
store: DuckDBStore,
async def build_detector_from_enabled_pairings(
store: PgStore,
*,
fee_rate: float = 0.0,
max_depth_levels: int = 10,
@@ -24,7 +24,7 @@ def build_detector_from_enabled_pairings(
Returns None if no enabled pairings exist.
"""
repo = ConfigPairingRepository(store)
pairings = repo.list_pairings(enabled_only=True)
pairings = await repo.list_pairings(enabled_only=True)
if not pairings:
_LOG.warning("no_enabled_pairings_found_detector_not_created")
return None
@@ -55,8 +55,8 @@ def build_detector_from_enabled_pairings(
)
def get_enabled_pair_symbols(store: DuckDBStore) -> list[str]:
async def get_enabled_pair_symbols(store: PgStore) -> list[str]:
"""Return list of enabled pair symbols (e.g. ['BTC/USD', 'ETH/BTC'])."""
repo = ConfigPairingRepository(store)
pairings = repo.list_pairings(enabled_only=True)
pairings = await repo.list_pairings(enabled_only=True)
return [f"{p.base_asset}/{p.quote_asset}" for p in pairings if p.enabled]
+36 -34
View File
@@ -3,7 +3,7 @@ from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.pg_store import PgStore
@dataclass(frozen=True, slots=True)
@@ -19,57 +19,55 @@ class PerformanceMetrics:
class MetricsCalculator:
def __init__(self, store: DuckDBStore) -> None:
def __init__(self, store: PgStore) -> None:
self._store = store
def compute(self) -> PerformanceMetrics:
with self._store.connect() as conn:
tm = conn.execute("""
async def compute(self) -> PerformanceMetrics:
async with self._store.pool.acquire() as conn:
tm = await conn.fetchrow("""
SELECT
COALESCE(SUM(COALESCE(realized_pnl, 0)), 0) AS realized_pnl_usd,
COUNT(*) AS total_trades,
SUM(CASE WHEN realized_pnl > 0 THEN 1 ELSE 0 END) AS winning_trades,
AVG(EPOCH(finished_at) - EPOCH(started_at)) AS avg_trade_duration_seconds,
quantile_cont(
EPOCH(finished_at) - EPOCH(started_at),
0.50
) AS latency_p50_seconds,
quantile_cont(
EPOCH(finished_at) - EPOCH(started_at),
0.95
) AS latency_p95_seconds,
quantile_cont(
EPOCH(finished_at) - EPOCH(started_at),
0.99
) AS latency_p99_seconds
AVG(EXTRACT(EPOCH FROM finished_at - started_at)) AS avg_trade_duration_seconds,
PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM finished_at - started_at)) AS latency_p50_seconds,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM finished_at - started_at)) AS latency_p95_seconds,
PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM finished_at - started_at)) AS latency_p99_seconds
FROM trades
WHERE finished_at IS NOT NULL
""").fetchone()
""")
om = conn.execute("""
om = await conn.fetchrow("""
SELECT
COUNT(*) AS opportunity_count,
MIN(detected_at) AS first_detected_at,
MAX(detected_at) AS last_detected_at
FROM opportunities
""").fetchone()
""")
fm = conn.execute("""
fm = await conn.fetchrow("""
SELECT AVG(filled_volume / volume) AS fill_rate
FROM orders
WHERE volume > 0 AND filled_volume IS NOT NULL
""").fetchone()
""")
r_pnl_usd = float(tm[0]) if tm and tm[0] is not None else 0.0
tt = int(tm[1]) if tm and tm[1] is not None else 0
wt = int(tm[2]) if tm and tm[2] is not None else 0
r_pnl_usd = float(
tm["realized_pnl_usd"]) if tm and tm["realized_pnl_usd"] is not None else 0.0
tt = int(tm["total_trades"]
) if tm and tm["total_trades"] is not None else 0
wt = int(tm["winning_trades"]
) if tm and tm["winning_trades"] is not None else 0
wr = wt / tt if tt > 0 else None
atd = float(tm[3]) if tm and tm[3] is not None else None
atd = float(tm["avg_trade_duration_seconds"]
) if tm and tm["avg_trade_duration_seconds"] is not None else None
oc = int(om[0]) if om is not None and om[0] is not None else 0
fo = om[1] if om is not None and isinstance(om[1], datetime) else None
lo = om[2] if om is not None and isinstance(om[2], datetime) else None
oc = int(om["opportunity_count"]
) if om is not None and om["opportunity_count"] is not None else 0
fo = om["first_detected_at"] if om is not None and isinstance(
om["first_detected_at"], datetime) else None
lo = om["last_detected_at"] if om is not None and isinstance(
om["last_detected_at"], datetime) else None
opportunities_per_minute: float | None
if oc >= 2 and fo is not None and lo is not None:
@@ -82,11 +80,15 @@ class MetricsCalculator:
else:
opportunities_per_minute = None
fill_rate = float(fm[0]) if fm and fm[0] is not None else None
fill_rate = float(
fm["fill_rate"]) if fm and fm["fill_rate"] is not None else None
lp50 = float(tm[4]) if tm and tm[4] is not None else None
lp95 = float(tm[5]) if tm and tm[5] is not None else None
lp99 = float(tm[6]) if tm and tm[6] is not None else None
lp50 = float(tm["latency_p50_seconds"]
) if tm and tm["latency_p50_seconds"] is not None else None
lp95 = float(tm["latency_p95_seconds"]
) if tm and tm["latency_p95_seconds"] is not None else None
lp99 = float(tm["latency_p99_seconds"]
) if tm and tm["latency_p99_seconds"] is not None else None
return PerformanceMetrics(
realized_pnl_usd=r_pnl_usd,
+24 -24
View File
@@ -8,7 +8,7 @@ from typing import Any, cast
from fastapi import FastAPI
from arbitrade.api.control_state import DashboardControlState
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.pg_store import PgStore
from arbitrade.storage.repositories import (
AuditRecord,
AuditRepository,
@@ -29,8 +29,8 @@ def _controls(app: FastAPI) -> DashboardControlState:
return cast(DashboardControlState, app.state.dashboard_controls)
def _store(app: FastAPI) -> DuckDBStore:
return cast(DuckDBStore, app.state.store)
def _store(app: FastAPI) -> PgStore:
return cast(PgStore, app.state.store)
def _audit_repository(app: FastAPI) -> AuditRepository | None:
@@ -43,34 +43,34 @@ def _runtime_repository(app: FastAPI) -> RuntimeStateRepository | None:
return repository if isinstance(repository, RuntimeStateRepository) else None
def _open_trade_count(store: DuckDBStore) -> int:
with store.connect() as conn:
row = conn.execute("""
async def _open_trade_count(store: PgStore) -> int:
async with store.pool.acquire() as conn:
row = await conn.fetchrow("""
SELECT COUNT(*)
FROM trades
WHERE finished_at IS NULL
""").fetchone()
""")
return int(row[0]) if row is not None else 0
def _latest_balances(store: DuckDBStore) -> dict[str, Any] | None:
with store.connect() as conn:
row = conn.execute("""
async def _latest_balances(store: PgStore) -> dict[str, Any] | None:
async with store.pool.acquire() as conn:
row = await conn.fetchrow("""
SELECT balances
FROM portfolio_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
""").fetchone()
""")
if row is None or row[0] is None:
if row is None or row["balances"] is None:
return None
raw_balances = row[0]
raw_balances = row["balances"]
if isinstance(raw_balances, str):
return {"raw": raw_balances}
return {"raw": str(raw_balances)}
def _record_audit(
async def _record_audit(
app: FastAPI,
*,
event_type: str,
@@ -80,7 +80,7 @@ def _record_audit(
repository = _audit_repository(app)
if repository is None:
return
repository.insert(
await repository.insert(
AuditRecord(
occurred_at=datetime.now(UTC),
actor="runtime",
@@ -106,7 +106,7 @@ async def _run_startup_reconciler(app: FastAPI) -> None:
await result
def persist_runtime_snapshot(app: FastAPI, *, note: str | None = None) -> RuntimeStateRecord | None:
async def persist_runtime_snapshot(app: FastAPI, *, note: str | None = None) -> RuntimeStateRecord | None:
repository = _runtime_repository(app)
if repository is None:
return None
@@ -118,11 +118,11 @@ def persist_runtime_snapshot(app: FastAPI, *, note: str | None = None) -> Runtim
is_running=controls.is_running,
kill_switch_active=controls.kill_switch.is_active,
kill_switch_reason=controls.kill_switch.reason,
open_trade_count=_open_trade_count(store),
last_known_balances=_latest_balances(store),
open_trade_count=await _open_trade_count(store),
last_known_balances=await _latest_balances(store),
note=note,
)
repository.insert(snapshot)
await repository.insert(snapshot)
return snapshot
@@ -134,7 +134,7 @@ async def restore_runtime_state(app: FastAPI) -> RuntimeRecoveryReport:
restored_from_snapshot = False
snapshot_at: str | None = None
latest = repo.latest() if repo is not None else None
latest = await repo.latest() if repo is not None else None
if latest is not None:
restored_from_snapshot = True
snapshot_at = latest.snapshot_at.isoformat()
@@ -146,7 +146,7 @@ async def restore_runtime_state(app: FastAPI) -> RuntimeRecoveryReport:
ctl.kill_switch.deactivate()
ctl.mark_updated()
open_trades = _open_trade_count(store)
open_trades = await _open_trade_count(store)
restart_guard_active = False
if open_trades > 0:
ctl.is_running = False
@@ -163,7 +163,7 @@ async def restore_runtime_state(app: FastAPI) -> RuntimeRecoveryReport:
)
app.state.recovery_report = report
_record_audit(
await _record_audit(
app,
event_type="runtime.startup_recovery",
decision="applied",
@@ -212,7 +212,7 @@ async def graceful_shutdown(app: FastAPI) -> None:
controls.is_running = False
controls.mark_updated()
_record_audit(
await _record_audit(
app,
event_type="runtime.shutdown",
decision="initiated",
@@ -220,4 +220,4 @@ async def graceful_shutdown(app: FastAPI) -> None:
)
await drain_background_workers(app)
persist_runtime_snapshot(app, note="graceful_shutdown")
await persist_runtime_snapshot(app, note="graceful_shutdown")
-233
View File
@@ -1,233 +0,0 @@
from __future__ import annotations
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
import duckdb
import structlog
from arbitrade.config.settings import Settings
_LOG = structlog.get_logger(__name__)
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMP DEFAULT current_timestamp
);
CREATE TABLE IF NOT EXISTS config_sections (
id INTEGER PRIMARY KEY,
name VARCHAR UNIQUE NOT NULL,
description TEXT,
updated_at TIMESTAMP DEFAULT current_timestamp
);
CREATE TABLE IF NOT EXISTS config_settings (
key VARCHAR PRIMARY KEY,
section VARCHAR NOT NULL,
value_json TEXT NOT NULL,
value_type VARCHAR NOT NULL,
is_secret BOOLEAN DEFAULT FALSE,
is_runtime_reloadable BOOLEAN DEFAULT FALSE,
updated_at TIMESTAMP DEFAULT current_timestamp,
updated_by VARCHAR
);
CREATE TABLE IF NOT EXISTS config_pairings (
id INTEGER PRIMARY KEY,
base_asset VARCHAR NOT NULL,
quote_asset VARCHAR NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
source VARCHAR NOT NULL,
created_at TIMESTAMP DEFAULT current_timestamp,
updated_at TIMESTAMP DEFAULT current_timestamp,
UNIQUE(base_asset, quote_asset)
);
CREATE TABLE IF NOT EXISTS config_backtesting_defaults (
id INTEGER PRIMARY KEY,
starting_balances JSON,
trade_capital DOUBLE,
min_profit_threshold DOUBLE,
slippage_bps INTEGER,
execution_latency_ms INTEGER,
fee_source VARCHAR DEFAULT 'api'
);
CREATE TABLE IF NOT EXISTS opportunities (
id UUID DEFAULT uuid(),
detected_at TIMESTAMP NOT NULL,
cycle VARCHAR NOT NULL,
gross_pct DOUBLE,
net_pct DOUBLE,
est_profit DOUBLE,
executed BOOLEAN DEFAULT FALSE
);
CREATE TABLE IF NOT EXISTS trades (
id UUID DEFAULT uuid(),
trade_ref VARCHAR NOT NULL,
started_at TIMESTAMP NOT NULL,
finished_at TIMESTAMP,
status VARCHAR NOT NULL,
realized_pnl DOUBLE,
estimated_pnl DOUBLE,
capital_used DOUBLE,
cycle VARCHAR,
leg_count INTEGER
);
CREATE TABLE IF NOT EXISTS orders (
id UUID DEFAULT uuid(),
trade_ref VARCHAR NOT NULL,
order_ref VARCHAR NOT NULL,
leg_index INTEGER NOT NULL,
pair VARCHAR NOT NULL,
side VARCHAR NOT NULL,
volume DOUBLE NOT NULL,
user_ref INTEGER,
status VARCHAR,
filled_volume DOUBLE,
avg_price DOUBLE,
raw_response JSON,
recorded_at TIMESTAMP NOT NULL
);
CREATE TABLE IF NOT EXISTS pnl_events (
id UUID DEFAULT uuid(),
trade_ref VARCHAR NOT NULL,
recorded_at TIMESTAMP NOT NULL,
kind VARCHAR NOT NULL,
pnl_usd DOUBLE NOT NULL,
source VARCHAR NOT NULL
);
CREATE TABLE IF NOT EXISTS portfolio_snapshots (
snapshot_at TIMESTAMP NOT NULL,
balances JSON,
total_value_usd DOUBLE
);
CREATE TABLE IF NOT EXISTS market_snapshots (
snapshot_at TIMESTAMP NOT NULL,
symbol VARCHAR NOT NULL,
source VARCHAR NOT NULL,
payload JSON NOT NULL,
latency_ms DOUBLE
);
CREATE TABLE IF NOT EXISTS audit_events (
id UUID DEFAULT uuid(),
occurred_at TIMESTAMP NOT NULL,
actor VARCHAR NOT NULL,
event_type VARCHAR NOT NULL,
decision VARCHAR NOT NULL,
payload JSON,
correlation_id VARCHAR
);
CREATE TABLE IF NOT EXISTS runtime_state_snapshots (
snapshot_at TIMESTAMP NOT NULL,
is_running BOOLEAN NOT NULL,
kill_switch_active BOOLEAN NOT NULL,
kill_switch_reason VARCHAR,
open_trade_count INTEGER NOT NULL,
last_known_balances JSON,
note VARCHAR
);
CREATE TABLE IF NOT EXISTS kraken_account_snapshots (
snapshot_at TIMESTAMP NOT NULL,
fee_tier VARCHAR,
maker_fee DOUBLE,
taker_fee DOUBLE,
thirty_day_volume DOUBLE,
trade_balance_raw JSON,
fee_schedule_raw JSON
);
CREATE TABLE IF NOT EXISTS backtest_jobs (
id UUID DEFAULT uuid(),
status VARCHAR NOT NULL DEFAULT 'pending',
events_path VARCHAR NOT NULL,
config JSON,
report JSON,
error VARCHAR,
created_at TIMESTAMP DEFAULT current_timestamp,
started_at TIMESTAMP,
finished_at TIMESTAMP
);
"""
class DuckDBStore:
SCHEMA_VERSION = 5
def __init__(self, settings: Settings) -> None:
self._db_path = Path(settings.duckdb_path)
self._db_path.parent.mkdir(parents=True, exist_ok=True)
self._use_memory_fallback = False
@contextmanager
def connect(self) -> Iterator[duckdb.DuckDBPyConnection]:
try:
conn = duckdb.connect(str(self._db_path))
except duckdb.IOException:
if not self._use_memory_fallback:
_LOG.warning(
"duckdb_path_unavailable_falling_back_to_memory", path=str(self._db_path)
)
self._use_memory_fallback = True
conn = duckdb.connect(":memory:")
try:
yield conn
finally:
conn.close()
def _get_table_columns(self, conn: duckdb.DuckDBPyConnection, table_name: str) -> set[str]:
try:
rows = conn.execute(f"PRAGMA table_info({table_name})").fetchall()
return {str(row[1]) for row in rows}
except Exception:
return set()
def _table_exists(self, conn: duckdb.DuckDBPyConnection, table_name: str) -> bool:
try:
result = conn.execute(
f"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='{table_name}'"
).fetchone()
count = result[0] if result else 0
return count > 0
except Exception:
return False
def _ensure_column(
self, conn: duckdb.DuckDBPyConnection, table_name: str, column_def: str
) -> None:
"""Add a column to a table if it doesn't already exist."""
existing = self._get_table_columns(conn, table_name)
col_name = column_def.split()[0]
if col_name not in existing:
conn.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_def}")
def migrate(self) -> None:
with self.connect() as conn:
# Run CREATE TABLE IF NOT EXISTS for all tables
conn.execute(SCHEMA_SQL)
# Ensure schema_migrations table exists and get current version
if not self._table_exists(conn, "schema_migrations"):
conn.execute("""
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMP DEFAULT current_timestamp
)
""")
# Update version to current
conn.execute(
f"INSERT OR REPLACE INTO schema_migrations (version, applied_at) "
f"VALUES ({self.SCHEMA_VERSION}, current_timestamp)"
)
+5 -4
View File
@@ -36,7 +36,8 @@ class AsyncExecutionWriter:
async def start(self) -> None:
if self._task is None or self._task.done():
self._stop.clear()
self._task = asyncio.create_task(self._run(), name="execution-writer")
self._task = asyncio.create_task(
self._run(), name="execution-writer")
async def stop(self) -> None:
self._stop.set()
@@ -55,11 +56,11 @@ class AsyncExecutionWriter:
try:
if isinstance(record, TradeRecord):
self._trade_repository.insert(record)
await self._trade_repository.insert(record)
elif isinstance(record, OrderRecord):
self._order_repository.insert(record)
await self._order_repository.insert(record)
else:
self._pnl_repository.insert(record)
await self._pnl_repository.insert(record)
except Exception as exc:
_LOG.error("execution_write_failed", error=str(exc))
finally:
+7 -4
View File
@@ -24,14 +24,16 @@ class MarketSnapshot:
class AsyncMarketSnapshotWriter:
def __init__(self, repository: MarketSnapshotRepository, max_queue_size: int = 50_000) -> None:
self._repository = repository
self._queue: asyncio.Queue[MarketSnapshot] = asyncio.Queue(maxsize=max_queue_size)
self._queue: asyncio.Queue[MarketSnapshot] = asyncio.Queue(
maxsize=max_queue_size)
self._task: asyncio.Task[None] | None = None
self._stop = asyncio.Event()
async def start(self) -> None:
if self._task is None or self._task.done():
self._stop.clear()
self._task = asyncio.create_task(self._run(), name="market-snapshot-writer")
self._task = asyncio.create_task(
self._run(), name="market-snapshot-writer")
async def stop(self) -> None:
self._stop.set()
@@ -49,7 +51,7 @@ class AsyncMarketSnapshotWriter:
continue
try:
self._repository.insert(
await self._repository.insert(
MarketSnapshotRecord(
snapshot_at=item.snapshot_at,
symbol=item.symbol,
@@ -59,6 +61,7 @@ class AsyncMarketSnapshotWriter:
)
)
except Exception as exc:
_LOG.error("market_snapshot_write_failed", error=str(exc), symbol=item.symbol)
_LOG.error("market_snapshot_write_failed",
error=str(exc), symbol=item.symbol)
finally:
self._queue.task_done()
+5 -3
View File
@@ -13,14 +13,16 @@ _LOG = structlog.get_logger(__name__)
class AsyncOpportunityWriter:
def __init__(self, repository: OpportunityRepository, max_queue_size: int = 50_000) -> None:
self._repository = repository
self._queue: asyncio.Queue[OpportunityEvent] = asyncio.Queue(maxsize=max_queue_size)
self._queue: asyncio.Queue[OpportunityEvent] = asyncio.Queue(
maxsize=max_queue_size)
self._task: asyncio.Task[None] | None = None
self._stop = asyncio.Event()
async def start(self) -> None:
if self._task is None or self._task.done():
self._stop.clear()
self._task = asyncio.create_task(self._run(), name="opportunity-writer")
self._task = asyncio.create_task(
self._run(), name="opportunity-writer")
async def stop(self) -> None:
self._stop.set()
@@ -38,7 +40,7 @@ class AsyncOpportunityWriter:
continue
try:
self._repository.insert(
await self._repository.insert(
OpportunityRecord(
detected_at=event.detected_at,
cycle=event.cycle,
+134
View File
@@ -0,0 +1,134 @@
"""PostgreSQL store — async connection pool wrapper around asyncpg."""
from __future__ import annotations
from pathlib import Path
import asyncpg
import structlog
from arbitrade.config.settings import Settings
_LOG = structlog.get_logger(__name__)
SCHEMA_VERSION = 1
class PgStore:
"""Async PostgreSQL connection pool for the arbitrade bot.
Wraps an ``asyncpg.Pool`` with schema migration support.
"""
def __init__(self, settings: Settings) -> None:
self._dsn: str | None = None
self._pool: asyncpg.Pool | None = None
self._settings = settings
# ── lifecycle ────────────────────────────────────────────────
async def start(self) -> None:
"""Create the connection pool."""
s = self._settings
self._pool = await asyncpg.create_pool(
host=s.pg_host,
port=s.pg_port,
database=s.pg_database,
user=s.pg_user,
password=s.pg_password,
min_size=s.pg_min_connections,
max_size=s.pg_max_connections,
)
_LOG.info(
"pg_pool_created",
host=s.pg_host,
database=s.pg_database,
min_size=s.pg_min_connections,
max_size=s.pg_max_connections,
)
async def stop(self) -> None:
"""Close the connection pool."""
if self._pool is not None:
await self._pool.close()
self._pool = None
_LOG.info("pg_pool_closed")
@property
def pool(self) -> asyncpg.Pool:
"""Return the underlying connection pool.
Raises ``RuntimeError`` if ``start()`` has not been called yet.
"""
if self._pool is None:
raise RuntimeError("PgStore not started — call start() first")
return self._pool
# ── schema migration ─────────────────────────────────────────
async def migrate(self) -> None:
"""Apply the PostgreSQL schema.
Reads ``schema_pg.sql`` from the same package directory and
executes it, then records the migration version.
"""
schema_path = Path(__file__).with_name("schema_pg.sql")
schema_sql = schema_path.read_text(encoding="utf-8")
async with self.pool.acquire() as conn:
# Apply the full schema (CREATE TABLE IF NOT EXISTS …)
await conn.execute(schema_sql)
# Record the current schema version
await conn.execute(
"""
INSERT INTO schema_migrations (version, applied_at)
VALUES ($1, CURRENT_TIMESTAMP)
ON CONFLICT (version) DO UPDATE SET applied_at = CURRENT_TIMESTAMP
""",
SCHEMA_VERSION,
)
_LOG.info("pg_schema_migrated", version=SCHEMA_VERSION)
# ── helpers ──────────────────────────────────────────────────
async def table_exists(self, table_name: str) -> bool:
"""Check if a table exists in the current schema."""
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT COUNT(*) AS cnt
FROM information_schema.tables
WHERE table_schema = 'public' AND table_name = $1
""",
table_name,
)
return bool(row and row["cnt"] > 0)
async def get_table_columns(self, table_name: str) -> set[str]:
"""Return the set of column names for *table_name*."""
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT column_name
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = $1
""",
table_name,
)
return {str(r["column_name"]) for r in rows}
async def ensure_column(self, table_name: str, column_def: str) -> None:
"""Add a column to *table_name* if it does not already exist.
``column_def`` should be something like ``"my_col VARCHAR"``.
"""
existing = await self.get_table_columns(table_name)
col_name = column_def.split()[0]
if col_name not in existing:
async with self.pool.acquire() as conn:
await conn.execute(
f"ALTER TABLE {table_name} ADD COLUMN {column_def}"
)
_LOG.info("pg_column_added", table=table_name, column=col_name)
File diff suppressed because it is too large Load Diff
+167
View File
@@ -0,0 +1,167 @@
-- PostgreSQL schema for arbitrade bot
-- Requires pgcrypto extension for gen_random_uuid()
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- ========================================
-- Schema version tracking
-- ========================================
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- ========================================
-- Configuration
-- ========================================
CREATE TABLE IF NOT EXISTS config_sections (
id SERIAL PRIMARY KEY,
name VARCHAR UNIQUE NOT NULL,
description TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS config_settings (
key VARCHAR PRIMARY KEY,
section VARCHAR NOT NULL,
value_json TEXT NOT NULL,
value_type VARCHAR NOT NULL,
is_secret BOOLEAN DEFAULT FALSE,
is_runtime_reloadable BOOLEAN DEFAULT FALSE,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_by VARCHAR
);
CREATE TABLE IF NOT EXISTS config_pairings (
id SERIAL PRIMARY KEY,
base_asset VARCHAR NOT NULL,
quote_asset VARCHAR NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
source VARCHAR NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(base_asset, quote_asset)
);
CREATE TABLE IF NOT EXISTS config_backtesting_defaults (
id SERIAL PRIMARY KEY,
starting_balances JSONB,
trade_capital DOUBLE PRECISION,
min_profit_threshold DOUBLE PRECISION,
slippage_bps INTEGER,
execution_latency_ms INTEGER,
fee_source VARCHAR DEFAULT 'api'
);
-- ========================================
-- Detection & Execution
-- ========================================
CREATE TABLE IF NOT EXISTS opportunities (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
detected_at TIMESTAMP NOT NULL,
cycle VARCHAR NOT NULL,
gross_pct DOUBLE PRECISION,
net_pct DOUBLE PRECISION,
est_profit DOUBLE PRECISION,
executed BOOLEAN DEFAULT FALSE
);
CREATE TABLE IF NOT EXISTS trades (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
trade_ref VARCHAR NOT NULL,
started_at TIMESTAMP NOT NULL,
finished_at TIMESTAMP,
status VARCHAR NOT NULL,
realized_pnl DOUBLE PRECISION,
estimated_pnl DOUBLE PRECISION,
capital_used DOUBLE PRECISION,
cycle VARCHAR,
leg_count INTEGER
);
CREATE TABLE IF NOT EXISTS orders (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
trade_ref VARCHAR NOT NULL,
order_ref VARCHAR NOT NULL,
leg_index INTEGER NOT NULL,
pair VARCHAR NOT NULL,
side VARCHAR NOT NULL,
volume DOUBLE PRECISION NOT NULL,
user_ref INTEGER,
status VARCHAR,
filled_volume DOUBLE PRECISION,
avg_price DOUBLE PRECISION,
raw_response JSONB,
recorded_at TIMESTAMP NOT NULL
);
CREATE TABLE IF NOT EXISTS pnl_events (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
trade_ref VARCHAR NOT NULL,
recorded_at TIMESTAMP NOT NULL,
kind VARCHAR NOT NULL,
pnl_usd DOUBLE PRECISION NOT NULL,
source VARCHAR NOT NULL
);
-- ========================================
-- Snapshots & Monitoring
-- ========================================
CREATE TABLE IF NOT EXISTS portfolio_snapshots (
snapshot_at TIMESTAMP NOT NULL,
balances JSONB,
total_value_usd DOUBLE PRECISION
);
CREATE TABLE IF NOT EXISTS market_snapshots (
snapshot_at TIMESTAMP NOT NULL,
symbol VARCHAR NOT NULL,
source VARCHAR NOT NULL,
payload JSONB NOT NULL,
latency_ms DOUBLE PRECISION
);
CREATE TABLE IF NOT EXISTS audit_events (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
occurred_at TIMESTAMP NOT NULL,
actor VARCHAR NOT NULL,
event_type VARCHAR NOT NULL,
decision VARCHAR NOT NULL,
payload JSONB,
correlation_id VARCHAR
);
CREATE TABLE IF NOT EXISTS runtime_state_snapshots (
snapshot_at TIMESTAMP NOT NULL,
is_running BOOLEAN NOT NULL,
kill_switch_active BOOLEAN NOT NULL,
kill_switch_reason VARCHAR,
open_trade_count INTEGER NOT NULL,
last_known_balances JSONB,
note VARCHAR
);
CREATE TABLE IF NOT EXISTS kraken_account_snapshots (
snapshot_at TIMESTAMP NOT NULL,
fee_tier VARCHAR,
maker_fee DOUBLE PRECISION,
taker_fee DOUBLE PRECISION,
thirty_day_volume DOUBLE PRECISION,
trade_balance_raw JSONB,
fee_schedule_raw JSONB
);
-- ========================================
-- Backtesting
-- ========================================
CREATE TABLE IF NOT EXISTS backtest_jobs (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
status VARCHAR NOT NULL DEFAULT 'pending',
events_path VARCHAR NOT NULL,
config JSONB,
report JSONB,
error VARCHAR,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
started_at TIMESTAMP,
finished_at TIMESTAMP
);
@@ -7,7 +7,7 @@
<div class="chart-head">
<div>
<div class="label">Opportunity Trend</div>
<div class="meta">Recent opportunities from DuckDB. Updated {{ generated_at }}</div>
<div class="meta">Recent opportunities from PostgreSQL. Updated {{ generated_at }}</div>
</div>
<button type="button" class="button secondary" x-on:click="expanded = !expanded">
<span x-text="expanded ? 'Hide chart' : 'Show chart'"></span>
+1
View File
@@ -0,0 +1 @@
"""End-to-end tests — require full app startup with PostgreSQL."""
+1
View File
@@ -0,0 +1 @@
"""Integration tests for PostgreSQL schema and connectivity."""
+25
View File
@@ -0,0 +1,25 @@
"""pytest configuration for integration tests.
Integration tests require a live PostgreSQL server at the configured host.
They are skipped automatically if the server is unreachable.
"""
from __future__ import annotations
import pytest
def pytest_ignore_collect(path: str, config: pytest.Config) -> bool:
"""Skip integration tests unless --integration is passed."""
if "integration" in path and not config.getoption("--integration", False):
return True
return False
def pytest_addoption(parser: pytest.Parser) -> None:
parser.addoption(
"--integration",
action="store_true",
default=False,
help="Run integration tests (requires PostgreSQL)",
)
@@ -0,0 +1,52 @@
from __future__ import annotations
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from datetime import UTC, datetime
import pytest
import pytest_asyncio
from arbitrade.config.settings import get_settings
from arbitrade.storage.pg_store import PgStore
from arbitrade.storage.repositories import AuditRecord, AuditRepository
pytestmark = pytest.mark.integration
@asynccontextmanager
async def _pg() -> AsyncIterator[PgStore]:
s = get_settings()
store = PgStore(s)
try:
await store.start()
await store.migrate()
yield store
finally:
await store.stop()
@pytest.mark.asyncio
async def test_audit_repository_inserts_and_lists_recent() -> None:
async with _pg() as store:
repository = AuditRepository(store)
await repository.insert(
AuditRecord(
occurred_at=datetime.now(UTC),
actor="dashboard_user",
event_type="dashboard.control.start",
decision="approved",
payload={"execution_status": "running"},
correlation_id="req-1",
)
)
recent = await repository.list_recent(limit=5)
assert len(recent) == 1
assert recent[0].actor == "dashboard_user"
assert recent[0].event_type == "dashboard.control.start"
assert recent[0].decision == "approved"
assert recent[0].payload == {"execution_status": "running"}
assert recent[0].correlation_id == "req-1"
@@ -0,0 +1,103 @@
from __future__ import annotations
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import UTC, datetime
import pytest
from arbitrade.config.settings import get_settings
from arbitrade.detection.engine import OpportunityEvent
from arbitrade.execution.sequencer import TriangularExecutionSequencer
from arbitrade.storage.pg_store import PgStore
from arbitrade.storage.executions import AsyncExecutionWriter
from arbitrade.storage.repositories import OrderRepository, PnLRepository, TradeRepository
pytestmark = pytest.mark.integration
@dataclass(slots=True)
class _FakeRestClient:
calls: int = 0
async def place_market_order(self, *, pair: str, side: str, volume: float) -> dict[str, object]:
self.calls += 1
return {"txid": [f"tx-{self.calls}"], "status": "submitted"}
def _sample_event() -> OpportunityEvent:
return OpportunityEvent(
detected_at=datetime.now(UTC),
cycle="USD->BTC->ETH->USD",
updated_pair="BTC/USD",
gross_rate=1.04,
net_rate=1.03,
gross_pct=4.0,
net_pct=3.0,
est_profit=0.03,
)
@asynccontextmanager
async def _pg() -> AsyncIterator[PgStore]:
s = get_settings()
store = PgStore(s)
try:
await store.start()
await store.migrate()
yield store
finally:
await store.stop()
@pytest.mark.asyncio
async def test_execution_writer_persists_trade_order_and_pnl() -> None:
async with _pg() as store:
writer = AsyncExecutionWriter(
TradeRepository(store),
OrderRepository(store),
PnLRepository(store),
max_queue_size=10,
)
await writer.start()
client = _FakeRestClient()
sequencer = TriangularExecutionSequencer(
client,
available_pairs=["BTC/USD", "ETH/BTC", "ETH/USD"],
execution_writer=writer,
)
result = await sequencer.execute(_sample_event())
await writer.stop()
assert result.success
assert client.calls == 3
async with store.pool.acquire() as conn:
trades = await conn.fetch(
"SELECT trade_ref, status, estimated_pnl, capital_used, cycle, leg_count FROM trades"
)
orders = await conn.fetch(
"SELECT trade_ref, order_ref, leg_index, pair, side, volume, status "
"FROM orders ORDER BY leg_index"
)
pnls = await conn.fetch("SELECT trade_ref, kind, pnl_usd, source FROM pnl_events")
assert len(trades) == 1
assert trades[0]["status"] == "filled"
assert trades[0]["estimated_pnl"] == 0.03
assert trades[0]["capital_used"] == 1.0
assert trades[0]["cycle"] == "USD->BTC->ETH->USD"
assert trades[0]["leg_count"] == 3
assert len(orders) == 3
assert orders[0]["leg_index"] == 0
assert orders[1]["leg_index"] == 1
assert orders[2]["leg_index"] == 2
assert orders[0]["status"] == "submitted"
assert len(pnls) == 1
assert pnls[0]["kind"] == "estimated"
assert pnls[0]["pnl_usd"] == 0.03
+84
View File
@@ -0,0 +1,84 @@
from __future__ import annotations
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from datetime import UTC, datetime, timedelta
import pytest
import pytest_asyncio
from arbitrade.config.settings import get_settings
from arbitrade.metrics import MetricsCalculator
from arbitrade.storage.pg_store import PgStore
pytestmark = pytest.mark.integration
@asynccontextmanager
async def _pg() -> AsyncIterator[PgStore]:
s = get_settings()
store = PgStore(s)
try:
await store.start()
await store.migrate()
yield store
finally:
await store.stop()
@pytest.mark.asyncio
async def test_metrics_calculator_summarizes_execution_data() -> None:
async with _pg() as store:
started = datetime.now(UTC)
finished = started + timedelta(seconds=30)
started_two = started + timedelta(minutes=1)
finished_two = started_two + timedelta(seconds=90)
async with store.pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO trades (
trade_ref, started_at, finished_at, status,
realized_pnl, estimated_pnl, capital_used, cycle, leg_count
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9),
($10, $11, $12, $13, $14, $15, $16, $17, $18)
""",
"trade-1", started, finished, "filled", 12.5, 10.0, 100.0, "USD->BTC->ETH->USD", 3,
"trade-2", started_two, finished_two, "filled", -
4.5, -2.0, 200.0, "USD->ETH->BTC->USD", 3,
)
await conn.execute(
"""
INSERT INTO opportunities (detected_at, cycle, gross_pct, net_pct, est_profit, executed)
VALUES ($1, $2, $3, $4, $5, $6),
($7, $8, $9, $10, $11, $12),
($13, $14, $15, $16, $17, $18)
""",
started, "USD->BTC->ETH->USD", 4.0, 3.0, 0.03, True,
started_two, "USD->ETH->BTC->USD", 2.0, 1.0, 0.01, False,
started_two +
timedelta(
seconds=30), "USD->BTC->ETH->USD", 5.0, 4.0, 0.04, True,
)
await conn.execute(
"""
INSERT INTO orders (
trade_ref, order_ref, leg_index, pair, side, volume,
user_ref, status, filled_volume, avg_price, raw_response, recorded_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12),
($13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24)
""",
"trade-1", "order-1", 0, "BTC/USD", "buy", 2.0, 101, "closed", 2.0, 100.0, "{}", started,
"trade-2", "order-2", 0, "ETH/USD", "sell", 4.0, 202, "closed", 3.0, 200.0, "{}", started_two,
)
metrics = await MetricsCalculator(store).compute()
assert metrics.realized_pnl_usd == 8.0
assert metrics.win_rate == 0.5
assert metrics.avg_trade_duration_seconds == 60.0
assert metrics.opportunities_per_minute == 2.0
assert metrics.fill_rate == 0.875
assert metrics.latency_p50_seconds == 60.0
assert metrics.latency_p95_seconds == 87.0
assert metrics.latency_p99_seconds == pytest.approx(89.4)
@@ -0,0 +1,61 @@
from __future__ import annotations
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from datetime import UTC, datetime
import pytest
from arbitrade.config.settings import get_settings
from arbitrade.detection.engine import OpportunityEvent
from arbitrade.storage.pg_store import PgStore
from arbitrade.storage.opportunities import AsyncOpportunityWriter
from arbitrade.storage.repositories import OpportunityRepository
pytestmark = pytest.mark.integration
@asynccontextmanager
async def _pg() -> AsyncIterator[PgStore]:
s = get_settings()
store = PgStore(s)
try:
await store.start()
await store.migrate()
yield store
finally:
await store.stop()
@pytest.mark.asyncio
async def test_async_opportunity_writer_persists_events() -> None:
async with _pg() as store:
repository = OpportunityRepository(store)
writer = AsyncOpportunityWriter(repository, max_queue_size=10)
await writer.start()
event = OpportunityEvent(
detected_at=datetime.now(UTC),
cycle="USD->BTC->ETH->USD",
updated_pair="BTC/USD",
gross_rate=1.04,
net_rate=1.03,
gross_pct=4.0,
net_pct=3.0,
est_profit=0.03,
)
await writer.enqueue(event)
await writer.stop()
async with store.pool.acquire() as conn:
rows = await conn.fetch(
"SELECT cycle, gross_pct, net_pct, est_profit, executed FROM opportunities"
)
assert len(rows) == 1
assert rows[0]["cycle"] == "USD->BTC->ETH->USD"
assert rows[0]["gross_pct"] == 4.0
assert rows[0]["net_pct"] == 3.0
assert rows[0]["est_profit"] == 0.03
assert rows[0]["executed"] is False
+359
View File
@@ -0,0 +1,359 @@
"""Integration tests: verify PostgreSQL schema and connection.
These tests connect to the PostgreSQL server at 192.168.88.35 and
validate that all expected tables, columns, and constraints exist.
They are skipped if the server is unreachable.
"""
from __future__ import annotations
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
import pytest
import pytest_asyncio
from arbitrade.config.settings import Settings, get_settings
from arbitrade.storage.pg_store import PgStore
pytestmark = pytest.mark.integration
# ── expected schema ──────────────────────────────────────────────────────────
EXPECTED_TABLES: dict[str, list[str]] = {
"schema_migrations": ["version", "applied_at"],
"config_sections": ["id", "name", "description", "updated_at"],
"config_settings": [
"key", "section", "value_json", "value_type", "is_secret",
"is_runtime_reloadable", "updated_at", "updated_by",
],
"config_pairings": [
"id", "base_asset", "quote_asset", "enabled", "source",
"created_at", "updated_at",
],
"config_backtesting_defaults": [
"id", "starting_balances", "trade_capital", "min_profit_threshold",
"slippage_bps", "execution_latency_ms", "fee_source",
],
"opportunities": [
"id", "detected_at", "cycle", "gross_pct", "net_pct",
"est_profit", "executed",
],
"trades": [
"id", "trade_ref", "started_at", "finished_at", "status",
"realized_pnl", "estimated_pnl", "capital_used", "cycle", "leg_count",
],
"orders": [
"id", "trade_ref", "order_ref", "leg_index", "pair", "side",
"volume", "user_ref", "status", "filled_volume", "avg_price",
"raw_response", "recorded_at",
],
"pnl_events": [
"id", "trade_ref", "recorded_at", "kind", "pnl_usd", "source",
],
"portfolio_snapshots": ["snapshot_at", "balances", "total_value_usd"],
"market_snapshots": ["snapshot_at", "symbol", "source", "payload", "latency_ms"],
"audit_events": [
"id", "occurred_at", "actor", "event_type", "decision",
"payload", "correlation_id",
],
"runtime_state_snapshots": [
"snapshot_at", "is_running", "kill_switch_active", "kill_switch_reason",
"open_trade_count", "last_known_balances", "note",
],
"kraken_account_snapshots": [
"snapshot_at", "fee_tier", "maker_fee", "taker_fee",
"thirty_day_volume", "trade_balance_raw", "fee_schedule_raw",
],
"backtest_jobs": [
"id", "status", "events_path", "config", "report", "error",
"created_at", "started_at", "finished_at",
],
}
# Tables that should have a primary key
TABLES_WITH_PRIMARY_KEY: dict[str, str | list[str]] = {
"schema_migrations": "version",
"config_sections": "id",
"config_settings": "key",
"config_pairings": "id",
"config_backtesting_defaults": "id",
"opportunities": "id",
"trades": "id",
"orders": "id",
"pnl_events": "id",
"audit_events": "id",
"backtest_jobs": "id",
}
# Tables with a UNIQUE constraint beyond the primary key
TABLES_WITH_UNIQUE_CONSTRAINTS: dict[str, list[str]] = {
"config_sections": ["name"],
"config_pairings": ["base_asset, quote_asset"],
}
# ── fixtures ────────────────────────────────────────────────────────────────
@asynccontextmanager
async def _pg_lifecycle() -> AsyncIterator[PgStore]:
"""Connect, yield store, then disconnect."""
settings = get_settings()
store = PgStore(settings)
try:
await store.start()
yield store
finally:
await store.stop()
@pytest_asyncio.fixture(name="pg")
async def pg_fixture() -> AsyncIterator[PgStore]:
async with _pg_lifecycle() as store:
yield store
# ── helpers ─────────────────────────────────────────────────────────────────
async def _get_actual_tables(store: PgStore) -> dict[str, list[str]]:
"""Return {table_name: [column_name, ...]} for the public schema."""
actual: dict[str, list[str]] = {}
async with store.pool.acquire() as conn:
rows = await conn.fetch(
"SELECT table_name, column_name FROM information_schema.columns "
"WHERE table_schema = 'public' ORDER BY table_name, ordinal_position"
)
for row in rows:
tbl: str = row["table_name"]
col: str = row["column_name"]
actual.setdefault(tbl, []).append(col)
return actual
async def _table_row_count(store: PgStore, table: str) -> int:
async with store.pool.acquire() as conn:
row = await conn.fetchrow(f"SELECT COUNT(*) AS cnt FROM {table}")
return int(row["cnt"]) if row else 0
# ── tests ───────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_pg_connect(pg: PgStore) -> None:
"""Can connect to PostgreSQL and ping the server."""
async with pg.pool.acquire() as conn:
val = await conn.fetchval("SELECT 1 AS val")
assert val == 1
@pytest.mark.asyncio
async def test_pgcrypto_extension(pg: PgStore) -> None:
"""The pgcrypto extension is available (gen_random_uuid)."""
async with pg.pool.acquire() as conn:
val = await conn.fetchval("SELECT gen_random_uuid()")
assert val is not None
# The result should be a UUID object
assert len(str(val)) == 36 # UUID string length
@pytest.mark.asyncio
async def test_schema_migration_applies(pg: PgStore) -> None:
"""Migrate creates all expected tables."""
await pg.migrate()
actual = await _get_actual_tables(pg)
for table in EXPECTED_TABLES:
assert table in actual, (
f"Table '{table}' missing after migration. "
f"Found tables: {sorted(actual)}"
)
@pytest.mark.asyncio
async def test_migration_is_idempotent(pg: PgStore) -> None:
"""Running migrate twice does not raise."""
await pg.migrate()
await pg.migrate() # second call should be a no-op
actual = await _get_actual_tables(pg)
for table in EXPECTED_TABLES:
assert table in actual
@pytest.mark.asyncio
async def test_table_columns(pg: PgStore) -> None:
"""Every expected table has the correct columns."""
await pg.migrate()
actual = await _get_actual_tables(pg)
for table, expected_cols in EXPECTED_TABLES.items():
actual_cols = actual.get(table, [])
for col in expected_cols:
assert col in actual_cols, (
f"Column '{col}' missing from table '{table}'. "
f"Actual columns: {actual_cols}"
)
@pytest.mark.asyncio
async def test_primary_keys(pg: PgStore) -> None:
"""Tables that should have primary keys do."""
await pg.migrate()
async with pg.pool.acquire() as conn:
for table, expected_pk in TABLES_WITH_PRIMARY_KEY.items():
rows = await conn.fetch(
"SELECT kcu.column_name FROM information_schema.table_constraints tc "
"JOIN information_schema.key_column_usage kcu "
"ON tc.constraint_name = kcu.constraint_name "
"WHERE tc.table_schema = 'public' AND tc.table_name = $1 "
"AND tc.constraint_type = 'PRIMARY KEY' "
"ORDER BY kcu.ordinal_position",
table,
)
pk_columns = [r["column_name"] for r in rows]
expected_list = [expected_pk] if isinstance(expected_pk, str) else expected_pk
for col in expected_list:
assert col in pk_columns, (
f"Table '{table}' should have PK column '{col}'. "
f"Actual PK columns: {pk_columns}"
)
@pytest.mark.asyncio
async def test_unique_constraints(pg: PgStore) -> None:
"""Tables that should have UNIQUE constraints do."""
await pg.migrate()
async with pg.pool.acquire() as conn:
for table, expected_ucs in TABLES_WITH_UNIQUE_CONSTRAINTS.items():
rows = await conn.fetch(
"SELECT kcu.column_name FROM information_schema.table_constraints tc "
"JOIN information_schema.key_column_usage kcu "
"ON tc.constraint_name = kcu.constraint_name "
"WHERE tc.table_schema = 'public' AND tc.table_name = $1 "
"AND tc.constraint_type = 'UNIQUE'",
table,
)
uc_columns = {r["column_name"] for r in rows}
for expected_cols in expected_ucs:
cols = [c.strip() for c in expected_cols.split(",")]
for col in cols:
assert col in uc_columns, (
f"Table '{table}' should have UNIQUE column '{col}'. "
f"Actual UNIQUE columns: {uc_columns}"
)
@pytest.mark.asyncio
async def test_table_row_count_is_zero(pg: PgStore) -> None:
"""All tables start empty after migration."""
await pg.migrate()
for table in EXPECTED_TABLES:
count = await _table_row_count(pg, table)
assert count == 0, (
f"Table '{table}' should be empty after migration, "
f"but has {count} rows"
)
@pytest.mark.asyncio
async def test_schema_migration_version_recorded(pg: PgStore) -> None:
"""schema_migrations has the expected version after migrate."""
from arbitrade.storage.pg_store import SCHEMA_VERSION
await pg.migrate()
async with pg.pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT MAX(version) AS v FROM schema_migrations"
)
assert row is not None
assert row["v"] == SCHEMA_VERSION, (
f"Expected schema version {SCHEMA_VERSION}, "
f"got {row['v']}"
)
@pytest.mark.asyncio
async def test_create_and_query_row(pg: PgStore) -> None:
"""Can INSERT a row and SELECT it back (round-trip for a simple table)."""
await pg.migrate()
async with pg.pool.acquire() as conn:
# ConfigSections round-trip
await conn.execute(
"INSERT INTO config_sections (name, description) VALUES ($1, $2)",
"test_section", "A test section for integration test",
)
row = await conn.fetchrow(
"SELECT name, description FROM config_sections WHERE name = $1",
"test_section",
)
assert row is not None
assert row["name"] == "test_section"
assert row["description"] == "A test section for integration test"
# Clean up
await conn.execute(
"DELETE FROM config_sections WHERE name = $1",
"test_section",
)
@pytest.mark.asyncio
async def test_config_pairings_upsert(pg: PgStore) -> None:
"""ON CONFLICT ... DO UPDATE works on config_pairings (unique constraint)."""
await pg.migrate()
from arbitrade.config.service import ConfigPairing
from arbitrade.storage.repositories import ConfigPairingRepository
repo = ConfigPairingRepository(pg)
# Insert
p1 = await repo.upsert_pairing(
ConfigPairing(base_asset="XBT", quote_asset="USD", enabled=True, source="kraken")
)
assert p1.id is not None
assert p1.base_asset == "XBT"
assert p1.enabled is True
# Upsert (update)
p2 = await repo.upsert_pairing(
ConfigPairing(base_asset="XBT", quote_asset="USD", enabled=False, source="manual")
)
assert p2.id == p1.id # same row
assert p2.enabled is False
assert p2.source == "manual"
# Clean up
deleted = await repo.delete_pairing("XBT", "USD")
assert deleted is True
@pytest.mark.asyncio
async def test_audit_list_recent(pg: PgStore) -> None:
"""AuditRepository.list_recent returns records in desc order."""
await pg.migrate()
from datetime import UTC, datetime
from arbitrade.storage.repositories import AuditRecord, AuditRepository
repo = AuditRepository(pg)
now = datetime.now(UTC)
# Insert a few records
for i in range(3):
await repo.insert(
AuditRecord(
occurred_at=now,
actor="test",
event_type="integration_test",
decision=f"decision_{i}",
payload={"index": i},
correlation_id=f"corr_{i}",
)
)
recent = await repo.list_recent(limit=5)
assert len(recent) >= 3
assert recent[0].decision in ("decision_2", "decision_1", "decision_0")
# Verify payload serialization worked
first = recent[0]
if first.payload:
assert "index" in first.payload
-34
View File
@@ -1,34 +0,0 @@
from __future__ import annotations
from datetime import UTC, datetime
from arbitrade.config.settings import Settings
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.repositories import AuditRecord, AuditRepository
def test_audit_repository_inserts_and_lists_recent(tmp_path) -> None:
settings = Settings(_env_file=None, DUCKDB_PATH=tmp_path / "audit.duckdb")
store = DuckDBStore(settings)
store.migrate()
repository = AuditRepository(store)
repository.insert(
AuditRecord(
occurred_at=datetime.now(UTC),
actor="dashboard_user",
event_type="dashboard.control.start",
decision="approved",
payload={"execution_status": "running"},
correlation_id="req-1",
)
)
recent = repository.list_recent(limit=5)
assert len(recent) == 1
assert recent[0].actor == "dashboard_user"
assert recent[0].event_type == "dashboard.control.start"
assert recent[0].decision == "approved"
assert recent[0].payload == {"execution_status": "running"}
assert recent[0].correlation_id == "req-1"
+4 -3
View File
@@ -8,7 +8,7 @@ from arbitrade.config.service import (
ConfigPairing,
ConfigSetting,
)
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.pg_store import PgStore
from arbitrade.storage.repositories import (
ConfigBacktestingDefaultsRepository,
ConfigPairingRepository,
@@ -19,7 +19,7 @@ from arbitrade.storage.repositories import (
@pytest.fixture
def mock_store():
"""Create a mock database store."""
store = Mock(spec=DuckDBStore)
store = Mock(spec=PgStore)
return store
@@ -244,7 +244,8 @@ def test_config_pairing_repository_create_pairing(mock_store):
]
# Create pairing
pairing = ConfigPairing(base_asset="BTC", quote_asset="USD", enabled=True, source="Kraken")
pairing = ConfigPairing(
base_asset="BTC", quote_asset="USD", enabled=True, source="Kraken")
result = repo.create_pairing(pairing)
-89
View File
@@ -1,89 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime
import pytest
from arbitrade.config.settings import Settings
from arbitrade.detection.engine import OpportunityEvent
from arbitrade.execution.sequencer import TriangularExecutionSequencer
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.executions import AsyncExecutionWriter
from arbitrade.storage.repositories import OrderRepository, PnLRepository, TradeRepository
@dataclass(slots=True)
class _FakeRestClient:
calls: int = 0
async def place_market_order(self, *, pair: str, side: str, volume: float) -> dict[str, object]:
self.calls += 1
return {"txid": [f"tx-{self.calls}"], "status": "submitted"}
def _sample_event() -> OpportunityEvent:
return OpportunityEvent(
detected_at=datetime.now(UTC),
cycle="USD->BTC->ETH->USD",
updated_pair="BTC/USD",
gross_rate=1.04,
net_rate=1.03,
gross_pct=4.0,
net_pct=3.0,
est_profit=0.03,
)
@pytest.mark.asyncio
async def test_execution_writer_persists_trade_order_and_pnl(tmp_path) -> None:
settings = Settings(_env_file=None, DUCKDB_PATH=tmp_path / "exec.duckdb")
store = DuckDBStore(settings)
store.migrate()
writer = AsyncExecutionWriter(
TradeRepository(store),
OrderRepository(store),
PnLRepository(store),
max_queue_size=10,
)
await writer.start()
client = _FakeRestClient()
sequencer = TriangularExecutionSequencer(
client,
available_pairs=["BTC/USD", "ETH/BTC", "ETH/USD"],
execution_writer=writer,
)
result = await sequencer.execute(_sample_event())
await writer.stop()
assert result.success
assert client.calls == 3
with store.connect() as conn:
trades = conn.execute(
"SELECT trade_ref, status, estimated_pnl, capital_used, cycle, leg_count FROM trades"
).fetchall()
orders = conn.execute(
"SELECT trade_ref, order_ref, leg_index, pair, side, volume, status "
"FROM orders ORDER BY leg_index"
).fetchall()
pnls = conn.execute("SELECT trade_ref, kind, pnl_usd, source FROM pnl_events").fetchall()
assert len(trades) == 1
assert trades[0][1] == "filled"
assert trades[0][2] == 0.03
assert trades[0][3] == 1.0
assert trades[0][4] == "USD->BTC->ETH->USD"
assert trades[0][5] == 3
assert len(orders) == 3
assert orders[0][2] == 0
assert orders[1][2] == 1
assert orders[2][2] == 2
assert orders[0][6] == "submitted"
assert len(pnls) == 1
assert pnls[0][1] == "estimated"
assert pnls[0][2] == 0.03
-144
View File
@@ -1,144 +0,0 @@
from __future__ import annotations
from datetime import UTC, datetime, timedelta
import pytest
from arbitrade.config.settings import Settings
from arbitrade.metrics import MetricsCalculator
from arbitrade.storage.db import DuckDBStore
def test_metrics_calculator_summarizes_execution_data(tmp_path) -> None:
settings = Settings(_env_file=None, DUCKDB_PATH=tmp_path / "metrics.duckdb")
store = DuckDBStore(settings)
store.migrate()
started = datetime.now(UTC)
finished = started + timedelta(seconds=30)
started_two = started + timedelta(minutes=1)
finished_two = started_two + timedelta(seconds=90)
with store.connect() as conn:
conn.execute(
"""
INSERT INTO trades (
trade_ref,
started_at,
finished_at,
status,
realized_pnl,
estimated_pnl,
capital_used,
cycle,
leg_count
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
[
"trade-1",
started,
finished,
"filled",
12.5,
10.0,
100.0,
"USD->BTC->ETH->USD",
3,
"trade-2",
started_two,
finished_two,
"filled",
-4.5,
-2.0,
200.0,
"USD->ETH->BTC->USD",
3,
],
)
conn.execute(
"""
INSERT INTO opportunities (
detected_at,
cycle,
gross_pct,
net_pct,
est_profit,
executed
) VALUES (?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?)
""",
[
started,
"USD->BTC->ETH->USD",
4.0,
3.0,
0.03,
True,
started_two,
"USD->ETH->BTC->USD",
2.0,
1.0,
0.01,
False,
started_two + timedelta(seconds=30),
"USD->BTC->ETH->USD",
5.0,
4.0,
0.04,
True,
],
)
conn.execute(
"""
INSERT INTO orders (
trade_ref,
order_ref,
leg_index,
pair,
side,
volume,
user_ref,
status,
filled_volume,
avg_price,
raw_response,
recorded_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
[
"trade-1",
"order-1",
0,
"BTC/USD",
"buy",
2.0,
101,
"closed",
2.0,
100.0,
"{}",
started,
"trade-2",
"order-2",
0,
"ETH/USD",
"sell",
4.0,
202,
"closed",
3.0,
200.0,
"{}",
started_two,
],
)
metrics = MetricsCalculator(store).compute()
assert metrics.realized_pnl_usd == 8.0
assert metrics.win_rate == 0.5
assert metrics.avg_trade_duration_seconds == 60.0
assert metrics.opportunities_per_minute == 2.0
assert metrics.fill_rate == 0.875
assert metrics.latency_p50_seconds == 60.0
assert metrics.latency_p95_seconds == 87.0
assert metrics.latency_p99_seconds == pytest.approx(89.4)
-48
View File
@@ -1,48 +0,0 @@
from __future__ import annotations
from datetime import UTC, datetime
import pytest
from arbitrade.config.settings import Settings
from arbitrade.detection.engine import OpportunityEvent
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.opportunities import AsyncOpportunityWriter
from arbitrade.storage.repositories import OpportunityRepository
@pytest.mark.asyncio
async def test_async_opportunity_writer_persists_events(tmp_path) -> None:
settings = Settings(_env_file=None, DUCKDB_PATH=tmp_path / "test.duckdb")
store = DuckDBStore(settings)
store.migrate()
repository = OpportunityRepository(store)
writer = AsyncOpportunityWriter(repository, max_queue_size=10)
await writer.start()
event = OpportunityEvent(
detected_at=datetime.now(UTC),
cycle="USD->BTC->ETH->USD",
updated_pair="BTC/USD",
gross_rate=1.04,
net_rate=1.03,
gross_pct=4.0,
net_pct=3.0,
est_profit=0.03,
)
await writer.enqueue(event)
await writer.stop()
with store.connect() as conn:
rows = conn.execute(
"SELECT cycle, gross_pct, net_pct, est_profit, executed FROM opportunities"
).fetchall()
assert len(rows) == 1
assert rows[0][0] == "USD->BTC->ETH->USD"
assert rows[0][1] == 4.0
assert rows[0][2] == 3.0
assert rows[0][3] == 0.03
assert rows[0][4] is False