Files
arbitrade/tests/integration/test_postgresql_schema.py
zwitschi dc99f1604e
CI / lint-test-build (push) Successful in 54s
Refactor code for improved readability and consistency
- Cleaned up multiline statements and removed unnecessary line breaks in various files.
- Ensured consistent formatting in function definitions and calls across the codebase.
- Updated docstrings and comments for clarity where applicable.
- Removed trailing newlines in module docstrings.
- Enhanced logging statements for better clarity in maintenance tasks.
2026-06-07 21:59:09 +02:00

425 lines
13 KiB
Python

"""Integration tests: verify PostgreSQL schema and connection.
These tests connect to the PostgreSQL server at 192.168.88.35 and
validate that all expected tables, columns, and constraints exist.
They are skipped if the server is unreachable.
"""
from __future__ import annotations
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
import pytest
import pytest_asyncio
from arbitrade.config.settings import get_settings
from arbitrade.storage.pg_store import PgStore
pytestmark = pytest.mark.integration
# ── expected schema ──────────────────────────────────────────────────────────
EXPECTED_TABLES: dict[str, list[str]] = {
"schema_migrations": ["version", "applied_at"],
"config_sections": ["id", "name", "description", "updated_at"],
"config_settings": [
"key",
"section",
"value_json",
"value_type",
"is_secret",
"is_runtime_reloadable",
"updated_at",
"updated_by",
],
"config_pairings": [
"id",
"base_asset",
"quote_asset",
"enabled",
"source",
"created_at",
"updated_at",
],
"config_backtesting_defaults": [
"id",
"starting_balances",
"trade_capital",
"min_profit_threshold",
"slippage_bps",
"execution_latency_ms",
"fee_source",
],
"opportunities": [
"id",
"detected_at",
"cycle",
"gross_pct",
"net_pct",
"est_profit",
"executed",
],
"trades": [
"id",
"trade_ref",
"started_at",
"finished_at",
"status",
"realized_pnl",
"estimated_pnl",
"capital_used",
"cycle",
"leg_count",
],
"orders": [
"id",
"trade_ref",
"order_ref",
"leg_index",
"pair",
"side",
"volume",
"user_ref",
"status",
"filled_volume",
"avg_price",
"raw_response",
"recorded_at",
],
"pnl_events": [
"id",
"trade_ref",
"recorded_at",
"kind",
"pnl_usd",
"source",
],
"portfolio_snapshots": ["snapshot_at", "balances", "total_value_usd"],
"market_snapshots": ["snapshot_at", "symbol", "source", "payload", "latency_ms"],
"audit_events": [
"id",
"occurred_at",
"actor",
"event_type",
"decision",
"payload",
"correlation_id",
],
"runtime_state_snapshots": [
"snapshot_at",
"is_running",
"kill_switch_active",
"kill_switch_reason",
"open_trade_count",
"last_known_balances",
"note",
],
"kraken_account_snapshots": [
"snapshot_at",
"fee_tier",
"maker_fee",
"taker_fee",
"thirty_day_volume",
"trade_balance_raw",
"fee_schedule_raw",
],
"backtest_jobs": [
"id",
"status",
"events_path",
"config",
"report",
"error",
"created_at",
"started_at",
"finished_at",
],
}
# Tables that should have a primary key
TABLES_WITH_PRIMARY_KEY: dict[str, str | list[str]] = {
"schema_migrations": "version",
"config_sections": "id",
"config_settings": "key",
"config_pairings": "id",
"config_backtesting_defaults": "id",
"opportunities": "id",
"trades": "id",
"orders": "id",
"pnl_events": "id",
"audit_events": "id",
"backtest_jobs": "id",
}
# Tables with a UNIQUE constraint beyond the primary key
TABLES_WITH_UNIQUE_CONSTRAINTS: dict[str, list[str]] = {
"config_sections": ["name"],
"config_pairings": ["base_asset, quote_asset"],
}
# ── fixtures ────────────────────────────────────────────────────────────────
@asynccontextmanager
async def _pg_lifecycle() -> AsyncIterator[PgStore]:
"""Connect, yield store, then disconnect."""
settings = get_settings()
store = PgStore(settings)
try:
await store.start()
yield store
finally:
await store.stop()
@pytest_asyncio.fixture(name="pg")
async def pg_fixture() -> AsyncIterator[PgStore]:
async with _pg_lifecycle() as store:
yield store
# ── helpers ─────────────────────────────────────────────────────────────────
async def _get_actual_tables(store: PgStore) -> dict[str, list[str]]:
"""Return {table_name: [column_name, ...]} for the public schema."""
actual: dict[str, list[str]] = {}
async with store.pool.acquire() as conn:
rows = await conn.fetch(
"SELECT table_name, column_name FROM information_schema.columns "
"WHERE table_schema = 'public' ORDER BY table_name, ordinal_position"
)
for row in rows:
tbl: str = row["table_name"]
col: str = row["column_name"]
actual.setdefault(tbl, []).append(col)
return actual
async def _table_row_count(store: PgStore, table: str) -> int:
async with store.pool.acquire() as conn:
row = await conn.fetchrow(f"SELECT COUNT(*) AS cnt FROM {table}")
return int(row["cnt"]) if row else 0
# ── tests ───────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_pg_connect(pg: PgStore) -> None:
"""Can connect to PostgreSQL and ping the server."""
async with pg.pool.acquire() as conn:
val = await conn.fetchval("SELECT 1 AS val")
assert val == 1
@pytest.mark.asyncio
async def test_pgcrypto_extension(pg: PgStore) -> None:
"""The pgcrypto extension is available (gen_random_uuid)."""
async with pg.pool.acquire() as conn:
val = await conn.fetchval("SELECT gen_random_uuid()")
assert val is not None
# The result should be a UUID object
assert len(str(val)) == 36 # UUID string length
@pytest.mark.asyncio
async def test_schema_migration_applies(pg: PgStore) -> None:
"""Migrate creates all expected tables."""
await pg.migrate()
actual = await _get_actual_tables(pg)
for table in EXPECTED_TABLES:
assert table in actual, (
f"Table '{table}' missing after migration. " f"Found tables: {sorted(actual)}"
)
@pytest.mark.asyncio
async def test_migration_is_idempotent(pg: PgStore) -> None:
"""Running migrate twice does not raise."""
await pg.migrate()
await pg.migrate() # second call should be a no-op
actual = await _get_actual_tables(pg)
for table in EXPECTED_TABLES:
assert table in actual
@pytest.mark.asyncio
async def test_table_columns(pg: PgStore) -> None:
"""Every expected table has the correct columns."""
await pg.migrate()
actual = await _get_actual_tables(pg)
for table, expected_cols in EXPECTED_TABLES.items():
actual_cols = actual.get(table, [])
for col in expected_cols:
assert col in actual_cols, (
f"Column '{col}' missing from table '{table}'. " f"Actual columns: {actual_cols}"
)
@pytest.mark.asyncio
async def test_primary_keys(pg: PgStore) -> None:
"""Tables that should have primary keys do."""
await pg.migrate()
async with pg.pool.acquire() as conn:
for table, expected_pk in TABLES_WITH_PRIMARY_KEY.items():
rows = await conn.fetch(
"SELECT kcu.column_name FROM information_schema.table_constraints tc "
"JOIN information_schema.key_column_usage kcu "
"ON tc.constraint_name = kcu.constraint_name "
"WHERE tc.table_schema = 'public' AND tc.table_name = $1 "
"AND tc.constraint_type = 'PRIMARY KEY' "
"ORDER BY kcu.ordinal_position",
table,
)
pk_columns = [r["column_name"] for r in rows]
expected_list = [expected_pk] if isinstance(expected_pk, str) else expected_pk
for col in expected_list:
assert col in pk_columns, (
f"Table '{table}' should have PK column '{col}'. "
f"Actual PK columns: {pk_columns}"
)
@pytest.mark.asyncio
async def test_unique_constraints(pg: PgStore) -> None:
"""Tables that should have UNIQUE constraints do."""
await pg.migrate()
async with pg.pool.acquire() as conn:
for table, expected_ucs in TABLES_WITH_UNIQUE_CONSTRAINTS.items():
rows = await conn.fetch(
"SELECT kcu.column_name FROM information_schema.table_constraints tc "
"JOIN information_schema.key_column_usage kcu "
"ON tc.constraint_name = kcu.constraint_name "
"WHERE tc.table_schema = 'public' AND tc.table_name = $1 "
"AND tc.constraint_type = 'UNIQUE'",
table,
)
uc_columns = {r["column_name"] for r in rows}
for expected_cols in expected_ucs:
cols = [c.strip() for c in expected_cols.split(",")]
for col in cols:
assert col in uc_columns, (
f"Table '{table}' should have UNIQUE column '{col}'. "
f"Actual UNIQUE columns: {uc_columns}"
)
@pytest.mark.asyncio
async def test_table_row_count_is_zero(pg: PgStore) -> None:
"""All tables start empty after migration."""
await pg.migrate()
for table in EXPECTED_TABLES:
count = await _table_row_count(pg, table)
assert count == 0, (
f"Table '{table}' should be empty after migration, " f"but has {count} rows"
)
@pytest.mark.asyncio
async def test_schema_migration_version_recorded(pg: PgStore) -> None:
"""schema_migrations has the expected version after migrate."""
from arbitrade.storage.pg_store import SCHEMA_VERSION
await pg.migrate()
async with pg.pool.acquire() as conn:
row = await conn.fetchrow("SELECT MAX(version) AS v FROM schema_migrations")
assert row is not None
assert row["v"] == SCHEMA_VERSION, (
f"Expected schema version {SCHEMA_VERSION}, " f"got {row['v']}"
)
@pytest.mark.asyncio
async def test_create_and_query_row(pg: PgStore) -> None:
"""Can INSERT a row and SELECT it back (round-trip for a simple table)."""
await pg.migrate()
async with pg.pool.acquire() as conn:
# ConfigSections round-trip
await conn.execute(
"INSERT INTO config_sections (name, description) VALUES ($1, $2)",
"test_section",
"A test section for integration test",
)
row = await conn.fetchrow(
"SELECT name, description FROM config_sections WHERE name = $1",
"test_section",
)
assert row is not None
assert row["name"] == "test_section"
assert row["description"] == "A test section for integration test"
# Clean up
await conn.execute(
"DELETE FROM config_sections WHERE name = $1",
"test_section",
)
@pytest.mark.asyncio
async def test_config_pairings_upsert(pg: PgStore) -> None:
"""ON CONFLICT ... DO UPDATE works on config_pairings (unique constraint)."""
await pg.migrate()
from arbitrade.config.service import ConfigPairing
from arbitrade.storage.repositories import ConfigPairingRepository
repo = ConfigPairingRepository(pg)
# Insert
p1 = await repo.upsert_pairing(
ConfigPairing(base_asset="XBT", quote_asset="USD", enabled=True, source="kraken")
)
assert p1.id is not None
assert p1.base_asset == "XBT"
assert p1.enabled is True
# Upsert (update)
p2 = await repo.upsert_pairing(
ConfigPairing(base_asset="XBT", quote_asset="USD", enabled=False, source="manual")
)
assert p2.id == p1.id # same row
assert p2.enabled is False
assert p2.source == "manual"
# Clean up
deleted = await repo.delete_pairing("XBT", "USD")
assert deleted is True
@pytest.mark.asyncio
async def test_audit_list_recent(pg: PgStore) -> None:
"""AuditRepository.list_recent returns records in desc order."""
await pg.migrate()
from datetime import UTC, datetime
from arbitrade.storage.repositories import AuditRecord, AuditRepository
repo = AuditRepository(pg)
now = datetime.now(UTC)
# Insert a few records
for i in range(3):
await repo.insert(
AuditRecord(
occurred_at=now,
actor="test",
event_type="integration_test",
decision=f"decision_{i}",
payload={"index": i},
correlation_id=f"corr_{i}",
)
)
recent = await repo.list_recent(limit=5)
assert len(recent) >= 3
assert recent[0].decision in ("decision_2", "decision_1", "decision_0")
# Verify payload serialization worked
first = recent[0]
if first.payload:
assert "index" in first.payload