From a89886186f8c89bd065e7489625fc19646b96340 Mon Sep 17 00:00:00 2001 From: zwitschi Date: Mon, 1 Jun 2026 10:59:09 +0200 Subject: [PATCH] Add opportunity detection and storage functionality with async processing - Introduced OpportunityEvent class for structured opportunity data. - Enhanced IncrementalCycleDetector to generate opportunities based on updated pairs. - Implemented AsyncOpportunityWriter for persisting opportunities to the database. - Updated MarketDataFeed to handle opportunity detection and execution in both paper and live trading modes. - Added unit tests for opportunity detection and persistence. --- .env.example | 1 + src/arbitrade/config/settings.py | 28 +++- src/arbitrade/detection/__init__.py | 3 +- src/arbitrade/detection/engine.py | 207 ++++++++++++++++++++--- src/arbitrade/market_data/feed.py | 57 ++++++- src/arbitrade/storage/opportunities.py | 58 +++++++ src/arbitrade/storage/repositories.py | 39 +++++ tests/unit/test_incremental_detector.py | 214 ++++++++++++++++++++++++ tests/unit/test_market_data_feed.py | 112 +++++++++++++ tests/unit/test_opportunity_writer.py | 48 ++++++ 10 files changed, 728 insertions(+), 39 deletions(-) create mode 100644 src/arbitrade/storage/opportunities.py create mode 100644 tests/unit/test_market_data_feed.py create mode 100644 tests/unit/test_opportunity_writer.py diff --git a/.env.example b/.env.example index 910ec2f..02cfdbd 100644 --- a/.env.example +++ b/.env.example @@ -15,3 +15,4 @@ KRAKEN_RETRY_ATTEMPTS=3 KRAKEN_RETRY_BASE_DELAY_SECONDS=0.25 WS_HEARTBEAT_TIMEOUT_SECONDS=20.0 WS_MAX_STALENESS_SECONDS=5.0 +PAPER_TRADING_MODE=true diff --git a/src/arbitrade/config/settings.py b/src/arbitrade/config/settings.py index 7987332..7ae2355 100644 --- a/src/arbitrade/config/settings.py +++ b/src/arbitrade/config/settings.py @@ -8,7 +8,8 @@ from pydantic_settings import BaseSettings, SettingsConfigDict class Settings(BaseSettings): - model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore") + model_config = SettingsConfigDict( + env_file=".env", env_file_encoding="utf-8", extra="ignore") app_env: str = Field(default="dev", alias="APP_ENV") app_host: str = Field(default="0.0.0.0", alias="APP_HOST") @@ -17,22 +18,31 @@ class Settings(BaseSettings): log_level: str = Field(default="INFO", alias="LOG_LEVEL") log_json: bool = Field(default=True, alias="LOG_JSON") - duckdb_path: Path = Field(default=Path("./data/arbitrade.duckdb"), alias="DUCKDB_PATH") + duckdb_path: Path = Field(default=Path( + "./data/arbitrade.duckdb"), alias="DUCKDB_PATH") - kraken_rest_url: str = Field(default="https://api.kraken.com", alias="KRAKEN_REST_URL") - kraken_ws_url: str = Field(default="wss://ws.kraken.com/v2", alias="KRAKEN_WS_URL") + kraken_rest_url: str = Field( + default="https://api.kraken.com", alias="KRAKEN_REST_URL") + kraken_ws_url: str = Field( + default="wss://ws.kraken.com/v2", alias="KRAKEN_WS_URL") kraken_private_rate_limit_seconds: float = Field( default=1.0, alias="KRAKEN_PRIVATE_RATE_LIMIT_SECONDS" ) - kraken_http_timeout_seconds: float = Field(default=10.0, alias="KRAKEN_HTTP_TIMEOUT_SECONDS") - kraken_retry_attempts: int = Field(default=3, alias="KRAKEN_RETRY_ATTEMPTS") + kraken_http_timeout_seconds: float = Field( + default=10.0, alias="KRAKEN_HTTP_TIMEOUT_SECONDS") + kraken_retry_attempts: int = Field( + default=3, alias="KRAKEN_RETRY_ATTEMPTS") kraken_retry_base_delay_seconds: float = Field( default=0.25, alias="KRAKEN_RETRY_BASE_DELAY_SECONDS" ) kraken_api_key: str | None = Field(default=None, alias="KRAKEN_API_KEY") - kraken_api_secret: str | None = Field(default=None, alias="KRAKEN_API_SECRET") - ws_heartbeat_timeout_seconds: float = Field(default=20.0, alias="WS_HEARTBEAT_TIMEOUT_SECONDS") - ws_max_staleness_seconds: float = Field(default=5.0, alias="WS_MAX_STALENESS_SECONDS") + kraken_api_secret: str | None = Field( + default=None, alias="KRAKEN_API_SECRET") + ws_heartbeat_timeout_seconds: float = Field( + default=20.0, alias="WS_HEARTBEAT_TIMEOUT_SECONDS") + ws_max_staleness_seconds: float = Field( + default=5.0, alias="WS_MAX_STALENESS_SECONDS") + paper_trading_mode: bool = Field(default=True, alias="PAPER_TRADING_MODE") fernet_key: str | None = Field(default=None, alias="FERNET_KEY") diff --git a/src/arbitrade/detection/__init__.py b/src/arbitrade/detection/__init__.py index 73bc859..efdc829 100644 --- a/src/arbitrade/detection/__init__.py +++ b/src/arbitrade/detection/__init__.py @@ -1,11 +1,12 @@ """Arbitrage detection package.""" -from arbitrade.detection.engine import CycleScore, IncrementalCycleDetector +from arbitrade.detection.engine import CycleScore, IncrementalCycleDetector, OpportunityEvent from arbitrade.detection.graph import CurrencyGraph, TriangularCycle __all__ = [ "CurrencyGraph", "TriangularCycle", "CycleScore", + "OpportunityEvent", "IncrementalCycleDetector", ] diff --git a/src/arbitrade/detection/engine.py b/src/arbitrade/detection/engine.py index f7b1d97..2f12ae1 100644 --- a/src/arbitrade/detection/engine.py +++ b/src/arbitrade/detection/engine.py @@ -5,6 +5,7 @@ from dataclasses import dataclass from datetime import UTC, datetime from arbitrade.detection.graph import TriangularCycle +from arbitrade.exchange.models import BookLevel from arbitrade.market_data.order_book import OrderBook @@ -20,19 +21,80 @@ def _normalize_pair_symbol(symbol: str) -> str: class CycleScore: cycle: TriangularCycle gross_rate: float + net_rate: float + min_profit_threshold: float updated_pair: str scored_at: datetime @property def is_profitable(self) -> bool: - return self.gross_rate > 1.0 + return (self.net_rate - 1.0) >= self.min_profit_threshold + + +@dataclass(frozen=True, slots=True) +class OpportunityEvent: + detected_at: datetime + cycle: str + updated_pair: str + gross_rate: float + net_rate: float + gross_pct: float + net_pct: float + est_profit: float + + @classmethod + def from_cycle_score(cls, score: CycleScore, base_capital: float = 1.0) -> OpportunityEvent: + gross_pct = (score.gross_rate - 1.0) * 100.0 + net_pct = (score.net_rate - 1.0) * 100.0 + est_profit = (score.net_rate - 1.0) * base_capital + a, b, c = score.cycle.currencies + cycle = f"{a}->{b}->{c}->{a}" + return cls( + detected_at=score.scored_at, + cycle=cycle, + updated_pair=score.updated_pair, + gross_rate=score.gross_rate, + net_rate=score.net_rate, + gross_pct=gross_pct, + net_pct=net_pct, + est_profit=est_profit, + ) class IncrementalCycleDetector: - def __init__(self, cycles_by_pair: Mapping[str, list[TriangularCycle]]) -> None: + def __init__( + self, + cycles_by_pair: Mapping[str, list[TriangularCycle]], + *, + fee_rate: float = 0.0, + max_depth_levels: int = 10, + min_profit_threshold: float = 0.0, + min_order_size_by_pair: Mapping[str, float] | None = None, + max_book_age_seconds: float | None = None, + ) -> None: self._cycles_by_pair = { _normalize_pair_symbol(pair): list(cycles) for pair, cycles in cycles_by_pair.items() } + self._fee_multiplier = 1.0 - fee_rate + self._max_depth_levels = max_depth_levels + self._min_profit_threshold = min_profit_threshold + self._max_book_age_seconds = max_book_age_seconds + self._min_order_size_by_pair = { + _normalize_pair_symbol(pair): float(min_size) + for pair, min_size in (min_order_size_by_pair or {}).items() + } + + if self._fee_multiplier < 0.0: + raise ValueError("fee_rate must be <= 1.0") + if self._max_depth_levels <= 0: + raise ValueError("max_depth_levels must be > 0") + if self._min_profit_threshold < 0.0: + raise ValueError("min_profit_threshold must be >= 0.0") + if self._max_book_age_seconds is not None and self._max_book_age_seconds <= 0.0: + raise ValueError("max_book_age_seconds must be > 0.0") + for min_size in self._min_order_size_by_pair.values(): + if min_size <= 0.0: + raise ValueError("minimum order size must be > 0.0") def score_updated_pair( self, @@ -42,19 +104,23 @@ class IncrementalCycleDetector: normalized_pair = _normalize_pair_symbol(updated_pair) impacted_cycles = self._cycles_by_pair.get(normalized_pair, []) - normalized_books = {_normalize_pair_symbol( - symbol): book for symbol, book in books.items()} + normalized_books = {_normalize_pair_symbol(symbol): book for symbol, book in books.items()} scores: list[CycleScore] = [] scored_at = datetime.now(UTC) for cycle in impacted_cycles: - gross_rate = self._score_cycle(cycle, normalized_books) - if gross_rate is None: + rates = self._score_cycle(cycle, normalized_books, scored_at) + if rates is None: + continue + gross_rate, net_rate = rates + if (net_rate - 1.0) < self._min_profit_threshold: continue scores.append( CycleScore( cycle=cycle, gross_rate=gross_rate, + net_rate=net_rate, + min_profit_threshold=self._min_profit_threshold, updated_pair=normalized_pair, scored_at=scored_at, ) @@ -62,28 +128,74 @@ class IncrementalCycleDetector: return scores + def opportunities_for_updated_pair( + self, + updated_pair: str, + books: Mapping[str, OrderBook], + *, + base_capital: float = 1.0, + ) -> list[OpportunityEvent]: + if base_capital <= 0.0: + raise ValueError("base_capital must be > 0.0") + + scores = self.score_updated_pair(updated_pair, books) + return [OpportunityEvent.from_cycle_score(score, base_capital) for score in scores] + def _score_cycle( self, cycle: TriangularCycle, books: Mapping[str, OrderBook], - ) -> float | None: + scored_at: datetime, + ) -> tuple[float, float] | None: + if not self._is_cycle_fresh(cycle, books, scored_at): + return None + a, b, c = cycle.currencies - amount = 1.0 + gross_amount = 1.0 - amount_ab = self._convert(amount, a, b, cycle, books) - if amount_ab is None: + gross_ab = self._convert(gross_amount, a, b, cycle, books) + if gross_ab is None: return None - amount = amount_ab + net_ab = gross_ab * self._fee_multiplier - amount_bc = self._convert(amount, b, c, cycle, books) - if amount_bc is None: + gross_bc = self._convert(gross_ab, b, c, cycle, books) + if gross_bc is None: return None - amount = amount_bc + net_bc = self._convert(net_ab, b, c, cycle, books) + if net_bc is None: + return None + net_bc *= self._fee_multiplier - amount_ca = self._convert(amount, c, a, cycle, books) - if amount_ca is None: + gross_ca = self._convert(gross_bc, c, a, cycle, books) + if gross_ca is None: return None - return amount_ca + net_ca = self._convert(net_bc, c, a, cycle, books) + if net_ca is None: + return None + net_ca *= self._fee_multiplier + + return gross_ca, net_ca + + def _is_cycle_fresh( + self, + cycle: TriangularCycle, + books: Mapping[str, OrderBook], + scored_at: datetime, + ) -> bool: + if self._max_book_age_seconds is None: + return True + + for pair in cycle.pairs: + normalized_pair = _normalize_pair_symbol(pair) + book = books.get(normalized_pair) + if book is None: + return False + + age_seconds = (scored_at - book.updated_at).total_seconds() + if age_seconds > self._max_book_age_seconds: + return False + + return True @staticmethod def _pair_for_edge(cycle: TriangularCycle, from_currency: str, to_currency: str) -> str | None: @@ -113,20 +225,69 @@ class IncrementalCycleDetector: if book is None: return None + bids, asks = book.top_levels(depth=self._max_depth_levels) + base, quote = pair.split("/", 1) base = base.upper() quote = quote.upper() if from_currency == base and to_currency == quote: - best_bid = book.best_bid() - if best_bid is None: + quote_out = self._sell_base_for_quote(amount, bids) + if quote_out is None: return None - return amount * best_bid.price + if not self._is_min_order_size_satisfied(pair, amount): + return None + return quote_out if from_currency == quote and to_currency == base: - best_ask = book.best_ask() - if best_ask is None or best_ask.price <= 0.0: + base_out = self._buy_base_with_quote(amount, asks) + if base_out is None: return None - return amount / best_ask.price + if not self._is_min_order_size_satisfied(pair, base_out): + return None + return base_out return None + + def _is_min_order_size_satisfied(self, pair: str, base_amount: float) -> bool: + min_size = self._min_order_size_by_pair.get(pair) + if min_size is None: + return True + return base_amount >= min_size + + @staticmethod + def _sell_base_for_quote(amount_base: float, bids: list[BookLevel]) -> float | None: + remaining = amount_base + quote_out = 0.0 + for level in bids: + if remaining <= 0.0: + break + if level.price <= 0.0 or level.volume <= 0.0: + continue + + executed = min(remaining, level.volume) + quote_out += executed * level.price + remaining -= executed + + if remaining > 0.0: + return None + return quote_out + + @staticmethod + def _buy_base_with_quote(amount_quote: float, asks: list[BookLevel]) -> float | None: + remaining_quote = amount_quote + base_out = 0.0 + for level in asks: + if remaining_quote <= 0.0: + break + if level.price <= 0.0 or level.volume <= 0.0: + continue + + level_quote_capacity = level.volume * level.price + spend = min(remaining_quote, level_quote_capacity) + base_out += spend / level.price + remaining_quote -= spend + + if remaining_quote > 0.0: + return None + return base_out diff --git a/src/arbitrade/market_data/feed.py b/src/arbitrade/market_data/feed.py index 9c06ff4..df6f442 100644 --- a/src/arbitrade/market_data/feed.py +++ b/src/arbitrade/market_data/feed.py @@ -1,14 +1,16 @@ from __future__ import annotations import time +from collections.abc import Awaitable, Callable from datetime import UTC, datetime import structlog -from arbitrade.detection.engine import IncrementalCycleDetector +from arbitrade.detection.engine import IncrementalCycleDetector, OpportunityEvent from arbitrade.exchange.kraken_ws import KrakenWsClient from arbitrade.market_data.order_book import OrderBook from arbitrade.storage.market_snapshots import AsyncMarketSnapshotWriter, MarketSnapshot +from arbitrade.storage.opportunities import AsyncOpportunityWriter _LOG = structlog.get_logger(__name__) @@ -19,11 +21,18 @@ class MarketDataFeed: ws_client: KrakenWsClient, snapshot_writer: AsyncMarketSnapshotWriter, detector: IncrementalCycleDetector | None = None, + opportunity_writer: AsyncOpportunityWriter | None = None, + paper_trading_mode: bool = True, + opportunity_executor: Callable[[ + OpportunityEvent], Awaitable[None]] | None = None, ) -> None: self._ws_client = ws_client self._snapshot_writer = snapshot_writer self._books: dict[str, OrderBook] = {} self._detector = detector + self._opportunity_writer = opportunity_writer + self._paper_trading_mode = paper_trading_mode + self._opportunity_executor = opportunity_executor @property def books(self) -> dict[str, OrderBook]: @@ -62,12 +71,48 @@ class MarketDataFeed: ) if self._detector is not None: - scores = self._detector.score_updated_pair(delta.symbol, self._books) - _LOG.debug( - "incremental_cycle_scores", - symbol=delta.symbol, - impacted_scores=len(scores), + opportunities = self._detector.opportunities_for_updated_pair( + delta.symbol, + self._books, ) + _LOG.debug( + "incremental_opportunity_scores", + symbol=delta.symbol, + opportunities=len(opportunities), + ) + + for event in opportunities: + _LOG.info( + "opportunity_detected", + cycle=event.cycle, + updated_pair=event.updated_pair, + gross_pct=event.gross_pct, + net_pct=event.net_pct, + est_profit=event.est_profit, + mode="paper" if self._paper_trading_mode else "live", + ) + + if self._opportunity_writer is not None: + await self._opportunity_writer.enqueue(event) + + if self._paper_trading_mode: + _LOG.info( + "paper_trade_simulated", + cycle=event.cycle, + updated_pair=event.updated_pair, + net_pct=event.net_pct, + ) + continue + + if self._opportunity_executor is None: + _LOG.warning( + "live_trade_skipped_no_executor", + cycle=event.cycle, + updated_pair=event.updated_pair, + ) + continue + + await self._opportunity_executor(event) await self._snapshot_writer.enqueue( MarketSnapshot( diff --git a/src/arbitrade/storage/opportunities.py b/src/arbitrade/storage/opportunities.py new file mode 100644 index 0000000..032b23a --- /dev/null +++ b/src/arbitrade/storage/opportunities.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +import asyncio + +import structlog + +from arbitrade.detection.engine import OpportunityEvent +from arbitrade.storage.repositories import OpportunityRecord, OpportunityRepository + +_LOG = structlog.get_logger(__name__) + + +class AsyncOpportunityWriter: + def __init__(self, repository: OpportunityRepository, max_queue_size: int = 50_000) -> None: + self._repository = repository + self._queue: asyncio.Queue[OpportunityEvent] = asyncio.Queue(maxsize=max_queue_size) + self._task: asyncio.Task[None] | None = None + self._stop = asyncio.Event() + + async def start(self) -> None: + if self._task is None or self._task.done(): + self._stop.clear() + self._task = asyncio.create_task(self._run(), name="opportunity-writer") + + async def stop(self) -> None: + self._stop.set() + if self._task is not None: + await self._task + + async def enqueue(self, event: OpportunityEvent) -> None: + await self._queue.put(event) + + async def _run(self) -> None: + while not (self._stop.is_set() and self._queue.empty()): + try: + event = await asyncio.wait_for(self._queue.get(), timeout=0.5) + except TimeoutError: + continue + + try: + self._repository.insert( + OpportunityRecord( + detected_at=event.detected_at, + cycle=event.cycle, + gross_pct=event.gross_pct, + net_pct=event.net_pct, + est_profit=event.est_profit, + ) + ) + except Exception as exc: + _LOG.error( + "opportunity_write_failed", + error=str(exc), + cycle=event.cycle, + updated_pair=event.updated_pair, + ) + finally: + self._queue.task_done() diff --git a/src/arbitrade/storage/repositories.py b/src/arbitrade/storage/repositories.py index 8c96309..6633dc4 100644 --- a/src/arbitrade/storage/repositories.py +++ b/src/arbitrade/storage/repositories.py @@ -18,6 +18,16 @@ class MarketSnapshotRecord: latency_ms: float | None +@dataclass(slots=True) +class OpportunityRecord: + detected_at: datetime + cycle: str + gross_pct: float + net_pct: float + est_profit: float + executed: bool = False + + class MarketSnapshotRepository: def __init__(self, store: DuckDBStore) -> None: self._store = store @@ -37,3 +47,32 @@ class MarketSnapshotRepository: record.latency_ms, ], ) + + +class OpportunityRepository: + def __init__(self, store: DuckDBStore) -> None: + self._store = store + + def insert(self, record: OpportunityRecord) -> None: + with self._store.connect() as conn: + conn.execute( + """ + INSERT INTO opportunities ( + detected_at, + cycle, + gross_pct, + net_pct, + est_profit, + executed + ) + VALUES (?, ?, ?, ?, ?, ?) + """, + [ + record.detected_at, + record.cycle, + record.gross_pct, + record.net_pct, + record.est_profit, + record.executed, + ], + ) diff --git a/tests/unit/test_incremental_detector.py b/tests/unit/test_incremental_detector.py index 48cd999..204ef9f 100644 --- a/tests/unit/test_incremental_detector.py +++ b/tests/unit/test_incremental_detector.py @@ -1,3 +1,7 @@ +from datetime import UTC, datetime, timedelta + +import pytest + from arbitrade.detection.engine import IncrementalCycleDetector from arbitrade.detection.graph import CurrencyGraph, TriangularCycle from arbitrade.exchange.models import BookLevel @@ -11,6 +15,15 @@ def _make_book(*, bid: float, ask: float) -> OrderBook: return book +def _make_book_levels( + *, bids: list[tuple[float, float]], asks: list[tuple[float, float]] +) -> OrderBook: + book = OrderBook() + book.apply_bids([BookLevel(price=price, volume=volume) for price, volume in bids]) + book.apply_asks([BookLevel(price=price, volume=volume) for price, volume in asks]) + return book + + def test_incremental_detector_scores_only_cycles_touched_by_pair() -> None: cycle_a = TriangularCycle( currencies=("USD", "BTC", "ETH"), @@ -63,4 +76,205 @@ def test_incremental_detector_uses_best_bid_ask_for_gross_rate() -> None: assert len(scores) == 1 assert scores[0].gross_rate == 1.04 + assert scores[0].net_rate == 1.04 assert scores[0].is_profitable + + +def test_incremental_detector_applies_fees_to_net_rate() -> None: + cycle = TriangularCycle( + currencies=("USD", "BTC", "ETH"), + pairs=("BTC/USD", "ETH/BTC", "ETH/USD"), + ) + detector = IncrementalCycleDetector( + CurrencyGraph.index_cycles_by_pair([cycle]), + fee_rate=0.001, + ) + + books = { + "BTC/USD": _make_book(bid=99.9, ask=100.0), + "ETH/BTC": _make_book(bid=0.049, ask=0.05), + "ETH/USD": _make_book(bid=5.20, ask=5.21), + } + + scores = detector.score_updated_pair("ETH/BTC", books) + + assert len(scores) == 1 + assert scores[0].gross_rate == 1.04 + assert scores[0].net_rate < scores[0].gross_rate + + +def test_incremental_detector_uses_depth_and_slippage() -> None: + cycle = TriangularCycle( + currencies=("USD", "BTC", "ETH"), + pairs=("BTC/USD", "ETH/BTC", "ETH/USD"), + ) + detector = IncrementalCycleDetector( + CurrencyGraph.index_cycles_by_pair([cycle]), + max_depth_levels=2, + ) + + books = { + "BTC/USD": _make_book_levels( + bids=[(99.9, 5.0)], + asks=[(100.0, 0.002), (101.0, 0.020)], + ), + "ETH/BTC": _make_book_levels( + bids=[(0.049, 5.0)], + asks=[(0.05, 0.5)], + ), + "ETH/USD": _make_book_levels( + bids=[(5.2, 5.0)], + asks=[(5.21, 5.0)], + ), + } + + scores = detector.score_updated_pair("BTC/USD", books) + + assert len(scores) == 1 + assert scores[0].gross_rate < 1.04 + + +def test_incremental_detector_returns_no_score_on_insufficient_depth() -> None: + cycle = TriangularCycle( + currencies=("USD", "BTC", "ETH"), + pairs=("BTC/USD", "ETH/BTC", "ETH/USD"), + ) + detector = IncrementalCycleDetector( + CurrencyGraph.index_cycles_by_pair([cycle]), + max_depth_levels=1, + ) + + books = { + "BTC/USD": _make_book_levels( + bids=[(99.9, 5.0)], + asks=[(100.0, 0.001)], + ), + "ETH/BTC": _make_book(bid=0.049, ask=0.05), + "ETH/USD": _make_book(bid=5.2, ask=5.21), + } + + scores = detector.score_updated_pair("BTC/USD", books) + + assert scores == [] + + +def test_incremental_detector_filters_below_profit_threshold() -> None: + cycle = TriangularCycle( + currencies=("USD", "BTC", "ETH"), + pairs=("BTC/USD", "ETH/BTC", "ETH/USD"), + ) + detector = IncrementalCycleDetector( + CurrencyGraph.index_cycles_by_pair([cycle]), + min_profit_threshold=0.05, + ) + + books = { + "BTC/USD": _make_book(bid=99.9, ask=100.0), + "ETH/BTC": _make_book(bid=0.049, ask=0.05), + "ETH/USD": _make_book(bid=5.20, ask=5.21), + } + + scores = detector.score_updated_pair("ETH/BTC", books) + + assert scores == [] + + +def test_incremental_detector_enforces_min_order_size_by_pair() -> None: + cycle = TriangularCycle( + currencies=("USD", "BTC", "ETH"), + pairs=("BTC/USD", "ETH/BTC", "ETH/USD"), + ) + detector = IncrementalCycleDetector( + CurrencyGraph.index_cycles_by_pair([cycle]), + min_order_size_by_pair={"BTC/USD": 0.02}, + ) + + books = { + "BTC/USD": _make_book_levels( + bids=[(99.9, 5.0)], + asks=[(100.0, 0.005), (101.0, 0.005), (102.0, 0.005)], + ), + "ETH/BTC": _make_book(bid=0.049, ask=0.05), + "ETH/USD": _make_book(bid=5.2, ask=5.21), + } + + scores = detector.score_updated_pair("BTC/USD", books) + + assert scores == [] + + +def test_incremental_detector_rejects_stale_books() -> None: + cycle = TriangularCycle( + currencies=("USD", "BTC", "ETH"), + pairs=("BTC/USD", "ETH/BTC", "ETH/USD"), + ) + detector = IncrementalCycleDetector( + CurrencyGraph.index_cycles_by_pair([cycle]), + max_book_age_seconds=1.0, + ) + + books = { + "BTC/USD": _make_book(bid=99.9, ask=100.0), + "ETH/BTC": _make_book(bid=0.049, ask=0.05), + "ETH/USD": _make_book(bid=5.20, ask=5.21), + } + books["ETH/BTC"]._updated_at = datetime.now(UTC) - timedelta(seconds=5) + + scores = detector.score_updated_pair("ETH/BTC", books) + + assert scores == [] + + +def test_incremental_detector_accepts_fresh_books_with_staleness_enabled() -> None: + cycle = TriangularCycle( + currencies=("USD", "BTC", "ETH"), + pairs=("BTC/USD", "ETH/BTC", "ETH/USD"), + ) + detector = IncrementalCycleDetector( + CurrencyGraph.index_cycles_by_pair([cycle]), + max_book_age_seconds=5.0, + ) + + books = { + "BTC/USD": _make_book(bid=99.9, ask=100.0), + "ETH/BTC": _make_book(bid=0.049, ask=0.05), + "ETH/USD": _make_book(bid=5.20, ask=5.21), + } + now = datetime.now(UTC) + for book in books.values(): + book._updated_at = now - timedelta(seconds=0.2) + + scores = detector.score_updated_pair("ETH/BTC", books) + + assert len(scores) == 1 + + +def test_incremental_detector_emits_structured_opportunity_event() -> None: + cycle = TriangularCycle( + currencies=("USD", "BTC", "ETH"), + pairs=("BTC/USD", "ETH/BTC", "ETH/USD"), + ) + detector = IncrementalCycleDetector( + CurrencyGraph.index_cycles_by_pair([cycle]), + min_profit_threshold=0.01, + ) + + books = { + "BTC/USD": _make_book(bid=99.9, ask=100.0), + "ETH/BTC": _make_book(bid=0.049, ask=0.05), + "ETH/USD": _make_book(bid=5.20, ask=5.21), + } + + opportunities = detector.opportunities_for_updated_pair( + "ETH/BTC", + books, + base_capital=500.0, + ) + + assert len(opportunities) == 1 + event = opportunities[0] + assert event.cycle == "USD->BTC->ETH->USD" + assert event.updated_pair == "ETH/BTC" + assert event.gross_pct == pytest.approx(4.0) + assert event.net_pct == pytest.approx(4.0) + assert event.est_profit == pytest.approx(20.0) diff --git a/tests/unit/test_market_data_feed.py b/tests/unit/test_market_data_feed.py new file mode 100644 index 0000000..43198c5 --- /dev/null +++ b/tests/unit/test_market_data_feed.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import UTC, datetime +from types import SimpleNamespace + +import pytest + +from arbitrade.detection.engine import OpportunityEvent +from arbitrade.exchange.models import BookDelta, BookLevel +from arbitrade.market_data.feed import MarketDataFeed + + +@dataclass(slots=True) +class _FakeWsClient: + delta: BookDelta + + async def connect_stream(self): + yield SimpleNamespace(payload={"channel": "book"}) + + def parse_book_delta(self, _payload: dict[str, object]) -> BookDelta: + return self.delta + + +class _FakeSnapshotWriter: + def __init__(self) -> None: + self.items: list[object] = [] + + async def enqueue(self, snapshot: object) -> None: + self.items.append(snapshot) + + +class _FakeOpportunityWriter: + def __init__(self) -> None: + self.items: list[OpportunityEvent] = [] + + async def enqueue(self, event: OpportunityEvent) -> None: + self.items.append(event) + + +class _FakeDetector: + def __init__(self, event: OpportunityEvent) -> None: + self._event = event + + def opportunities_for_updated_pair(self, _updated_pair: str, _books: dict[str, object]): + return [self._event] + + +class _FakeExecutor: + def __init__(self) -> None: + self.calls: list[OpportunityEvent] = [] + + async def execute(self, event: OpportunityEvent) -> None: + self.calls.append(event) + + +def _sample_event() -> OpportunityEvent: + return OpportunityEvent( + detected_at=datetime.now(UTC), + cycle="USD->BTC->ETH->USD", + updated_pair="BTC/USD", + gross_rate=1.04, + net_rate=1.03, + gross_pct=4.0, + net_pct=3.0, + est_profit=0.03, + ) + + +def _sample_delta() -> BookDelta: + return BookDelta( + symbol="BTC/USD", + bids=[BookLevel(price=100.0, volume=1.0)], + asks=[BookLevel(price=100.5, volume=1.0)], + ) + + +@pytest.mark.asyncio +async def test_market_data_feed_dry_run_does_not_execute_orders() -> None: + event = _sample_event() + executor = _FakeExecutor() + feed = MarketDataFeed( + ws_client=_FakeWsClient(_sample_delta()), + snapshot_writer=_FakeSnapshotWriter(), + detector=_FakeDetector(event), + opportunity_writer=_FakeOpportunityWriter(), + paper_trading_mode=True, + opportunity_executor=executor.execute, + ) + + await feed.run() + + assert executor.calls == [] + + +@pytest.mark.asyncio +async def test_market_data_feed_live_mode_executes_orders() -> None: + event = _sample_event() + executor = _FakeExecutor() + feed = MarketDataFeed( + ws_client=_FakeWsClient(_sample_delta()), + snapshot_writer=_FakeSnapshotWriter(), + detector=_FakeDetector(event), + opportunity_writer=_FakeOpportunityWriter(), + paper_trading_mode=False, + opportunity_executor=executor.execute, + ) + + await feed.run() + + assert len(executor.calls) == 1 + assert executor.calls[0].cycle == "USD->BTC->ETH->USD" diff --git a/tests/unit/test_opportunity_writer.py b/tests/unit/test_opportunity_writer.py new file mode 100644 index 0000000..cccacf1 --- /dev/null +++ b/tests/unit/test_opportunity_writer.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +from datetime import UTC, datetime + +import pytest + +from arbitrade.config.settings import Settings +from arbitrade.detection.engine import OpportunityEvent +from arbitrade.storage.db import DuckDBStore +from arbitrade.storage.opportunities import AsyncOpportunityWriter +from arbitrade.storage.repositories import OpportunityRepository + + +@pytest.mark.asyncio +async def test_async_opportunity_writer_persists_events(tmp_path) -> None: + settings = Settings(_env_file=None, DUCKDB_PATH=tmp_path / "test.duckdb") + store = DuckDBStore(settings) + store.migrate() + + repository = OpportunityRepository(store) + writer = AsyncOpportunityWriter(repository, max_queue_size=10) + await writer.start() + + event = OpportunityEvent( + detected_at=datetime.now(UTC), + cycle="USD->BTC->ETH->USD", + updated_pair="BTC/USD", + gross_rate=1.04, + net_rate=1.03, + gross_pct=4.0, + net_pct=3.0, + est_profit=0.03, + ) + + await writer.enqueue(event) + await writer.stop() + + with store.connect() as conn: + rows = conn.execute( + "SELECT cycle, gross_pct, net_pct, est_profit, executed FROM opportunities" + ).fetchall() + + assert len(rows) == 1 + assert rows[0][0] == "USD->BTC->ETH->USD" + assert rows[0][1] == 4.0 + assert rows[0][2] == 3.0 + assert rows[0][3] == 0.03 + assert rows[0][4] is False