"""Integration tests: verify PostgreSQL schema and connection. These tests connect to the PostgreSQL server at 192.168.88.35 and validate that all expected tables, columns, and constraints exist. They are skipped if the server is unreachable. """ from __future__ import annotations from collections.abc import AsyncIterator from contextlib import asynccontextmanager import pytest import pytest_asyncio from arbitrade.config.settings import get_settings from arbitrade.storage.pg_store import PgStore pytestmark = pytest.mark.integration # ── expected schema ────────────────────────────────────────────────────────── EXPECTED_TABLES: dict[str, list[str]] = { "schema_migrations": ["version", "applied_at"], "config_sections": ["id", "name", "description", "updated_at"], "config_settings": [ "key", "section", "value_json", "value_type", "is_secret", "is_runtime_reloadable", "updated_at", "updated_by", ], "config_pairings": [ "id", "base_asset", "quote_asset", "enabled", "source", "created_at", "updated_at", ], "config_backtesting_defaults": [ "id", "starting_balances", "trade_capital", "min_profit_threshold", "slippage_bps", "execution_latency_ms", "fee_source", ], "opportunities": [ "id", "detected_at", "cycle", "gross_pct", "net_pct", "est_profit", "executed", ], "trades": [ "id", "trade_ref", "started_at", "finished_at", "status", "realized_pnl", "estimated_pnl", "capital_used", "cycle", "leg_count", ], "orders": [ "id", "trade_ref", "order_ref", "leg_index", "pair", "side", "volume", "user_ref", "status", "filled_volume", "avg_price", "raw_response", "recorded_at", ], "pnl_events": [ "id", "trade_ref", "recorded_at", "kind", "pnl_usd", "source", ], "portfolio_snapshots": ["snapshot_at", "balances", "total_value_usd"], "market_snapshots": ["snapshot_at", "symbol", "source", "payload", "latency_ms"], "audit_events": [ "id", "occurred_at", "actor", "event_type", "decision", "payload", "correlation_id", ], "runtime_state_snapshots": [ "snapshot_at", "is_running", "kill_switch_active", "kill_switch_reason", "open_trade_count", "last_known_balances", "note", ], "kraken_account_snapshots": [ "snapshot_at", "fee_tier", "maker_fee", "taker_fee", "thirty_day_volume", "trade_balance_raw", "fee_schedule_raw", ], "backtest_jobs": [ "id", "status", "events_path", "config", "report", "error", "created_at", "started_at", "finished_at", ], } # Tables that should have a primary key TABLES_WITH_PRIMARY_KEY: dict[str, str | list[str]] = { "schema_migrations": "version", "config_sections": "id", "config_settings": "key", "config_pairings": "id", "config_backtesting_defaults": "id", "opportunities": "id", "trades": "id", "orders": "id", "pnl_events": "id", "audit_events": "id", "backtest_jobs": "id", } # Tables with a UNIQUE constraint beyond the primary key TABLES_WITH_UNIQUE_CONSTRAINTS: dict[str, list[str]] = { "config_sections": ["name"], "config_pairings": ["base_asset, quote_asset"], } # ── fixtures ──────────────────────────────────────────────────────────────── @asynccontextmanager async def _pg_lifecycle() -> AsyncIterator[PgStore]: """Connect, yield store, then disconnect.""" settings = get_settings() store = PgStore(settings) try: await store.start() yield store finally: await store.stop() @pytest_asyncio.fixture(name="pg") async def pg_fixture() -> AsyncIterator[PgStore]: async with _pg_lifecycle() as store: yield store # ── helpers ───────────────────────────────────────────────────────────────── async def _get_actual_tables(store: PgStore) -> dict[str, list[str]]: """Return {table_name: [column_name, ...]} for the public schema.""" actual: dict[str, list[str]] = {} async with store.pool.acquire() as conn: rows = await conn.fetch( "SELECT table_name, column_name FROM information_schema.columns " "WHERE table_schema = 'public' ORDER BY table_name, ordinal_position" ) for row in rows: tbl: str = row["table_name"] col: str = row["column_name"] actual.setdefault(tbl, []).append(col) return actual async def _table_row_count(store: PgStore, table: str) -> int: async with store.pool.acquire() as conn: row = await conn.fetchrow(f"SELECT COUNT(*) AS cnt FROM {table}") return int(row["cnt"]) if row else 0 # ── tests ─────────────────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_pg_connect(pg: PgStore) -> None: """Can connect to PostgreSQL and ping the server.""" async with pg.pool.acquire() as conn: val = await conn.fetchval("SELECT 1 AS val") assert val == 1 @pytest.mark.asyncio async def test_pgcrypto_extension(pg: PgStore) -> None: """The pgcrypto extension is available (gen_random_uuid).""" async with pg.pool.acquire() as conn: val = await conn.fetchval("SELECT gen_random_uuid()") assert val is not None # The result should be a UUID object assert len(str(val)) == 36 # UUID string length @pytest.mark.asyncio async def test_schema_migration_applies(pg: PgStore) -> None: """Migrate creates all expected tables.""" await pg.migrate() actual = await _get_actual_tables(pg) for table in EXPECTED_TABLES: assert table in actual, ( f"Table '{table}' missing after migration. " f"Found tables: {sorted(actual)}" ) @pytest.mark.asyncio async def test_migration_is_idempotent(pg: PgStore) -> None: """Running migrate twice does not raise.""" await pg.migrate() await pg.migrate() # second call should be a no-op actual = await _get_actual_tables(pg) for table in EXPECTED_TABLES: assert table in actual @pytest.mark.asyncio async def test_table_columns(pg: PgStore) -> None: """Every expected table has the correct columns.""" await pg.migrate() actual = await _get_actual_tables(pg) for table, expected_cols in EXPECTED_TABLES.items(): actual_cols = actual.get(table, []) for col in expected_cols: assert col in actual_cols, ( f"Column '{col}' missing from table '{table}'. " f"Actual columns: {actual_cols}" ) @pytest.mark.asyncio async def test_primary_keys(pg: PgStore) -> None: """Tables that should have primary keys do.""" await pg.migrate() async with pg.pool.acquire() as conn: for table, expected_pk in TABLES_WITH_PRIMARY_KEY.items(): rows = await conn.fetch( "SELECT kcu.column_name FROM information_schema.table_constraints tc " "JOIN information_schema.key_column_usage kcu " "ON tc.constraint_name = kcu.constraint_name " "WHERE tc.table_schema = 'public' AND tc.table_name = $1 " "AND tc.constraint_type = 'PRIMARY KEY' " "ORDER BY kcu.ordinal_position", table, ) pk_columns = [r["column_name"] for r in rows] expected_list = [expected_pk] if isinstance(expected_pk, str) else expected_pk for col in expected_list: assert col in pk_columns, ( f"Table '{table}' should have PK column '{col}'. " f"Actual PK columns: {pk_columns}" ) @pytest.mark.asyncio async def test_unique_constraints(pg: PgStore) -> None: """Tables that should have UNIQUE constraints do.""" await pg.migrate() async with pg.pool.acquire() as conn: for table, expected_ucs in TABLES_WITH_UNIQUE_CONSTRAINTS.items(): rows = await conn.fetch( "SELECT kcu.column_name FROM information_schema.table_constraints tc " "JOIN information_schema.key_column_usage kcu " "ON tc.constraint_name = kcu.constraint_name " "WHERE tc.table_schema = 'public' AND tc.table_name = $1 " "AND tc.constraint_type = 'UNIQUE'", table, ) uc_columns = {r["column_name"] for r in rows} for expected_cols in expected_ucs: cols = [c.strip() for c in expected_cols.split(",")] for col in cols: assert col in uc_columns, ( f"Table '{table}' should have UNIQUE column '{col}'. " f"Actual UNIQUE columns: {uc_columns}" ) @pytest.mark.asyncio async def test_table_row_count_is_zero(pg: PgStore) -> None: """All tables start empty after migration.""" await pg.migrate() for table in EXPECTED_TABLES: count = await _table_row_count(pg, table) assert count == 0, ( f"Table '{table}' should be empty after migration, " f"but has {count} rows" ) @pytest.mark.asyncio async def test_schema_migration_version_recorded(pg: PgStore) -> None: """schema_migrations has the expected version after migrate.""" from arbitrade.storage.pg_store import SCHEMA_VERSION await pg.migrate() async with pg.pool.acquire() as conn: row = await conn.fetchrow("SELECT MAX(version) AS v FROM schema_migrations") assert row is not None assert row["v"] == SCHEMA_VERSION, ( f"Expected schema version {SCHEMA_VERSION}, " f"got {row['v']}" ) @pytest.mark.asyncio async def test_create_and_query_row(pg: PgStore) -> None: """Can INSERT a row and SELECT it back (round-trip for a simple table).""" await pg.migrate() async with pg.pool.acquire() as conn: # ConfigSections round-trip await conn.execute( "INSERT INTO config_sections (name, description) VALUES ($1, $2)", "test_section", "A test section for integration test", ) row = await conn.fetchrow( "SELECT name, description FROM config_sections WHERE name = $1", "test_section", ) assert row is not None assert row["name"] == "test_section" assert row["description"] == "A test section for integration test" # Clean up await conn.execute( "DELETE FROM config_sections WHERE name = $1", "test_section", ) @pytest.mark.asyncio async def test_config_pairings_upsert(pg: PgStore) -> None: """ON CONFLICT ... DO UPDATE works on config_pairings (unique constraint).""" await pg.migrate() from arbitrade.config.service import ConfigPairing from arbitrade.storage.repositories import ConfigPairingRepository repo = ConfigPairingRepository(pg) # Insert p1 = await repo.upsert_pairing( ConfigPairing(base_asset="XBT", quote_asset="USD", enabled=True, source="kraken") ) assert p1.id is not None assert p1.base_asset == "XBT" assert p1.enabled is True # Upsert (update) p2 = await repo.upsert_pairing( ConfigPairing(base_asset="XBT", quote_asset="USD", enabled=False, source="manual") ) assert p2.id == p1.id # same row assert p2.enabled is False assert p2.source == "manual" # Clean up deleted = await repo.delete_pairing("XBT", "USD") assert deleted is True @pytest.mark.asyncio async def test_audit_list_recent(pg: PgStore) -> None: """AuditRepository.list_recent returns records in desc order.""" await pg.migrate() from datetime import UTC, datetime from arbitrade.storage.repositories import AuditRecord, AuditRepository repo = AuditRepository(pg) now = datetime.now(UTC) # Insert a few records for i in range(3): await repo.insert( AuditRecord( occurred_at=now, actor="test", event_type="integration_test", decision=f"decision_{i}", payload={"index": i}, correlation_id=f"corr_{i}", ) ) recent = await repo.list_recent(limit=5) assert len(recent) >= 3 assert recent[0].decision in ("decision_2", "decision_1", "decision_0") # Verify payload serialization worked first = recent[0] if first.payload: assert "index" in first.payload