Add IncrementalCycleDetector and related classes for cycle scoring

- Implement IncrementalCycleDetector for scoring based on updated market data.
- Introduce CycleScore class to encapsulate cycle scoring details.
- Update CurrencyGraph and MarketDataFeed to integrate cycle detection.
- Add unit tests for IncrementalCycleDetector functionality.
This commit is contained in:
2026-06-01 10:36:35 +02:00
parent 7d3071463e
commit 652b20274a
5 changed files with 223 additions and 8 deletions
+10
View File
@@ -1 +1,11 @@
"""Arbitrage detection package."""
from arbitrade.detection.engine import CycleScore, IncrementalCycleDetector
from arbitrade.detection.graph import CurrencyGraph, TriangularCycle
__all__ = [
"CurrencyGraph",
"TriangularCycle",
"CycleScore",
"IncrementalCycleDetector",
]
+132
View File
@@ -0,0 +1,132 @@
from __future__ import annotations
from collections.abc import Mapping
from dataclasses import dataclass
from datetime import UTC, datetime
from arbitrade.detection.graph import TriangularCycle
from arbitrade.market_data.order_book import OrderBook
def _normalize_pair_symbol(symbol: str) -> str:
if "/" not in symbol:
return symbol.upper()
base, quote = symbol.split("/", 1)
return f"{base.upper()}/{quote.upper()}"
@dataclass(frozen=True, slots=True)
class CycleScore:
cycle: TriangularCycle
gross_rate: float
updated_pair: str
scored_at: datetime
@property
def is_profitable(self) -> bool:
return self.gross_rate > 1.0
class IncrementalCycleDetector:
def __init__(self, cycles_by_pair: Mapping[str, list[TriangularCycle]]) -> None:
self._cycles_by_pair = {
_normalize_pair_symbol(pair): list(cycles) for pair, cycles in cycles_by_pair.items()
}
def score_updated_pair(
self,
updated_pair: str,
books: Mapping[str, OrderBook],
) -> list[CycleScore]:
normalized_pair = _normalize_pair_symbol(updated_pair)
impacted_cycles = self._cycles_by_pair.get(normalized_pair, [])
normalized_books = {_normalize_pair_symbol(
symbol): book for symbol, book in books.items()}
scores: list[CycleScore] = []
scored_at = datetime.now(UTC)
for cycle in impacted_cycles:
gross_rate = self._score_cycle(cycle, normalized_books)
if gross_rate is None:
continue
scores.append(
CycleScore(
cycle=cycle,
gross_rate=gross_rate,
updated_pair=normalized_pair,
scored_at=scored_at,
)
)
return scores
def _score_cycle(
self,
cycle: TriangularCycle,
books: Mapping[str, OrderBook],
) -> float | None:
a, b, c = cycle.currencies
amount = 1.0
amount_ab = self._convert(amount, a, b, cycle, books)
if amount_ab is None:
return None
amount = amount_ab
amount_bc = self._convert(amount, b, c, cycle, books)
if amount_bc is None:
return None
amount = amount_bc
amount_ca = self._convert(amount, c, a, cycle, books)
if amount_ca is None:
return None
return amount_ca
@staticmethod
def _pair_for_edge(cycle: TriangularCycle, from_currency: str, to_currency: str) -> str | None:
for pair in cycle.pairs:
if "/" not in pair:
continue
base, quote = pair.split("/", 1)
base = base.upper()
quote = quote.upper()
if {base, quote} == {from_currency, to_currency}:
return f"{base}/{quote}"
return None
def _convert(
self,
amount: float,
from_currency: str,
to_currency: str,
cycle: TriangularCycle,
books: Mapping[str, OrderBook],
) -> float | None:
pair = self._pair_for_edge(cycle, from_currency, to_currency)
if pair is None:
return None
book = books.get(pair)
if book is None:
return None
base, quote = pair.split("/", 1)
base = base.upper()
quote = quote.upper()
if from_currency == base and to_currency == quote:
best_bid = book.best_bid()
if best_bid is None:
return None
return amount * best_bid.price
if from_currency == quote and to_currency == base:
best_ask = book.best_ask()
if best_ask is None or best_ask.price <= 0.0:
return None
return amount / best_ask.price
return None
+4 -8
View File
@@ -30,13 +30,10 @@ class CurrencyGraph:
def add_pair(self, base: str, quote: str, pair_symbol: str | None = None) -> None:
normalized_base = base.upper()
normalized_quote = quote.upper()
symbol = pair_symbol or _canonical_pair(
normalized_base, normalized_quote)
symbol = pair_symbol or _canonical_pair(normalized_base, normalized_quote)
self._adjacency.setdefault(
normalized_base, set()).add(normalized_quote)
self._adjacency.setdefault(
normalized_quote, set()).add(normalized_base)
self._adjacency.setdefault(normalized_base, set()).add(normalized_quote)
self._adjacency.setdefault(normalized_quote, set()).add(normalized_base)
self._pair_by_direction[(normalized_base, normalized_quote)] = symbol
self._pair_by_direction[(normalized_quote, normalized_base)] = symbol
@@ -80,8 +77,7 @@ class CurrencyGraph:
p_ca = self._pair_by_direction[(c, a)]
key = (a, b, c)
found[key] = TriangularCycle(
currencies=key, pairs=(p_ab, p_bc, p_ca))
found[key] = TriangularCycle(currencies=key, pairs=(p_ab, p_bc, p_ca))
return list(found.values())
+11
View File
@@ -5,6 +5,7 @@ from datetime import UTC, datetime
import structlog
from arbitrade.detection.engine import IncrementalCycleDetector
from arbitrade.exchange.kraken_ws import KrakenWsClient
from arbitrade.market_data.order_book import OrderBook
from arbitrade.storage.market_snapshots import AsyncMarketSnapshotWriter, MarketSnapshot
@@ -17,10 +18,12 @@ class MarketDataFeed:
self,
ws_client: KrakenWsClient,
snapshot_writer: AsyncMarketSnapshotWriter,
detector: IncrementalCycleDetector | None = None,
) -> None:
self._ws_client = ws_client
self._snapshot_writer = snapshot_writer
self._books: dict[str, OrderBook] = {}
self._detector = detector
@property
def books(self) -> dict[str, OrderBook]:
@@ -58,6 +61,14 @@ class MarketDataFeed:
source_latency_ms=source_latency_ms,
)
if self._detector is not None:
scores = self._detector.score_updated_pair(delta.symbol, self._books)
_LOG.debug(
"incremental_cycle_scores",
symbol=delta.symbol,
impacted_scores=len(scores),
)
await self._snapshot_writer.enqueue(
MarketSnapshot(
snapshot_at=datetime.now(UTC),
+66
View File
@@ -0,0 +1,66 @@
from arbitrade.detection.engine import IncrementalCycleDetector
from arbitrade.detection.graph import CurrencyGraph, TriangularCycle
from arbitrade.exchange.models import BookLevel
from arbitrade.market_data.order_book import OrderBook
def _make_book(*, bid: float, ask: float) -> OrderBook:
book = OrderBook()
book.apply_bids([BookLevel(price=bid, volume=1.0)])
book.apply_asks([BookLevel(price=ask, volume=1.0)])
return book
def test_incremental_detector_scores_only_cycles_touched_by_pair() -> None:
cycle_a = TriangularCycle(
currencies=("USD", "BTC", "ETH"),
pairs=("BTC/USD", "ETH/BTC", "ETH/USD"),
)
cycle_b = TriangularCycle(
currencies=("USD", "BTC", "LTC"),
pairs=("BTC/USD", "LTC/BTC", "LTC/USD"),
)
cycle_c = TriangularCycle(
currencies=("USD", "SOL", "ADA"),
pairs=("SOL/USD", "ADA/SOL", "ADA/USD"),
)
cycles = [cycle_a, cycle_b, cycle_c]
index = CurrencyGraph.index_cycles_by_pair(cycles)
detector = IncrementalCycleDetector(index)
books = {
"BTC/USD": _make_book(bid=100.0, ask=100.0),
"ETH/BTC": _make_book(bid=0.05, ask=0.05),
"ETH/USD": _make_book(bid=5.20, ask=5.21),
"LTC/BTC": _make_book(bid=0.01, ask=0.01),
"LTC/USD": _make_book(bid=1.02, ask=1.03),
"SOL/USD": _make_book(bid=20.0, ask=20.1),
"ADA/SOL": _make_book(bid=0.02, ask=0.021),
"ADA/USD": _make_book(bid=0.42, ask=0.43),
}
scores = detector.score_updated_pair("BTC/USD", books)
assert len(scores) == 2
assert {score.cycle for score in scores} == {cycle_a, cycle_b}
def test_incremental_detector_uses_best_bid_ask_for_gross_rate() -> None:
cycle = TriangularCycle(
currencies=("USD", "BTC", "ETH"),
pairs=("BTC/USD", "ETH/BTC", "ETH/USD"),
)
detector = IncrementalCycleDetector(CurrencyGraph.index_cycles_by_pair([cycle]))
books = {
"BTC/USD": _make_book(bid=99.9, ask=100.0),
"ETH/BTC": _make_book(bid=0.049, ask=0.05),
"ETH/USD": _make_book(bid=5.20, ask=5.21),
}
scores = detector.score_updated_pair("ETH/BTC", books)
assert len(scores) == 1
assert scores[0].gross_rate == 1.04
assert scores[0].is_profitable