dc99f1604e
CI / lint-test-build (push) Successful in 54s
- Cleaned up multiline statements and removed unnecessary line breaks in various files. - Ensured consistent formatting in function definitions and calls across the codebase. - Updated docstrings and comments for clarity where applicable. - Removed trailing newlines in module docstrings. - Enhanced logging statements for better clarity in maintenance tasks.
425 lines
13 KiB
Python
425 lines
13 KiB
Python
"""Integration tests: verify PostgreSQL schema and connection.
|
|
|
|
These tests connect to the PostgreSQL server at 192.168.88.35 and
validate that all expected tables, columns, and constraints exist.
They are skipped if the server is unreachable.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import AsyncIterator
|
|
from contextlib import asynccontextmanager
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
|
|
from arbitrade.config.settings import get_settings
|
|
from arbitrade.storage.pg_store import PgStore
|
|
|
|
pytestmark = pytest.mark.integration
|
|
|
|
|
|
# ── expected schema ──────────────────────────────────────────────────────────
|
|
|
|
# Columns each table must expose after migration, keyed by table name.
# The tests below only check that these columns are present; they do not
# reject additional columns.
EXPECTED_TABLES: dict[str, list[str]] = {
    "schema_migrations": ["version", "applied_at"],
    "config_sections": ["id", "name", "description", "updated_at"],
    "config_settings": [
        "key", "section", "value_json", "value_type",
        "is_secret", "is_runtime_reloadable", "updated_at", "updated_by",
    ],
    "config_pairings": [
        "id", "base_asset", "quote_asset", "enabled",
        "source", "created_at", "updated_at",
    ],
    "config_backtesting_defaults": [
        "id", "starting_balances", "trade_capital", "min_profit_threshold",
        "slippage_bps", "execution_latency_ms", "fee_source",
    ],
    "opportunities": [
        "id", "detected_at", "cycle", "gross_pct",
        "net_pct", "est_profit", "executed",
    ],
    "trades": [
        "id", "trade_ref", "started_at", "finished_at", "status",
        "realized_pnl", "estimated_pnl", "capital_used", "cycle", "leg_count",
    ],
    "orders": [
        "id", "trade_ref", "order_ref", "leg_index", "pair", "side", "volume",
        "user_ref", "status", "filled_volume", "avg_price", "raw_response",
        "recorded_at",
    ],
    "pnl_events": ["id", "trade_ref", "recorded_at", "kind", "pnl_usd", "source"],
    "portfolio_snapshots": ["snapshot_at", "balances", "total_value_usd"],
    "market_snapshots": ["snapshot_at", "symbol", "source", "payload", "latency_ms"],
    "audit_events": [
        "id", "occurred_at", "actor", "event_type",
        "decision", "payload", "correlation_id",
    ],
    "runtime_state_snapshots": [
        "snapshot_at", "is_running", "kill_switch_active", "kill_switch_reason",
        "open_trade_count", "last_known_balances", "note",
    ],
    "kraken_account_snapshots": [
        "snapshot_at", "fee_tier", "maker_fee", "taker_fee",
        "thirty_day_volume", "trade_balance_raw", "fee_schedule_raw",
    ],
    "backtest_jobs": [
        "id", "status", "events_path", "config", "report",
        "error", "created_at", "started_at", "finished_at",
    ],
}
|
|
|
|
# Tables that should have a primary key
|
|
# Maps table name -> expected primary-key column. A value may also be a list
# of column names to describe a composite key; every entry here is a single
# column.
TABLES_WITH_PRIMARY_KEY: dict[str, str | list[str]] = {
    # keyed by a natural identifier
    "schema_migrations": "version",
    "config_settings": "key",
    # keyed by an ``id`` column
    "config_sections": "id",
    "config_pairings": "id",
    "config_backtesting_defaults": "id",
    "opportunities": "id",
    "trades": "id",
    "orders": "id",
    "pnl_events": "id",
    "audit_events": "id",
    "backtest_jobs": "id",
}
|
|
|
|
# Tables with a UNIQUE constraint beyond the primary key
|
|
# Maps table name -> list of UNIQUE constraint specs. Each spec is one string;
# a multi-column constraint lists its columns separated by commas inside that
# single string (the consuming test splits on "," and strips whitespace).
TABLES_WITH_UNIQUE_CONSTRAINTS: dict[str, list[str]] = {
    "config_sections": ["name"],
    "config_pairings": ["base_asset, quote_asset"],
}
|
|
|
|
|
|
# ── fixtures ────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@asynccontextmanager
async def _pg_lifecycle() -> AsyncIterator[PgStore]:
    """Yield a started ``PgStore`` and stop it when the context exits.

    The store is constructed from ``get_settings()``. ``start()`` is awaited
    inside the ``try`` block, so ``stop()`` is awaited on exit even when
    start-up itself raises.
    """
    pg_store = PgStore(get_settings())
    try:
        await pg_store.start()
        yield pg_store
    finally:
        await pg_store.stop()
|
|
|
|
|
|
@pytest_asyncio.fixture(name="pg")
async def pg_fixture() -> AsyncIterator[PgStore]:
    """Expose a started ``PgStore`` to tests under the fixture name ``pg``.

    Start-up and shutdown are delegated to ``_pg_lifecycle``.
    """
    async with _pg_lifecycle() as live_store:
        yield live_store
|
|
|
|
|
|
# ── helpers ─────────────────────────────────────────────────────────────────
|
|
|
|
|
|
async def _get_actual_tables(store: PgStore) -> dict[str, list[str]]:
    """Map each table in the ``public`` schema to its column names.

    Columns are read from ``information_schema.columns`` and appear in
    ``ordinal_position`` order within each table.
    """
    columns_query = (
        "SELECT table_name, column_name FROM information_schema.columns "
        "WHERE table_schema = 'public' ORDER BY table_name, ordinal_position"
    )
    async with store.pool.acquire() as conn:
        records = await conn.fetch(columns_query)

    columns_by_table: dict[str, list[str]] = {}
    for record in records:
        table_name: str = record["table_name"]
        column_name: str = record["column_name"]
        columns_by_table.setdefault(table_name, []).append(column_name)
    return columns_by_table
|
|
|
|
|
|
async def _table_row_count(store: PgStore, table: str) -> int:
    """Return ``COUNT(*)`` for *table*, or 0 when no row comes back.

    *table* is interpolated into the SQL text as-is (identifiers cannot be
    bound as query parameters), so only pass trusted, hard-coded table names.
    """
    async with store.pool.acquire() as conn:
        record = await conn.fetchrow(f"SELECT COUNT(*) AS cnt FROM {table}")
    if not record:
        return 0
    return int(record["cnt"])
|
|
|
|
|
|
# ── tests ───────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_pg_connect(pg: PgStore) -> None:
    """A trivial ``SELECT 1`` round-trips through a pooled connection."""
    async with pg.pool.acquire() as conn:
        answer = await conn.fetchval("SELECT 1 AS val")
    assert answer == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_pgcrypto_extension(pg: PgStore) -> None:
    """``gen_random_uuid()`` is callable and returns a UUID-shaped value.

    NOTE(review): ``gen_random_uuid()`` is a core function in PostgreSQL 13+,
    so on those servers this passes without pgcrypto installed -- confirm
    whether the extension itself needs to be checked.
    """
    async with pg.pool.acquire() as conn:
        generated = await conn.fetchval("SELECT gen_random_uuid()")
    assert generated is not None
    # Canonical UUID text is 32 hex digits plus 4 hyphens.
    uuid_text = str(generated)
    assert len(uuid_text) == 36
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_schema_migration_applies(pg: PgStore) -> None:
    """After ``migrate()`` every table named in ``EXPECTED_TABLES`` exists."""
    await pg.migrate()
    found = await _get_actual_tables(pg)

    for table in EXPECTED_TABLES:
        assert (
            table in found
        ), f"Table '{table}' missing after migration. Found tables: {sorted(found)}"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_migration_is_idempotent(pg: PgStore) -> None:
    """Calling ``migrate()`` twice neither raises nor loses expected tables."""
    # The second call is expected to be a no-op.
    for _ in range(2):
        await pg.migrate()

    found = await _get_actual_tables(pg)
    for table in EXPECTED_TABLES:
        assert table in found
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_table_columns(pg: PgStore) -> None:
    """Each expected table exposes every column listed in ``EXPECTED_TABLES``.

    Only presence is checked; additional columns do not fail the test. A
    table that is missing altogether is treated as having no columns.
    """
    await pg.migrate()
    found = await _get_actual_tables(pg)

    for table, wanted_cols in EXPECTED_TABLES.items():
        present_cols = found.get(table, [])
        for col in wanted_cols:
            assert (
                col in present_cols
            ), f"Column '{col}' missing from table '{table}'. Actual columns: {present_cols}"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_primary_keys(pg: PgStore) -> None:
    """Every table in ``TABLES_WITH_PRIMARY_KEY`` has the expected PK column(s).

    For each table the primary-key columns are read from
    ``information_schema`` and every expected column must be among them.

    The join between ``table_constraints`` and ``key_column_usage`` matches on
    schema and table as well as on constraint name. PostgreSQL only requires
    constraint names to be unique per table, so a name-only join could pull in
    columns of a same-named constraint on another table or in another schema.
    """
    # Loop-invariant, so built once; the table name is bound as $1.
    pk_query = (
        "SELECT kcu.column_name FROM information_schema.table_constraints tc "
        "JOIN information_schema.key_column_usage kcu "
        "ON tc.constraint_schema = kcu.constraint_schema "
        "AND tc.constraint_name = kcu.constraint_name "
        "AND tc.table_schema = kcu.table_schema "
        "AND tc.table_name = kcu.table_name "
        "WHERE tc.table_schema = 'public' AND tc.table_name = $1 "
        "AND tc.constraint_type = 'PRIMARY KEY' "
        "ORDER BY kcu.ordinal_position"
    )

    await pg.migrate()
    async with pg.pool.acquire() as conn:
        for table, expected_pk in TABLES_WITH_PRIMARY_KEY.items():
            rows = await conn.fetch(pk_query, table)
            pk_columns = [r["column_name"] for r in rows]
            # A bare string means a single-column key; normalise to a list.
            expected_list = [expected_pk] if isinstance(expected_pk, str) else expected_pk
            for col in expected_list:
                assert col in pk_columns, (
                    f"Table '{table}' should have PK column '{col}'. "
                    f"Actual PK columns: {pk_columns}"
                )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_unique_constraints(pg: PgStore) -> None:
    """Every spec in ``TABLES_WITH_UNIQUE_CONSTRAINTS`` is backed by one constraint.

    Each spec is a comma-separated column list. The test passes only when a
    single UNIQUE constraint on the table covers exactly those columns, so a
    composite spec such as ``"base_asset, quote_asset"`` is not satisfied by
    two independent single-column constraints.

    The join between ``table_constraints`` and ``key_column_usage`` matches on
    schema and table as well as on constraint name, because PostgreSQL
    constraint names are only unique per table.
    """
    # Loop-invariant, so built once; the table name is bound as $1.
    unique_query = (
        "SELECT tc.constraint_name, kcu.column_name "
        "FROM information_schema.table_constraints tc "
        "JOIN information_schema.key_column_usage kcu "
        "ON tc.constraint_schema = kcu.constraint_schema "
        "AND tc.constraint_name = kcu.constraint_name "
        "AND tc.table_schema = kcu.table_schema "
        "AND tc.table_name = kcu.table_name "
        "WHERE tc.table_schema = 'public' AND tc.table_name = $1 "
        "AND tc.constraint_type = 'UNIQUE'"
    )

    await pg.migrate()
    async with pg.pool.acquire() as conn:
        for table, expected_ucs in TABLES_WITH_UNIQUE_CONSTRAINTS.items():
            rows = await conn.fetch(unique_query, table)

            # Group columns per constraint so composite keys are compared as a unit.
            columns_by_constraint: dict[str, set[str]] = {}
            for r in rows:
                columns_by_constraint.setdefault(r["constraint_name"], set()).add(
                    r["column_name"]
                )
            actual_sets = list(columns_by_constraint.values())

            for expected_cols in expected_ucs:
                wanted = {c.strip() for c in expected_cols.split(",")}
                assert wanted in actual_sets, (
                    f"Table '{table}' should have a UNIQUE constraint on {sorted(wanted)}. "
                    f"Actual UNIQUE constraints: {[sorted(s) for s in actual_sets]}"
                )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_table_row_count_is_zero(pg: PgStore) -> None:
    """All expected tables except migration bookkeeping are empty after migrate.

    ``schema_migrations`` is skipped: ``test_schema_migration_version_recorded``
    requires a version row in that table after ``migrate()``, so it cannot
    also be required to be empty.
    """
    bookkeeping_tables = {"schema_migrations"}

    await pg.migrate()
    for table in EXPECTED_TABLES:
        if table in bookkeeping_tables:
            continue
        count = await _table_row_count(pg, table)
        assert (
            count == 0
        ), f"Table '{table}' should be empty after migration, but has {count} rows"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_schema_migration_version_recorded(pg: PgStore) -> None:
    """The highest version in ``schema_migrations`` equals ``SCHEMA_VERSION``."""
    from arbitrade.storage.pg_store import SCHEMA_VERSION

    await pg.migrate()
    async with pg.pool.acquire() as conn:
        row = await conn.fetchrow("SELECT MAX(version) AS v FROM schema_migrations")

    assert row is not None
    recorded = row["v"]
    assert (
        recorded == SCHEMA_VERSION
    ), f"Expected schema version {SCHEMA_VERSION}, got {recorded}"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_and_query_row(pg: PgStore) -> None:
    """INSERT a ``config_sections`` row and SELECT it back unchanged.

    The row is deleted in a ``finally`` clause so a failing assertion (or a
    failed insert caused by a leftover row from an earlier aborted run) does
    not leave ``test_section`` behind to break later runs.
    """
    section_name = "test_section"
    section_description = "A test section for integration test"

    await pg.migrate()
    async with pg.pool.acquire() as conn:
        try:
            # ConfigSections round-trip
            await conn.execute(
                "INSERT INTO config_sections (name, description) VALUES ($1, $2)",
                section_name,
                section_description,
            )
            row = await conn.fetchrow(
                "SELECT name, description FROM config_sections WHERE name = $1",
                section_name,
            )
            assert row is not None
            assert row["name"] == section_name
            assert row["description"] == section_description
        finally:
            # Clean up regardless of the outcome above.
            await conn.execute(
                "DELETE FROM config_sections WHERE name = $1",
                section_name,
            )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_config_pairings_upsert(pg: PgStore) -> None:
    """Upserting the same (base, quote) pair twice updates one row in place.

    The first ``upsert_pairing`` call creates the XBT/USD pairing; the second
    must return the same ``id`` with the new ``enabled`` and ``source`` values.
    The pairing is deleted in a ``finally`` clause so a failing assertion does
    not leave it behind for later runs.
    """
    await pg.migrate()
    from arbitrade.config.service import ConfigPairing
    from arbitrade.storage.repositories import ConfigPairingRepository

    repo = ConfigPairingRepository(pg)

    try:
        # Insert
        p1 = await repo.upsert_pairing(
            ConfigPairing(base_asset="XBT", quote_asset="USD", enabled=True, source="kraken")
        )
        assert p1.id is not None
        assert p1.base_asset == "XBT"
        assert p1.enabled is True

        # Upsert (update)
        p2 = await repo.upsert_pairing(
            ConfigPairing(base_asset="XBT", quote_asset="USD", enabled=False, source="manual")
        )
        assert p2.id == p1.id  # same row
        assert p2.enabled is False
        assert p2.source == "manual"
    finally:
        # Clean up regardless of the outcome above.
        deleted = await repo.delete_pairing("XBT", "USD")

    assert deleted is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_audit_list_recent(pg: PgStore) -> None:
    """``AuditRepository.list_recent`` returns the freshly inserted records.

    Three records sharing one ``occurred_at`` timestamp are inserted, so the
    newest entry may be any of them; the test only checks that the first
    result is one of the three and that its payload survived serialization.

    The inserted rows are removed in a ``finally`` clause. Without that they
    accumulate in ``audit_events`` and make ``test_table_row_count_is_zero``
    fail on the next run against the same database.
    """
    await pg.migrate()
    from datetime import UTC, datetime

    from arbitrade.storage.repositories import AuditRecord, AuditRepository

    actor = "test"
    event_type = "integration_test"

    repo = AuditRepository(pg)
    now = datetime.now(UTC)

    try:
        # Insert a few records
        for i in range(3):
            await repo.insert(
                AuditRecord(
                    occurred_at=now,
                    actor=actor,
                    event_type=event_type,
                    decision=f"decision_{i}",
                    payload={"index": i},
                    correlation_id=f"corr_{i}",
                )
            )

        recent = await repo.list_recent(limit=5)
        assert len(recent) >= 3
        assert recent[0].decision in ("decision_2", "decision_1", "decision_0")
        # Verify payload serialization worked
        first = recent[0]
        if first.payload:
            assert "index" in first.payload
    finally:
        # Clean up: remove only the rows written by this test.
        async with pg.pool.acquire() as conn:
            await conn.execute(
                "DELETE FROM audit_events WHERE actor = $1 AND event_type = $2",
                actor,
                event_type,
            )
|