a89886186f
- Introduced OpportunityEvent class for structured opportunity data. - Enhanced IncrementalCycleDetector to generate opportunities based on updated pairs. - Implemented AsyncOpportunityWriter for persisting opportunities to the database. - Updated MarketDataFeed to handle opportunity detection and execution in both paper and live trading modes. - Added unit tests for opportunity detection and persistence.
113 lines
2.9 KiB
Python
113 lines
2.9 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import UTC, datetime
|
|
from types import SimpleNamespace
|
|
|
|
import pytest
|
|
|
|
from arbitrade.detection.engine import OpportunityEvent
|
|
from arbitrade.exchange.models import BookDelta, BookLevel
|
|
from arbitrade.market_data.feed import MarketDataFeed
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class _FakeWsClient:
|
|
delta: BookDelta
|
|
|
|
async def connect_stream(self):
|
|
yield SimpleNamespace(payload={"channel": "book"})
|
|
|
|
def parse_book_delta(self, _payload: dict[str, object]) -> BookDelta:
|
|
return self.delta
|
|
|
|
|
|
class _FakeSnapshotWriter:
    """In-memory snapshot writer that records every snapshot it is given."""

    def __init__(self) -> None:
        """Start with an empty record of enqueued snapshots."""
        self.items: list[object] = []

    async def enqueue(self, snapshot: object) -> None:
        """Append ``snapshot`` to ``items`` in arrival order."""
        recorded = self.items
        recorded.append(snapshot)
|
|
|
|
|
|
class _FakeOpportunityWriter:
    """In-memory opportunity writer that records every event it is given."""

    def __init__(self) -> None:
        """Start with an empty record of enqueued events."""
        self.items: list[OpportunityEvent] = []

    async def enqueue(self, event: OpportunityEvent) -> None:
        """Append ``event`` to ``items`` in arrival order."""
        recorded = self.items
        recorded.append(event)
|
|
|
|
|
|
class _FakeDetector:
    """Detector stub that reports the same single event for every query."""

    def __init__(self, event: OpportunityEvent) -> None:
        """Remember the event to hand back on each detection call."""
        self._event = event

    def opportunities_for_updated_pair(self, _updated_pair: str, _books: dict[str, object]):
        """Return a fresh one-element list holding the stored event; both arguments are ignored."""
        found = [self._event]
        return found
|
|
|
|
|
|
class _FakeExecutor:
    """Executor stub that records each event it is asked to execute."""

    def __init__(self) -> None:
        """Start with no recorded execution calls."""
        self.calls: list[OpportunityEvent] = []

    async def execute(self, event: OpportunityEvent) -> None:
        """Record ``event`` in ``calls`` instead of placing any order."""
        seen = self.calls
        seen.append(event)
|
|
|
|
|
|
def _sample_event() -> OpportunityEvent:
    """Build a fixed opportunity for the cycle USD->BTC->ETH->USD, stamped with the current UTC time."""
    now_utc = datetime.now(UTC)
    gross_rate, net_rate = 1.04, 1.03
    gross_pct, net_pct = 4.0, 3.0
    return OpportunityEvent(
        detected_at=now_utc,
        cycle="USD->BTC->ETH->USD",
        updated_pair="BTC/USD",
        gross_rate=gross_rate,
        net_rate=net_rate,
        gross_pct=gross_pct,
        net_pct=net_pct,
        est_profit=0.03,
    )
|
|
|
|
|
|
def _sample_delta() -> BookDelta:
    """Build a BTC/USD delta with one bid at 100.0 and one ask at 100.5, each of volume 1.0."""
    bid = BookLevel(price=100.0, volume=1.0)
    ask = BookLevel(price=100.5, volume=1.0)
    return BookDelta(symbol="BTC/USD", bids=[bid], asks=[ask])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_market_data_feed_dry_run_does_not_execute_orders() -> None:
    """With paper trading enabled, running the feed must never invoke the executor."""
    event = _sample_event()
    executor = _FakeExecutor()

    # Collaborators are built in the same order the feed receives them.
    ws_client = _FakeWsClient(_sample_delta())
    snapshot_writer = _FakeSnapshotWriter()
    detector = _FakeDetector(event)
    opportunity_writer = _FakeOpportunityWriter()

    feed = MarketDataFeed(
        ws_client=ws_client,
        snapshot_writer=snapshot_writer,
        detector=detector,
        opportunity_writer=opportunity_writer,
        paper_trading_mode=True,
        opportunity_executor=executor.execute,
    )

    await feed.run()

    assert executor.calls == []
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_market_data_feed_live_mode_executes_orders() -> None:
    """With paper trading disabled, the detected event must reach the executor exactly once."""
    event = _sample_event()
    executor = _FakeExecutor()

    # Collaborators are built in the same order the feed receives them.
    ws_client = _FakeWsClient(_sample_delta())
    snapshot_writer = _FakeSnapshotWriter()
    detector = _FakeDetector(event)
    opportunity_writer = _FakeOpportunityWriter()

    feed = MarketDataFeed(
        ws_client=ws_client,
        snapshot_writer=snapshot_writer,
        detector=detector,
        opportunity_writer=opportunity_writer,
        paper_trading_mode=False,
        opportunity_executor=executor.execute,
    )

    await feed.run()

    executed = executor.calls
    assert len(executed) == 1
    assert executed[0].cycle == "USD->BTC->ETH->USD"
|