ef22e217c7
CI / lint-test-build (push) Failing after 11s
- Added PG_PASSWORD to .env.example for database connection. - Removed unnecessary imports and streamlined code in various modules. - Enhanced error handling in ConfigSettingRepository and ConfigPairingRepository. - Updated test files to remove unused imports and improve clarity.
360 lines
13 KiB
Python
360 lines
13 KiB
Python
"""Integration tests: verify PostgreSQL schema and connection.
|
|
|
|
These tests connect to the PostgreSQL server at 192.168.88.35 and
|
|
validate that all expected tables, columns, and constraints exist.
|
|
They are skipped if the server is unreachable.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import AsyncIterator
|
|
from contextlib import asynccontextmanager
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
|
|
from arbitrade.config.settings import get_settings
|
|
from arbitrade.storage.pg_store import PgStore
|
|
|
|
# Apply the ``integration`` marker to every test collected from this module.
pytestmark = pytest.mark.integration
|
|
|
|
|
|
# ── expected schema ──────────────────────────────────────────────────────────
|
|
|
|
# Table name -> columns that must exist on that table. The tests only check
# that each listed column is present, so extra columns in the database are
# tolerated.
EXPECTED_TABLES: dict[str, list[str]] = {
    "schema_migrations": ["version", "applied_at"],
    "config_sections": ["id", "name", "description", "updated_at"],
    "config_settings": [
        "key",
        "section",
        "value_json",
        "value_type",
        "is_secret",
        "is_runtime_reloadable",
        "updated_at",
        "updated_by",
    ],
    "config_pairings": [
        "id",
        "base_asset",
        "quote_asset",
        "enabled",
        "source",
        "created_at",
        "updated_at",
    ],
    "config_backtesting_defaults": [
        "id",
        "starting_balances",
        "trade_capital",
        "min_profit_threshold",
        "slippage_bps",
        "execution_latency_ms",
        "fee_source",
    ],
    "opportunities": [
        "id",
        "detected_at",
        "cycle",
        "gross_pct",
        "net_pct",
        "est_profit",
        "executed",
    ],
    "trades": [
        "id",
        "trade_ref",
        "started_at",
        "finished_at",
        "status",
        "realized_pnl",
        "estimated_pnl",
        "capital_used",
        "cycle",
        "leg_count",
    ],
    "orders": [
        "id",
        "trade_ref",
        "order_ref",
        "leg_index",
        "pair",
        "side",
        "volume",
        "user_ref",
        "status",
        "filled_volume",
        "avg_price",
        "raw_response",
        "recorded_at",
    ],
    "pnl_events": [
        "id",
        "trade_ref",
        "recorded_at",
        "kind",
        "pnl_usd",
        "source",
    ],
    "portfolio_snapshots": ["snapshot_at", "balances", "total_value_usd"],
    "market_snapshots": [
        "snapshot_at",
        "symbol",
        "source",
        "payload",
        "latency_ms",
    ],
    "audit_events": [
        "id",
        "occurred_at",
        "actor",
        "event_type",
        "decision",
        "payload",
        "correlation_id",
    ],
    "runtime_state_snapshots": [
        "snapshot_at",
        "is_running",
        "kill_switch_active",
        "kill_switch_reason",
        "open_trade_count",
        "last_known_balances",
        "note",
    ],
    "kraken_account_snapshots": [
        "snapshot_at",
        "fee_tier",
        "maker_fee",
        "taker_fee",
        "thirty_day_volume",
        "trade_balance_raw",
        "fee_schedule_raw",
    ],
    "backtest_jobs": [
        "id",
        "status",
        "events_path",
        "config",
        "report",
        "error",
        "created_at",
        "started_at",
        "finished_at",
    ],
}
|
|
|
|
# Table name -> primary-key column. A value may be a single column name or a
# list of column names; test_primary_keys accepts both forms.
TABLES_WITH_PRIMARY_KEY: dict[str, str | list[str]] = {
    # Keyed by something other than a surrogate ``id``.
    "schema_migrations": "version",
    "config_sections": "id",
    "config_settings": "key",
    "config_pairings": "id",
    "config_backtesting_defaults": "id",
    "opportunities": "id",
    "trades": "id",
    "orders": "id",
    "pnl_events": "id",
    "audit_events": "id",
    "backtest_jobs": "id",
}
|
|
|
|
# Table name -> UNIQUE constraints expected in addition to the primary key.
# Each list entry describes one constraint; a multi-column constraint is
# written as a single comma-separated string, which test_unique_constraints
# splits on commas.
TABLES_WITH_UNIQUE_CONSTRAINTS: dict[str, list[str]] = {
    "config_sections": [
        "name",
    ],
    "config_pairings": [
        "base_asset, quote_asset",
    ],
}
|
|
|
|
|
|
# ── fixtures ────────────────────────────────────────────────────────────────
|
|
|
|
@asynccontextmanager
async def _pg_lifecycle() -> AsyncIterator[PgStore]:
    """Yield a started ``PgStore`` and stop it when the context exits.

    The store is built from ``get_settings()``. ``stop()`` sits in a
    ``finally`` clause that also covers ``start()``, so it is awaited
    whether the start-up or the managed body succeeds or raises.
    """
    pg_store = PgStore(get_settings())
    try:
        await pg_store.start()
        yield pg_store
    finally:
        await pg_store.stop()
|
|
|
|
|
|
@pytest_asyncio.fixture(name="pg")
async def pg_fixture() -> AsyncIterator[PgStore]:
    """Expose a started ``PgStore`` to tests as the ``pg`` fixture.

    Teardown happens when ``_pg_lifecycle`` exits, which stops the store.
    """
    lifecycle = _pg_lifecycle()
    async with lifecycle as connected_store:
        yield connected_store
|
|
|
|
|
|
# ── helpers ─────────────────────────────────────────────────────────────────
|
|
|
|
async def _get_actual_tables(store: PgStore) -> dict[str, list[str]]:
    """Map every table in the ``public`` schema to its column names.

    The data comes from ``information_schema.columns``; within each table
    the columns appear in ``ordinal_position`` order.
    """
    query = (
        "SELECT table_name, column_name FROM information_schema.columns "
        "WHERE table_schema = 'public' ORDER BY table_name, ordinal_position"
    )
    async with store.pool.acquire() as conn:
        records = await conn.fetch(query)

    columns_by_table: dict[str, list[str]] = {}
    for record in records:
        table_name: str = record["table_name"]
        column_name: str = record["column_name"]
        columns_by_table.setdefault(table_name, []).append(column_name)
    return columns_by_table
|
|
|
|
|
|
async def _table_row_count(store: PgStore, table: str) -> int:
    """Return ``COUNT(*)`` for *table*, or 0 when the query yields no row.

    *table* is interpolated directly into the SQL text, so callers must
    pass trusted identifiers only.
    """
    async with store.pool.acquire() as conn:
        result = await conn.fetchrow(f"SELECT COUNT(*) AS cnt FROM {table}")
    if not result:
        return 0
    return int(result["cnt"])
|
|
|
|
|
|
# ── tests ───────────────────────────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
async def test_pg_connect(pg: PgStore) -> None:
    """A pooled connection can complete a trivial ``SELECT 1`` round-trip."""
    async with pg.pool.acquire() as conn:
        answer = await conn.fetchval("SELECT 1 AS val")
    assert answer == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_pgcrypto_extension(pg: PgStore) -> None:
    """``gen_random_uuid()`` is callable and returns a UUID-shaped value."""
    # NOTE(review): newer PostgreSQL releases appear to provide
    # gen_random_uuid() without pgcrypto, so this may not prove the extension
    # is installed — confirm whether pg_extension should be queried instead.
    async with pg.pool.acquire() as conn:
        generated = await conn.fetchval("SELECT gen_random_uuid()")
    assert generated is not None
    # A canonical textual UUID is 32 hex digits plus 4 hyphens.
    uuid_text = str(generated)
    assert len(uuid_text) == 36
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_schema_migration_applies(pg: PgStore) -> None:
    """After ``migrate()``, every table named in EXPECTED_TABLES exists."""
    await pg.migrate()
    tables_found = await _get_actual_tables(pg)

    for expected_name in EXPECTED_TABLES:
        assert expected_name in tables_found, (
            f"Table '{expected_name}' missing after migration. "
            f"Found tables: {sorted(tables_found)}"
        )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_migration_is_idempotent(pg: PgStore) -> None:
    """Calling ``migrate()`` twice raises nothing and keeps every table."""
    # The second pass is expected to be a no-op.
    for _ in range(2):
        await pg.migrate()

    tables_found = await _get_actual_tables(pg)
    for expected_name in EXPECTED_TABLES:
        assert expected_name in tables_found
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_table_columns(pg: PgStore) -> None:
    """Each expected column is present on its table after ``migrate()``.

    Extra columns are tolerated. A table that is missing altogether is
    treated as having no columns, so its first expected column fails.
    """
    await pg.migrate()
    schema = await _get_actual_tables(pg)

    for table in EXPECTED_TABLES:
        present = schema.get(table, [])
        for column in EXPECTED_TABLES[table]:
            assert column in present, (
                f"Column '{column}' missing from table '{table}'. "
                f"Actual columns: {present}"
            )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_primary_keys(pg: PgStore) -> None:
    """Every table in TABLES_WITH_PRIMARY_KEY has its expected PK column(s).

    For each table the PRIMARY KEY columns are read from
    ``information_schema`` and every expected column must be among them.
    Additional key columns are tolerated.
    """
    # Join on schema and table as well as constraint name: PostgreSQL does not
    # enforce unique constraint names, so a name-only join could pull in key
    # columns that belong to a different table or schema.
    pk_query = (
        "SELECT kcu.column_name FROM information_schema.table_constraints tc "
        "JOIN information_schema.key_column_usage kcu "
        "ON tc.constraint_name = kcu.constraint_name "
        "AND tc.constraint_schema = kcu.constraint_schema "
        "AND tc.table_schema = kcu.table_schema "
        "AND tc.table_name = kcu.table_name "
        "WHERE tc.table_schema = 'public' AND tc.table_name = $1 "
        "AND tc.constraint_type = 'PRIMARY KEY' "
        "ORDER BY kcu.ordinal_position"
    )

    await pg.migrate()
    async with pg.pool.acquire() as conn:
        for table, expected_pk in TABLES_WITH_PRIMARY_KEY.items():
            rows = await conn.fetch(pk_query, table)
            pk_columns = [r["column_name"] for r in rows]
            # The expectation may be one column name or a list of them.
            expected_list = (
                [expected_pk] if isinstance(expected_pk, str) else expected_pk
            )
            for col in expected_list:
                assert col in pk_columns, (
                    f"Table '{table}' should have PK column '{col}'. "
                    f"Actual PK columns: {pk_columns}"
                )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_unique_constraints(pg: PgStore) -> None:
    """Every expected UNIQUE constraint exists as a single constraint.

    Columns are grouped per constraint, and each expectation must match one
    constraint's column set exactly. A composite expectation such as
    ``"base_asset, quote_asset"`` is therefore not satisfied by two separate
    single-column UNIQUE constraints.
    """
    # Join on schema and table as well as constraint name: PostgreSQL does not
    # enforce unique constraint names, so a name-only join could pull in
    # columns that belong to a different table or schema.
    uc_query = (
        "SELECT tc.constraint_name, kcu.column_name "
        "FROM information_schema.table_constraints tc "
        "JOIN information_schema.key_column_usage kcu "
        "ON tc.constraint_name = kcu.constraint_name "
        "AND tc.constraint_schema = kcu.constraint_schema "
        "AND tc.table_schema = kcu.table_schema "
        "AND tc.table_name = kcu.table_name "
        "WHERE tc.table_schema = 'public' AND tc.table_name = $1 "
        "AND tc.constraint_type = 'UNIQUE'"
    )

    await pg.migrate()
    async with pg.pool.acquire() as conn:
        for table, expected_ucs in TABLES_WITH_UNIQUE_CONSTRAINTS.items():
            rows = await conn.fetch(uc_query, table)

            # constraint name -> set of columns it covers
            constraint_columns: dict[str, set[str]] = {}
            for r in rows:
                constraint_columns.setdefault(r["constraint_name"], set()).add(
                    r["column_name"]
                )
            actual_sets = list(constraint_columns.values())

            for expected_cols in expected_ucs:
                # One entry is one constraint; columns are comma-separated.
                wanted = [c.strip() for c in expected_cols.split(",")]
                assert set(wanted) in actual_sets, (
                    f"Table '{table}' should have a UNIQUE constraint on "
                    f"({', '.join(wanted)}). Actual UNIQUE constraints: "
                    f"{sorted(sorted(cols) for cols in actual_sets)}"
                )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_table_row_count_is_zero(pg: PgStore) -> None:
    """All data tables are empty after migration.

    ``schema_migrations`` is excluded. This module's
    ``test_schema_migration_version_recorded`` requires a version row to be
    present there after ``migrate()``, so that table cannot also be empty.
    """
    await pg.migrate()

    # Tables populated by the migration itself rather than by the application.
    bookkeeping_tables = {"schema_migrations"}

    for table in EXPECTED_TABLES:
        if table in bookkeeping_tables:
            continue
        count = await _table_row_count(pg, table)
        assert count == 0, (
            f"Table '{table}' should be empty after migration, "
            f"but has {count} rows"
        )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_schema_migration_version_recorded(pg: PgStore) -> None:
    """The highest ``schema_migrations.version`` equals ``SCHEMA_VERSION``."""
    from arbitrade.storage.pg_store import SCHEMA_VERSION

    await pg.migrate()
    async with pg.pool.acquire() as conn:
        latest = await conn.fetchrow(
            "SELECT MAX(version) AS v FROM schema_migrations"
        )

    assert latest is not None
    recorded = latest["v"]
    assert recorded == SCHEMA_VERSION, (
        f"Expected schema version {SCHEMA_VERSION}, "
        f"got {recorded}"
    )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_and_query_row(pg: PgStore) -> None:
    """Can INSERT a row and SELECT it back (round-trip for a simple table).

    The probe row is deleted in a ``finally`` clause, so a failed assertion
    does not leave it behind in the database.
    """
    section_name = "test_section"
    section_description = "A test section for integration test"
    delete_sql = "DELETE FROM config_sections WHERE name = $1"

    await pg.migrate()
    async with pg.pool.acquire() as conn:
        # config_sections.name is expected to be UNIQUE, so a row left over
        # from an earlier aborted run would make the INSERT below fail.
        await conn.execute(delete_sql, section_name)
        try:
            # ConfigSections round-trip
            await conn.execute(
                "INSERT INTO config_sections (name, description) VALUES ($1, $2)",
                section_name,
                section_description,
            )
            row = await conn.fetchrow(
                "SELECT name, description FROM config_sections WHERE name = $1",
                section_name,
            )
            assert row is not None
            assert row["name"] == section_name
            assert row["description"] == section_description
        finally:
            # Clean up even when an assertion above fails.
            await conn.execute(delete_sql, section_name)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_config_pairings_upsert(pg: PgStore) -> None:
    """ON CONFLICT ... DO UPDATE works on config_pairings (unique constraint).

    The same (base_asset, quote_asset) pair is upserted twice. The second
    call must return the same row id with the updated ``enabled`` and
    ``source`` values. The pairing is deleted in a ``finally`` clause so a
    failed assertion does not leave it behind.
    """
    from arbitrade.config.service import ConfigPairing
    from arbitrade.storage.repositories import ConfigPairingRepository

    await pg.migrate()
    repo = ConfigPairingRepository(pg)

    try:
        # Insert
        p1 = await repo.upsert_pairing(
            ConfigPairing(
                base_asset="XBT", quote_asset="USD", enabled=True, source="kraken"
            )
        )
        assert p1.id is not None
        assert p1.base_asset == "XBT"
        assert p1.enabled is True

        # Upsert (update)
        p2 = await repo.upsert_pairing(
            ConfigPairing(
                base_asset="XBT", quote_asset="USD", enabled=False, source="manual"
            )
        )
        assert p2.id == p1.id  # same row
        assert p2.enabled is False
        assert p2.source == "manual"
    finally:
        # Clean up even when an assertion above fails.
        deleted = await repo.delete_pairing("XBT", "USD")

    assert deleted is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_audit_list_recent(pg: PgStore) -> None:
    """AuditRepository.list_recent returns the records just inserted.

    Three records sharing one timestamp are inserted. The newest entry
    returned must be one of them and must carry its payload. The inserted
    rows are removed in a ``finally`` clause so they do not accumulate in
    the database across runs.
    """
    from datetime import UTC, datetime

    from arbitrade.storage.repositories import AuditRecord, AuditRepository

    await pg.migrate()

    repo = AuditRepository(pg)
    now = datetime.now(UTC)
    actor = "test"
    event_type = "integration_test"

    try:
        # Insert a few records
        for i in range(3):
            await repo.insert(
                AuditRecord(
                    occurred_at=now,
                    actor=actor,
                    event_type=event_type,
                    decision=f"decision_{i}",
                    payload={"index": i},
                    correlation_id=f"corr_{i}",
                )
            )

        recent = await repo.list_recent(limit=5)
        assert len(recent) >= 3
        assert recent[0].decision in ("decision_2", "decision_1", "decision_0")

        # Verify payload serialization worked
        first = recent[0]
        if first.payload:
            assert "index" in first.payload
    finally:
        # NOTE(review): presumably AuditRecord.actor / event_type are stored in
        # the same-named audit_events columns, and the table permits DELETE —
        # confirm against the repository and schema.
        async with pg.pool.acquire() as conn:
            await conn.execute(
                "DELETE FROM audit_events WHERE actor = $1 AND event_type = $2",
                actor,
                event_type,
            )