diff --git a/src/arbitrade/detection/__init__.py b/src/arbitrade/detection/__init__.py index cac25be..73bc859 100644 --- a/src/arbitrade/detection/__init__.py +++ b/src/arbitrade/detection/__init__.py @@ -1 +1,11 @@ """Arbitrage detection package.""" + +from arbitrade.detection.engine import CycleScore, IncrementalCycleDetector +from arbitrade.detection.graph import CurrencyGraph, TriangularCycle + +__all__ = [ + "CurrencyGraph", + "TriangularCycle", + "CycleScore", + "IncrementalCycleDetector", +] diff --git a/src/arbitrade/detection/engine.py b/src/arbitrade/detection/engine.py new file mode 100644 index 0000000..f7b1d97 --- /dev/null +++ b/src/arbitrade/detection/engine.py @@ -0,0 +1,132 @@ +from __future__ import annotations + +from collections.abc import Mapping +from dataclasses import dataclass +from datetime import UTC, datetime + +from arbitrade.detection.graph import TriangularCycle +from arbitrade.market_data.order_book import OrderBook + + +def _normalize_pair_symbol(symbol: str) -> str: + if "/" not in symbol: + return symbol.upper() + + base, quote = symbol.split("/", 1) + return f"{base.upper()}/{quote.upper()}" + + +@dataclass(frozen=True, slots=True) +class CycleScore: + cycle: TriangularCycle + gross_rate: float + updated_pair: str + scored_at: datetime + + @property + def is_profitable(self) -> bool: + return self.gross_rate > 1.0 + + +class IncrementalCycleDetector: + def __init__(self, cycles_by_pair: Mapping[str, list[TriangularCycle]]) -> None: + self._cycles_by_pair = { + _normalize_pair_symbol(pair): list(cycles) for pair, cycles in cycles_by_pair.items() + } + + def score_updated_pair( + self, + updated_pair: str, + books: Mapping[str, OrderBook], + ) -> list[CycleScore]: + normalized_pair = _normalize_pair_symbol(updated_pair) + impacted_cycles = self._cycles_by_pair.get(normalized_pair, []) + + normalized_books = {_normalize_pair_symbol( + symbol): book for symbol, book in books.items()} + + scores: list[CycleScore] = [] + scored_at = datetime.now(UTC) + for cycle in impacted_cycles: + gross_rate = self._score_cycle(cycle, normalized_books) + if gross_rate is None: + continue + scores.append( + CycleScore( + cycle=cycle, + gross_rate=gross_rate, + updated_pair=normalized_pair, + scored_at=scored_at, + ) + ) + + return scores + + def _score_cycle( + self, + cycle: TriangularCycle, + books: Mapping[str, OrderBook], + ) -> float | None: + a, b, c = cycle.currencies + amount = 1.0 + + amount_ab = self._convert(amount, a, b, cycle, books) + if amount_ab is None: + return None + amount = amount_ab + + amount_bc = self._convert(amount, b, c, cycle, books) + if amount_bc is None: + return None + amount = amount_bc + + amount_ca = self._convert(amount, c, a, cycle, books) + if amount_ca is None: + return None + return amount_ca + + @staticmethod + def _pair_for_edge(cycle: TriangularCycle, from_currency: str, to_currency: str) -> str | None: + for pair in cycle.pairs: + if "/" not in pair: + continue + base, quote = pair.split("/", 1) + base = base.upper() + quote = quote.upper() + if {base, quote} == {from_currency, to_currency}: + return f"{base}/{quote}" + return None + + def _convert( + self, + amount: float, + from_currency: str, + to_currency: str, + cycle: TriangularCycle, + books: Mapping[str, OrderBook], + ) -> float | None: + pair = self._pair_for_edge(cycle, from_currency, to_currency) + if pair is None: + return None + + book = books.get(pair) + if book is None: + return None + + base, quote = pair.split("/", 1) + base = base.upper() + quote = quote.upper() + + if from_currency == base and to_currency == quote: + best_bid = book.best_bid() + if best_bid is None: + return None + return amount * best_bid.price + + if from_currency == quote and to_currency == base: + best_ask = book.best_ask() + if best_ask is None or best_ask.price <= 0.0: + return None + return amount / best_ask.price + + return None diff --git a/src/arbitrade/detection/graph.py b/src/arbitrade/detection/graph.py index 7e6b460..47a1286 100644 --- a/src/arbitrade/detection/graph.py +++ b/src/arbitrade/detection/graph.py @@ -30,13 +30,10 @@ class CurrencyGraph: def add_pair(self, base: str, quote: str, pair_symbol: str | None = None) -> None: normalized_base = base.upper() normalized_quote = quote.upper() - symbol = pair_symbol or _canonical_pair( - normalized_base, normalized_quote) + symbol = pair_symbol or _canonical_pair(normalized_base, normalized_quote) - self._adjacency.setdefault( - normalized_base, set()).add(normalized_quote) - self._adjacency.setdefault( - normalized_quote, set()).add(normalized_base) + self._adjacency.setdefault(normalized_base, set()).add(normalized_quote) + self._adjacency.setdefault(normalized_quote, set()).add(normalized_base) self._pair_by_direction[(normalized_base, normalized_quote)] = symbol self._pair_by_direction[(normalized_quote, normalized_base)] = symbol @@ -80,8 +77,7 @@ class CurrencyGraph: p_ca = self._pair_by_direction[(c, a)] key = (a, b, c) - found[key] = TriangularCycle( - currencies=key, pairs=(p_ab, p_bc, p_ca)) + found[key] = TriangularCycle(currencies=key, pairs=(p_ab, p_bc, p_ca)) return list(found.values()) diff --git a/src/arbitrade/market_data/feed.py b/src/arbitrade/market_data/feed.py index 0da1265..9c06ff4 100644 --- a/src/arbitrade/market_data/feed.py +++ b/src/arbitrade/market_data/feed.py @@ -5,6 +5,7 @@ from datetime import UTC, datetime import structlog +from arbitrade.detection.engine import IncrementalCycleDetector from arbitrade.exchange.kraken_ws import KrakenWsClient from arbitrade.market_data.order_book import OrderBook from arbitrade.storage.market_snapshots import AsyncMarketSnapshotWriter, MarketSnapshot @@ -17,10 +18,12 @@ class MarketDataFeed: self, ws_client: KrakenWsClient, snapshot_writer: AsyncMarketSnapshotWriter, + detector: IncrementalCycleDetector | None = None, ) -> None: self._ws_client = ws_client self._snapshot_writer = snapshot_writer self._books: dict[str, OrderBook] = {} + self._detector = detector @property def books(self) -> dict[str, OrderBook]: @@ -58,6 +61,14 @@ class MarketDataFeed: source_latency_ms=source_latency_ms, ) + if self._detector is not None: + scores = self._detector.score_updated_pair(delta.symbol, self._books) + _LOG.debug( + "incremental_cycle_scores", + symbol=delta.symbol, + impacted_scores=len(scores), + ) + await self._snapshot_writer.enqueue( MarketSnapshot( snapshot_at=datetime.now(UTC), diff --git a/tests/unit/test_incremental_detector.py b/tests/unit/test_incremental_detector.py new file mode 100644 index 0000000..48cd999 --- /dev/null +++ b/tests/unit/test_incremental_detector.py @@ -0,0 +1,66 @@ +from arbitrade.detection.engine import IncrementalCycleDetector +from arbitrade.detection.graph import CurrencyGraph, TriangularCycle +from arbitrade.exchange.models import BookLevel +from arbitrade.market_data.order_book import OrderBook + + +def _make_book(*, bid: float, ask: float) -> OrderBook: + book = OrderBook() + book.apply_bids([BookLevel(price=bid, volume=1.0)]) + book.apply_asks([BookLevel(price=ask, volume=1.0)]) + return book + + +def test_incremental_detector_scores_only_cycles_touched_by_pair() -> None: + cycle_a = TriangularCycle( + currencies=("USD", "BTC", "ETH"), + pairs=("BTC/USD", "ETH/BTC", "ETH/USD"), + ) + cycle_b = TriangularCycle( + currencies=("USD", "BTC", "LTC"), + pairs=("BTC/USD", "LTC/BTC", "LTC/USD"), + ) + cycle_c = TriangularCycle( + currencies=("USD", "SOL", "ADA"), + pairs=("SOL/USD", "ADA/SOL", "ADA/USD"), + ) + + cycles = [cycle_a, cycle_b, cycle_c] + index = CurrencyGraph.index_cycles_by_pair(cycles) + detector = IncrementalCycleDetector(index) + + books = { + "BTC/USD": _make_book(bid=100.0, ask=100.0), + "ETH/BTC": _make_book(bid=0.05, ask=0.05), + "ETH/USD": _make_book(bid=5.20, ask=5.21), + "LTC/BTC": _make_book(bid=0.01, ask=0.01), + "LTC/USD": _make_book(bid=1.02, ask=1.03), + "SOL/USD": _make_book(bid=20.0, ask=20.1), + "ADA/SOL": _make_book(bid=0.02, ask=0.021), + "ADA/USD": _make_book(bid=0.42, ask=0.43), + } + + scores = detector.score_updated_pair("BTC/USD", books) + + assert len(scores) == 2 + assert {score.cycle for score in scores} == {cycle_a, cycle_b} + + +def test_incremental_detector_uses_best_bid_ask_for_gross_rate() -> None: + cycle = TriangularCycle( + currencies=("USD", "BTC", "ETH"), + pairs=("BTC/USD", "ETH/BTC", "ETH/USD"), + ) + detector = IncrementalCycleDetector(CurrencyGraph.index_cycles_by_pair([cycle])) + + books = { + "BTC/USD": _make_book(bid=99.9, ask=100.0), + "ETH/BTC": _make_book(bid=0.049, ask=0.05), + "ETH/USD": _make_book(bid=5.20, ask=5.21), + } + + scores = detector.score_updated_pair("ETH/BTC", books) + + assert len(scores) == 1 + assert scores[0].gross_rate == 1.04 + assert scores[0].is_profitable