from __future__ import annotations import asyncio from dataclasses import dataclass from datetime import UTC, datetime from types import SimpleNamespace import pytest from arbitrade.detection.engine import OpportunityEvent from arbitrade.exchange.models import BookDelta, BookLevel from arbitrade.market_data.feed import ExecutionOutcome, MarketDataFeed from arbitrade.risk.kill_switch import KillSwitch from arbitrade.risk.loss_limits import LossLimitGuard from arbitrade.risk.pre_trade import PreTradeValidator from arbitrade.risk.stop_conditions import StopConditionsGuard from arbitrade.risk.trade_limits import TradeLimitsGuard @dataclass(slots=True) class _FakeWsClient: delta: BookDelta async def connect_stream(self): yield SimpleNamespace(payload={"channel": "book"}) def parse_book_delta(self, _payload: dict[str, object]) -> BookDelta: return self.delta class _FakeSnapshotWriter: def __init__(self) -> None: self.items: list[object] = [] async def enqueue(self, snapshot: object) -> None: self.items.append(snapshot) class _FakeOpportunityWriter: def __init__(self) -> None: self.items: list[OpportunityEvent] = [] async def enqueue(self, event: OpportunityEvent) -> None: self.items.append(event) class _FakeDetector: def __init__(self, event: OpportunityEvent) -> None: self._event = event self.last_base_capital: float | None = None def opportunities_for_updated_pair( self, _updated_pair: str, _books: dict[str, object], *, base_capital: float, ): self.last_base_capital = base_capital return [self._event] class _FakeExecutor: def __init__(self) -> None: self.calls: list[OpportunityEvent] = [] self.realized_pnls: list[float | None] = [] self.outcomes: list[ExecutionOutcome] = [] async def execute(self, event: OpportunityEvent) -> ExecutionOutcome | float | None: self.calls.append(event) if self.outcomes: return self.outcomes.pop(0) if not self.realized_pnls: return None return self.realized_pnls.pop(0) class _FakeFailingExecutor: def __init__(self) -> None: self.calls: int = 0 async def execute(self, _event: OpportunityEvent) -> None: self.calls += 1 raise RuntimeError("executor failure") class _FakeAlertNotifier: def __init__(self) -> None: self.events: list[dict[str, str]] = [] async def notify( self, *, category: str, severity: str, title: str, message: str, details: dict[str, str] | None = None, ) -> bool: self.events.append( { "category": category, "severity": severity, "title": title, "message": message, **(details or {}), } ) return True @dataclass(slots=True) class _FakeWsClientTwoMessages: delta: BookDelta async def connect_stream(self): yield SimpleNamespace(payload={"channel": "book", "seq": 1}) yield SimpleNamespace(payload={"channel": "book", "seq": 2}) def parse_book_delta(self, _payload: dict[str, object]) -> BookDelta: return self.delta def _sample_event(*, allocated_capital: float = 1.0) -> OpportunityEvent: return OpportunityEvent( detected_at=datetime.now(UTC), cycle="USD->BTC->ETH->USD", updated_pair="BTC/USD", gross_rate=1.04, net_rate=1.03, gross_pct=4.0, net_pct=3.0, est_profit=0.03, allocated_capital=allocated_capital, ) def _sample_delta() -> BookDelta: return BookDelta( symbol="BTC/USD", bids=[BookLevel(price=100.0, volume=1.0)], asks=[BookLevel(price=100.5, volume=1.0)], ) @pytest.mark.asyncio async def test_market_data_feed_dry_run_does_not_execute_orders() -> None: event = _sample_event() executor = _FakeExecutor() feed = MarketDataFeed( ws_client=_FakeWsClient(_sample_delta()), snapshot_writer=_FakeSnapshotWriter(), detector=_FakeDetector(event), opportunity_writer=_FakeOpportunityWriter(), paper_trading_mode=True, opportunity_executor=executor.execute, ) await feed.run() assert executor.calls == [] @pytest.mark.asyncio async def test_market_data_feed_live_mode_executes_orders() -> None: event = _sample_event() executor = _FakeExecutor() feed = MarketDataFeed( ws_client=_FakeWsClient(_sample_delta()), snapshot_writer=_FakeSnapshotWriter(), detector=_FakeDetector(event), opportunity_writer=_FakeOpportunityWriter(), paper_trading_mode=False, opportunity_executor=executor.execute, ) await feed.run() assert len(executor.calls) == 1 assert executor.calls[0].cycle == "USD->BTC->ETH->USD" @pytest.mark.asyncio async def test_market_data_feed_enforces_per_trade_capital_limit() -> None: event = _sample_event() detector = _FakeDetector(event) feed = MarketDataFeed( ws_client=_FakeWsClient(_sample_delta()), snapshot_writer=_FakeSnapshotWriter(), detector=detector, opportunity_writer=_FakeOpportunityWriter(), paper_trading_mode=True, trade_capital=250.0, max_trade_capital=100.0, ) await feed.run() assert detector.last_base_capital == 100.0 @pytest.mark.asyncio async def test_market_data_feed_auto_halts_on_daily_loss_limit() -> None: event = _sample_event() detector = _FakeDetector(event) executor = _FakeExecutor() executor.realized_pnls = [-60.0, -10.0] loss_guard = LossLimitGuard(daily_loss_limit=50.0) feed = MarketDataFeed( ws_client=_FakeWsClientTwoMessages(_sample_delta()), snapshot_writer=_FakeSnapshotWriter(), detector=detector, opportunity_writer=_FakeOpportunityWriter(), paper_trading_mode=False, opportunity_executor=executor.execute, loss_limit_guard=loss_guard, ) await feed.run() assert len(executor.calls) == 1 assert loss_guard.is_halted assert loss_guard.halted_reason == "daily_loss_limit_breached" @pytest.mark.asyncio async def test_market_data_feed_auto_halts_on_cumulative_loss_limit() -> None: event = _sample_event() detector = _FakeDetector(event) executor = _FakeExecutor() executor.realized_pnls = [-40.0, -15.0] loss_guard = LossLimitGuard(cumulative_loss_limit=50.0) feed = MarketDataFeed( ws_client=_FakeWsClientTwoMessages(_sample_delta()), snapshot_writer=_FakeSnapshotWriter(), detector=detector, opportunity_writer=_FakeOpportunityWriter(), paper_trading_mode=False, opportunity_executor=executor.execute, loss_limit_guard=loss_guard, ) await feed.run() assert len(executor.calls) == 2 assert loss_guard.is_halted assert loss_guard.halted_reason == "cumulative_loss_limit_breached" @pytest.mark.asyncio async def test_market_data_feed_enforces_max_concurrent_trades() -> None: event = _sample_event() detector = _FakeDetector(event) executor = _FakeExecutor() executor.outcomes = [ExecutionOutcome(realized_pnl=None, close_trade=False)] trade_guard = TradeLimitsGuard(max_concurrent_trades=1) feed = MarketDataFeed( ws_client=_FakeWsClientTwoMessages(_sample_delta()), snapshot_writer=_FakeSnapshotWriter(), detector=detector, opportunity_writer=_FakeOpportunityWriter(), paper_trading_mode=False, opportunity_executor=executor.execute, trade_limits_guard=trade_guard, ) await feed.run() assert len(executor.calls) == 1 assert trade_guard.active_trades == 1 @pytest.mark.asyncio async def test_market_data_feed_enforces_per_asset_exposure_cap() -> None: event = _sample_event(allocated_capital=100.0) detector = _FakeDetector(event) executor = _FakeExecutor() trade_guard = TradeLimitsGuard(max_exposure_per_asset=50.0) feed = MarketDataFeed( ws_client=_FakeWsClient(_sample_delta()), snapshot_writer=_FakeSnapshotWriter(), detector=detector, opportunity_writer=_FakeOpportunityWriter(), paper_trading_mode=False, opportunity_executor=executor.execute, trade_limits_guard=trade_guard, ) await feed.run() assert len(executor.calls) == 0 @pytest.mark.asyncio async def test_market_data_feed_blocks_when_pre_trade_balance_insufficient() -> None: event = _sample_event(allocated_capital=100.0) detector = _FakeDetector(event) executor = _FakeExecutor() validator = PreTradeValidator(min_order_size_by_asset={"USD": 50.0}) feed = MarketDataFeed( ws_client=_FakeWsClient(_sample_delta()), snapshot_writer=_FakeSnapshotWriter(), detector=detector, opportunity_writer=_FakeOpportunityWriter(), paper_trading_mode=False, opportunity_executor=executor.execute, pre_trade_validator=validator, balance_provider=lambda: {"USD": 25.0}, quote_balance_asset="USD", ) await feed.run() assert len(executor.calls) == 0 @pytest.mark.asyncio async def test_market_data_feed_blocks_when_pre_trade_min_order_not_met() -> None: event = _sample_event(allocated_capital=25.0) detector = _FakeDetector(event) executor = _FakeExecutor() validator = PreTradeValidator(min_order_size_by_asset={"USD": 50.0}) feed = MarketDataFeed( ws_client=_FakeWsClient(_sample_delta()), snapshot_writer=_FakeSnapshotWriter(), detector=detector, opportunity_writer=_FakeOpportunityWriter(), paper_trading_mode=False, opportunity_executor=executor.execute, pre_trade_validator=validator, balance_provider=lambda: {"USD": 500.0}, quote_balance_asset="USD", ) await feed.run() assert len(executor.calls) == 0 @pytest.mark.asyncio async def test_market_data_feed_allows_when_pre_trade_validation_passes() -> None: event = _sample_event(allocated_capital=75.0) detector = _FakeDetector(event) executor = _FakeExecutor() validator = PreTradeValidator(min_order_size_by_asset={"USD": 50.0}) feed = MarketDataFeed( ws_client=_FakeWsClient(_sample_delta()), snapshot_writer=_FakeSnapshotWriter(), detector=detector, opportunity_writer=_FakeOpportunityWriter(), paper_trading_mode=False, opportunity_executor=executor.execute, pre_trade_validator=validator, balance_provider=lambda: {"USD": 500.0}, quote_balance_asset="USD", ) await feed.run() assert len(executor.calls) == 1 @pytest.mark.asyncio async def test_market_data_feed_blocks_when_kill_switch_active() -> None: event = _sample_event(allocated_capital=75.0) detector = _FakeDetector(event) executor = _FakeExecutor() kill_switch = KillSwitch(active=True, reason="manual") feed = MarketDataFeed( ws_client=_FakeWsClient(_sample_delta()), snapshot_writer=_FakeSnapshotWriter(), detector=detector, opportunity_writer=_FakeOpportunityWriter(), paper_trading_mode=False, opportunity_executor=executor.execute, kill_switch=kill_switch, ) await feed.run() assert len(executor.calls) == 0 @pytest.mark.asyncio async def test_market_data_feed_allows_when_kill_switch_inactive() -> None: event = _sample_event(allocated_capital=75.0) detector = _FakeDetector(event) executor = _FakeExecutor() kill_switch = KillSwitch(active=False) feed = MarketDataFeed( ws_client=_FakeWsClient(_sample_delta()), snapshot_writer=_FakeSnapshotWriter(), detector=detector, opportunity_writer=_FakeOpportunityWriter(), paper_trading_mode=False, opportunity_executor=executor.execute, kill_switch=kill_switch, ) await feed.run() assert len(executor.calls) == 1 @pytest.mark.asyncio async def test_market_data_feed_halts_on_abnormal_source_latency() -> None: event = _sample_event(allocated_capital=75.0) detector = _FakeDetector(event) executor = _FakeExecutor() kill_switch = KillSwitch(active=False) stop_guard = StopConditionsGuard(max_source_latency_ms=1.0) delta = _sample_delta() delta.source_timestamp_ms = 0 feed = MarketDataFeed( ws_client=_FakeWsClient(delta), snapshot_writer=_FakeSnapshotWriter(), detector=detector, opportunity_writer=_FakeOpportunityWriter(), paper_trading_mode=False, opportunity_executor=executor.execute, kill_switch=kill_switch, stop_conditions_guard=stop_guard, ) await feed.run() assert stop_guard.is_halted assert stop_guard.halted_reason == "source_latency_limit_breached" assert kill_switch.is_active assert kill_switch.reason == "source_latency_limit_breached" assert len(executor.calls) == 0 @pytest.mark.asyncio async def test_market_data_feed_halts_on_repeated_execution_failures() -> None: event = _sample_event(allocated_capital=75.0) detector = _FakeDetector(event) executor = _FakeFailingExecutor() kill_switch = KillSwitch(active=False) stop_guard = StopConditionsGuard(max_consecutive_failures=2) feed = MarketDataFeed( ws_client=_FakeWsClientTwoMessages(_sample_delta()), snapshot_writer=_FakeSnapshotWriter(), detector=detector, opportunity_writer=_FakeOpportunityWriter(), paper_trading_mode=False, opportunity_executor=executor.execute, kill_switch=kill_switch, stop_conditions_guard=stop_guard, ) await feed.run() assert executor.calls == 2 assert stop_guard.is_halted assert stop_guard.halted_reason == "consecutive_failures_limit_breached" assert kill_switch.is_active assert kill_switch.reason == "consecutive_failures_limit_breached" @pytest.mark.asyncio async def test_market_data_feed_emits_critical_alert_on_executor_exception() -> None: event = _sample_event(allocated_capital=75.0) detector = _FakeDetector(event) executor = _FakeFailingExecutor() notifier = _FakeAlertNotifier() feed = MarketDataFeed( ws_client=_FakeWsClient(_sample_delta()), snapshot_writer=_FakeSnapshotWriter(), detector=detector, opportunity_writer=_FakeOpportunityWriter(), paper_trading_mode=False, opportunity_executor=executor.execute, alert_notifier=notifier, ) await feed.run() await asyncio.sleep(0) assert executor.calls == 1 assert len(notifier.events) == 1 assert notifier.events[0]["category"] == "system" assert notifier.events[0]["severity"] == "critical" assert notifier.events[0]["title"] == "Critical execution exception"