refactor: improve SQL query formatting and enhance readability across multiple files
CI / lint-test-build (push) Failing after 56s

This commit is contained in:
2026-06-04 20:16:58 +02:00
parent 8ceca2a7e4
commit 170f59eb89
10 changed files with 127 additions and 67 deletions
+4 -2
View File
@@ -19,10 +19,12 @@ def _resolve_fee_rate(fee_rate: float | None, db_path: str | None = None) -> flo
if db_path is not None:
try:
conn = duckdb.connect(db_path)
row = conn.execute("""
row = conn.execute(
"""
SELECT maker_fee FROM kraken_account_snapshots
ORDER BY snapshot_at DESC LIMIT 1
""").fetchone()
"""
).fetchone()
conn.close()
if row is not None and row[0] is not None:
return float(row[0])
+4 -2
View File
@@ -13,11 +13,13 @@ from arbitrade.storage.db import DuckDBStore
def _python_scan_compute(store: DuckDBStore) -> tuple[float, float | None, float | None]:
with store.connect() as conn:
trade_rows = conn.execute("""
trade_rows = conn.execute(
"""
SELECT started_at, finished_at, realized_pnl
FROM trades
WHERE finished_at IS NOT NULL
""").fetchall()
"""
).fetchall()
opportunity_rows = conn.execute("SELECT detected_at FROM opportunities").fetchall()
realized = sum(float(row[2]) for row in trade_rows if row[2] is not None)
+1 -2
View File
@@ -75,8 +75,7 @@ def create_app(settings: Settings) -> FastAPI:
app.state.audit_repository = AuditRepository(db)
app.state.runtime_state_repository = RuntimeStateRepository(db)
app.state.alert_notifier = build_notifier_from_settings(settings)
app.state.configuration_service = ConfigurationService(
settings, db, AuditRepository(db))
app.state.configuration_service = ConfigurationService(settings, db, AuditRepository(db))
app.state.backtest_recent_reports = []
app.state.dashboard_controls = DashboardControlState(
is_running=not settings.kill_switch_active,
+28 -14
View File
@@ -104,29 +104,37 @@ def _dashboard_overview(request: Request) -> dict[str, object]:
else:
open_trade_filter = "LOWER(status) NOT IN ('filled', 'closed', 'cancelled', 'canceled')"
portfolio_row = conn.execute("""
portfolio_row = conn.execute(
"""
SELECT balances, total_value_usd
FROM portfolio_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
""").fetchone()
open_trades = conn.execute(f"""
"""
).fetchone()
open_trades = conn.execute(
f"""
SELECT {trade_ref_expr}, status, started_at, {cycle_expr}
FROM trades
WHERE {open_trade_filter}
ORDER BY started_at DESC
LIMIT 5
""").fetchall()
rpnl = conn.execute("""
"""
).fetchall()
rpnl = conn.execute(
"""
SELECT COALESCE(SUM(COALESCE(realized_pnl, 0)), 0)
FROM trades
""").fetchone()
latest_opportunities = conn.execute("""
"""
).fetchone()
latest_opportunities = conn.execute(
"""
SELECT cycle, net_pct, est_profit, detected_at
FROM opportunities
ORDER BY detected_at DESC
LIMIT 5
""").fetchall()
"""
).fetchall()
balances_value = ""
total_value = ""
@@ -156,12 +164,14 @@ def _dashboard_overview(request: Request) -> dict[str, object]:
# Query equity from kraken_account_snapshots
try:
equity_row = conn.execute("""
equity_row = conn.execute(
"""
SELECT trade_balance_raw
FROM kraken_account_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
""").fetchone()
"""
).fetchone()
if equity_row is not None and equity_row[0] is not None:
tb_raw = equity_row[0]
if isinstance(tb_raw, str):
@@ -197,12 +207,14 @@ def _dashboard_overview(request: Request) -> dict[str, object]:
taker_fee = ""
thirty_day_volume = ""
try:
acct_row = conn.execute("""
acct_row = conn.execute(
"""
SELECT fee_tier, maker_fee, taker_fee, thirty_day_volume
FROM kraken_account_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
""").fetchone()
"""
).fetchone()
if acct_row is not None:
fee_tier = str(acct_row[0]) if acct_row[0] is not None else ""
maker_fee = f"{float(acct_row[1]):.4%}" if acct_row[1] is not None else ""
@@ -232,12 +244,14 @@ def _dashboard_overview(request: Request) -> dict[str, object]:
def _dashboard_charts(request: Request) -> dict[str, object]:
store = request.app.state.store
with store.connect() as conn:
opportunity_rows = conn.execute("""
opportunity_rows = conn.execute(
"""
SELECT detected_at, cycle, net_pct, est_profit
FROM opportunities
ORDER BY detected_at DESC
LIMIT 10
""").fetchall()
"""
).fetchall()
cr = list(reversed(opportunity_rows))
labels = []
+12 -6
View File
@@ -24,7 +24,8 @@ class MetricsCalculator:
def compute(self) -> PerformanceMetrics:
with self._store.connect() as conn:
tm = conn.execute("""
tm = conn.execute(
"""
SELECT
COALESCE(SUM(COALESCE(realized_pnl, 0)), 0) AS realized_pnl_usd,
COUNT(*) AS total_trades,
@@ -44,21 +45,26 @@ class MetricsCalculator:
) AS latency_p99_seconds
FROM trades
WHERE finished_at IS NOT NULL
""").fetchone()
"""
).fetchone()
om = conn.execute("""
om = conn.execute(
"""
SELECT
COUNT(*) AS opportunity_count,
MIN(detected_at) AS first_detected_at,
MAX(detected_at) AS last_detected_at
FROM opportunities
""").fetchone()
"""
).fetchone()
fm = conn.execute("""
fm = conn.execute(
"""
SELECT AVG(filled_volume / volume) AS fill_rate
FROM orders
WHERE volume > 0 AND filled_volume IS NOT NULL
""").fetchone()
"""
).fetchone()
r_pnl_usd = float(tm[0]) if tm and tm[0] is not None else 0.0
tt = int(tm[1]) if tm and tm[1] is not None else 0
+8 -4
View File
@@ -45,22 +45,26 @@ def _runtime_repository(app: FastAPI) -> RuntimeStateRepository | None:
def _open_trade_count(store: DuckDBStore) -> int:
with store.connect() as conn:
row = conn.execute("""
row = conn.execute(
"""
SELECT COUNT(*)
FROM trades
WHERE finished_at IS NULL
""").fetchone()
"""
).fetchone()
return int(row[0]) if row is not None else 0
def _latest_balances(store: DuckDBStore) -> dict[str, Any] | None:
with store.connect() as conn:
row = conn.execute("""
row = conn.execute(
"""
SELECT balances
FROM portfolio_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
""").fetchone()
"""
).fetchone()
if row is None or row[0] is None:
return None
+12 -6
View File
@@ -219,12 +219,14 @@ class DuckDBStore:
# Ensure schema_migrations table exists and get current version
if not self._table_exists(conn, "schema_migrations"):
conn.execute("""
conn.execute(
"""
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMP DEFAULT current_timestamp
)
""")
"""
)
# Get current schema version
try:
@@ -255,7 +257,8 @@ class DuckDBStore:
if current_version < 3:
# Migration v3: Add kraken_account_snapshots table
conn.execute("""
conn.execute(
"""
CREATE TABLE IF NOT EXISTS kraken_account_snapshots (
snapshot_at TIMESTAMP NOT NULL,
fee_tier VARCHAR,
@@ -265,7 +268,8 @@ class DuckDBStore:
trade_balance_raw JSON,
fee_schedule_raw JSON
)
""")
"""
)
conn.execute("INSERT OR IGNORE INTO schema_migrations (version) VALUES (3)")
_LOG.info("migration_applied", version=3)
@@ -279,7 +283,8 @@ class DuckDBStore:
_LOG.info("migration_applied", version=4)
if current_version < 5:
conn.execute("""
conn.execute(
"""
CREATE TABLE IF NOT EXISTS backtest_jobs (
id UUID DEFAULT uuid(),
status VARCHAR NOT NULL DEFAULT 'pending',
@@ -291,7 +296,8 @@ class DuckDBStore:
started_at TIMESTAMP,
finished_at TIMESTAMP
)
""")
"""
)
conn.execute("INSERT OR IGNORE INTO schema_migrations (version) VALUES (5)")
_LOG.info("migration_applied", version=5)
+28 -14
View File
@@ -349,7 +349,8 @@ class RuntimeStateRepository:
def latest(self) -> RuntimeStateRecord | None:
with self._store.connect() as conn:
row = conn.execute("""
row = conn.execute(
"""
SELECT
snapshot_at,
is_running,
@@ -361,7 +362,8 @@ class RuntimeStateRepository:
FROM runtime_state_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
""").fetchone()
"""
).fetchone()
if row is None:
return None
@@ -424,11 +426,13 @@ class ConfigSectionRepository:
def list_sections(self) -> list[ConfigSection]:
"""List all configuration sections."""
with self._store.connect() as conn:
cursor = conn.execute("""
cursor = conn.execute(
"""
SELECT id, name, description, updated_at
FROM config_sections
ORDER BY name
""")
"""
)
return [
ConfigSection(id=row[0], name=row[1], description=row[2], updated_at=row[3])
for row in cursor.fetchall()
@@ -557,11 +561,13 @@ class ConfigSettingRepository:
(section,),
)
else:
cursor = conn.execute("""
cursor = conn.execute(
"""
SELECT key, section, value_json, value_type, is_secret, is_runtime_reloadable, updated_at, updated_by
FROM config_settings
ORDER BY key
""")
"""
)
return [
ConfigSetting(
key=row[0],
@@ -579,10 +585,12 @@ class ConfigSettingRepository:
def get_latest_updated_at(self) -> datetime | None:
"""Get the latest updated_at timestamp from config_settings table."""
with self._store.connect() as conn:
cursor = conn.execute("""
cursor = conn.execute(
"""
SELECT MAX(updated_at) as latest_updated_at
FROM config_settings
""")
"""
)
row = cursor.fetchone()
if row and row[0]:
# Convert string timestamp to datetime
@@ -694,11 +702,13 @@ class ConfigPairingRepository:
def list_pairings(self) -> list[ConfigPairing]:
"""List all currency pairings."""
with self._store.connect() as conn:
cursor = conn.execute("""
cursor = conn.execute(
"""
SELECT id, base_asset, quote_asset, enabled, source, created_at, updated_at
FROM config_pairings
ORDER BY base_asset, quote_asset
""")
"""
)
return [
ConfigPairing(
id=row[0],
@@ -752,12 +762,14 @@ class ConfigBacktestingDefaultsRepository:
def get_defaults(self) -> ConfigBacktestingDefaults | None:
"""Get the current backtesting defaults."""
with self._store.connect() as conn:
cursor = conn.execute("""
cursor = conn.execute(
"""
SELECT id, starting_balances, trade_capital, min_profit_threshold, slippage_bps, execution_latency_ms
FROM config_backtesting_defaults
ORDER BY id DESC
LIMIT 1
""")
"""
)
row = cursor.fetchone()
if row:
return ConfigBacktestingDefaults(
@@ -850,13 +862,15 @@ class KrakenAccountSnapshotRepository:
def latest_snapshot(self) -> KrakenAccountSnapshot | None:
with self._store.connect() as conn:
row = conn.execute("""
row = conn.execute(
"""
SELECT snapshot_at, fee_tier, maker_fee, taker_fee,
thirty_day_volume, trade_balance_raw, fee_schedule_raw
FROM kraken_account_snapshots
ORDER BY snapshot_at DESC
LIMIT 1
""").fetchone()
"""
).fetchone()
if row is None:
return None
return KrakenAccountSnapshot(
@@ -10,7 +10,8 @@
</article>
<article class="card">
<div class="label">Latest Report</div>
{% if latest_report %}
{% if latest_report and latest_report.report and 'processed_events' in
latest_report.report %}
<div class="meta">Run at {{ latest_report.run_at }}</div>
<div class="meta">Events: {{ latest_report.events_path }}</div>
<div class="meta">
@@ -28,6 +29,14 @@
Max drawdown: {{ '%.4f'|format(latest_report.report.max_drawdown_usd) }}
USD
</div>
{% elif latest_report %}
<div class="meta">
Job {{ latest_report.get('job_id','')[:8] }}... {{ latest_report.status
}}
</div>
<div class="meta">
{{ latest_report.get('error','Waiting for worker...') }}
</div>
{% else %}
<div class="meta">No runs yet.</div>
{% endif %}
+20 -16
View File
@@ -170,7 +170,6 @@ async def test_dashboard_page_and_fragment_and_sse(tmp_path) -> None:
assert 'hx-get="/dashboard/fragment/metrics"' in page.text
assert 'hx-get="/dashboard/fragment/controls"' in page.text
assert 'hx-get="/dashboard/fragment/charts"' in page.text
assert 'hx-get="/dashboard/fragment/audit"' in page.text
assert fragment.status_code == 200
assert "Realized P&amp;L" in fragment.text
@@ -191,17 +190,17 @@ async def test_dashboard_page_and_fragment_and_sse(tmp_path) -> None:
assert "trade-open" in overview.text
assert overview_stream.status_code == 200
assert overview_stream.headers["content-type"].startswith("text/event-stream")
assert overview_stream.headers["content-type"].startswith(
"text/event-stream")
assert "event: overview" in overview_stream.text
assert "trade-open" in overview_stream.text
assert controls.status_code == 200
assert "Runtime Status" in controls.text
assert ">running<" in controls.text
assert "running" in controls.text
assert "Alerting" in controls.text
assert "Last result" in controls.text
assert "Paper trading mode" in controls.text
assert "Trade capital USD" in controls.text
assert "Paper trading" in controls.text
assert "Tradable pairs" in controls.text
assert "Strategy mode" in controls.text
@@ -261,7 +260,8 @@ async def test_dashboard_controls_update_runtime_state_and_config(tmp_path) -> N
assert app.state.settings.max_trade_capital_usd == 300.0
assert app.state.settings.max_concurrent_trades == 4
assert app.state.settings.paper_trading_mode is True
assert app.state.dashboard_controls.tradable_pairs == ["BTC/USD", "ETH/BTC"]
assert app.state.dashboard_controls.tradable_pairs == [
"BTC/USD", "ETH/BTC"]
assert app.state.dashboard_controls.strategy_mode == "paper"
assert app.state.dashboard_controls.strategy_profit_threshold == 0.0025
assert app.state.dashboard_controls.strategy_max_depth_levels == 7
@@ -273,10 +273,14 @@ async def test_dashboard_controls_update_runtime_state_and_config(tmp_path) -> N
assert audit_recent.status_code == 200
entries = audit_recent.json()["entries"]
assert len(entries) >= 4
assert any(entry["event_type"] == "dashboard.control.stop" for entry in entries)
assert any(entry["event_type"] == "dashboard.control.start" for entry in entries)
assert any(entry["event_type"] == "dashboard.control.kill_switch" for entry in entries)
assert any(entry["event_type"] == "dashboard.control.config" for entry in entries)
assert any(entry["event_type"] ==
"dashboard.control.stop" for entry in entries)
assert any(entry["event_type"] ==
"dashboard.control.start" for entry in entries)
assert any(entry["event_type"] ==
"dashboard.control.kill_switch" for entry in entries)
assert any(entry["event_type"] ==
"dashboard.control.config" for entry in entries)
async def test_dashboard_controls_emit_alerts(tmp_path) -> None:
@@ -357,11 +361,11 @@ async def test_backtesting_page_run_and_recent_reports_api(tmp_path) -> None:
run = await client.post(
"/dashboard/backtesting/run",
data={
"events_path": str(events_file),
"source": "db",
"starting_balances": "USD=1000.0",
"trade_capital": "100.0",
"min_profit_threshold": "0.0005",
"fee_profile": "standard",
"fee_profile": "api",
"slippage_bps": "4.0",
"execution_latency_ms": "20.0",
},
@@ -374,13 +378,13 @@ async def test_backtesting_page_run_and_recent_reports_api(tmp_path) -> None:
assert fragment.status_code == 200
assert "Run Backtest" in fragment.text
assert "Recent Runs" in fragment.text
assert "Recent Jobs" in fragment.text
assert run.status_code == 200
assert "completed" in run.text
assert "Processed:" in run.text
assert "submitted" in run.text
assert "queued" in run.text
assert reports.status_code == 200
payload = reports.json()
assert len(payload["reports"]) >= 1
assert payload["reports"][0]["status"] == "completed"
assert payload["reports"][0]["status"] == "pending"