Add opportunity detection and storage functionality with async processing

- Introduced OpportunityEvent class for structured opportunity data.
- Enhanced IncrementalCycleDetector to generate opportunities based on updated pairs.
- Implemented AsyncOpportunityWriter for persisting opportunities to the database.
- Updated MarketDataFeed to handle opportunity detection and execution in both paper and live trading modes.
- Added unit tests for opportunity detection and persistence.
This commit is contained in:
2026-06-01 10:59:09 +02:00
parent 652b20274a
commit a89886186f
10 changed files with 728 additions and 39 deletions
+1
View File
@@ -15,3 +15,4 @@ KRAKEN_RETRY_ATTEMPTS=3
KRAKEN_RETRY_BASE_DELAY_SECONDS=0.25 KRAKEN_RETRY_BASE_DELAY_SECONDS=0.25
WS_HEARTBEAT_TIMEOUT_SECONDS=20.0 WS_HEARTBEAT_TIMEOUT_SECONDS=20.0
WS_MAX_STALENESS_SECONDS=5.0 WS_MAX_STALENESS_SECONDS=5.0
PAPER_TRADING_MODE=true
+19 -9
View File
@@ -8,7 +8,8 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings): class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore") model_config = SettingsConfigDict(
env_file=".env", env_file_encoding="utf-8", extra="ignore")
app_env: str = Field(default="dev", alias="APP_ENV") app_env: str = Field(default="dev", alias="APP_ENV")
app_host: str = Field(default="0.0.0.0", alias="APP_HOST") app_host: str = Field(default="0.0.0.0", alias="APP_HOST")
@@ -17,22 +18,31 @@ class Settings(BaseSettings):
log_level: str = Field(default="INFO", alias="LOG_LEVEL") log_level: str = Field(default="INFO", alias="LOG_LEVEL")
log_json: bool = Field(default=True, alias="LOG_JSON") log_json: bool = Field(default=True, alias="LOG_JSON")
duckdb_path: Path = Field(default=Path("./data/arbitrade.duckdb"), alias="DUCKDB_PATH") duckdb_path: Path = Field(default=Path(
"./data/arbitrade.duckdb"), alias="DUCKDB_PATH")
kraken_rest_url: str = Field(default="https://api.kraken.com", alias="KRAKEN_REST_URL") kraken_rest_url: str = Field(
kraken_ws_url: str = Field(default="wss://ws.kraken.com/v2", alias="KRAKEN_WS_URL") default="https://api.kraken.com", alias="KRAKEN_REST_URL")
kraken_ws_url: str = Field(
default="wss://ws.kraken.com/v2", alias="KRAKEN_WS_URL")
kraken_private_rate_limit_seconds: float = Field( kraken_private_rate_limit_seconds: float = Field(
default=1.0, alias="KRAKEN_PRIVATE_RATE_LIMIT_SECONDS" default=1.0, alias="KRAKEN_PRIVATE_RATE_LIMIT_SECONDS"
) )
kraken_http_timeout_seconds: float = Field(default=10.0, alias="KRAKEN_HTTP_TIMEOUT_SECONDS") kraken_http_timeout_seconds: float = Field(
kraken_retry_attempts: int = Field(default=3, alias="KRAKEN_RETRY_ATTEMPTS") default=10.0, alias="KRAKEN_HTTP_TIMEOUT_SECONDS")
kraken_retry_attempts: int = Field(
default=3, alias="KRAKEN_RETRY_ATTEMPTS")
kraken_retry_base_delay_seconds: float = Field( kraken_retry_base_delay_seconds: float = Field(
default=0.25, alias="KRAKEN_RETRY_BASE_DELAY_SECONDS" default=0.25, alias="KRAKEN_RETRY_BASE_DELAY_SECONDS"
) )
kraken_api_key: str | None = Field(default=None, alias="KRAKEN_API_KEY") kraken_api_key: str | None = Field(default=None, alias="KRAKEN_API_KEY")
kraken_api_secret: str | None = Field(default=None, alias="KRAKEN_API_SECRET") kraken_api_secret: str | None = Field(
ws_heartbeat_timeout_seconds: float = Field(default=20.0, alias="WS_HEARTBEAT_TIMEOUT_SECONDS") default=None, alias="KRAKEN_API_SECRET")
ws_max_staleness_seconds: float = Field(default=5.0, alias="WS_MAX_STALENESS_SECONDS") ws_heartbeat_timeout_seconds: float = Field(
default=20.0, alias="WS_HEARTBEAT_TIMEOUT_SECONDS")
ws_max_staleness_seconds: float = Field(
default=5.0, alias="WS_MAX_STALENESS_SECONDS")
paper_trading_mode: bool = Field(default=True, alias="PAPER_TRADING_MODE")
fernet_key: str | None = Field(default=None, alias="FERNET_KEY") fernet_key: str | None = Field(default=None, alias="FERNET_KEY")
+2 -1
View File
@@ -1,11 +1,12 @@
"""Arbitrage detection package.""" """Arbitrage detection package."""
from arbitrade.detection.engine import CycleScore, IncrementalCycleDetector from arbitrade.detection.engine import CycleScore, IncrementalCycleDetector, OpportunityEvent
from arbitrade.detection.graph import CurrencyGraph, TriangularCycle from arbitrade.detection.graph import CurrencyGraph, TriangularCycle
__all__ = [ __all__ = [
"CurrencyGraph", "CurrencyGraph",
"TriangularCycle", "TriangularCycle",
"CycleScore", "CycleScore",
"OpportunityEvent",
"IncrementalCycleDetector", "IncrementalCycleDetector",
] ]
+184 -23
View File
@@ -5,6 +5,7 @@ from dataclasses import dataclass
from datetime import UTC, datetime from datetime import UTC, datetime
from arbitrade.detection.graph import TriangularCycle from arbitrade.detection.graph import TriangularCycle
from arbitrade.exchange.models import BookLevel
from arbitrade.market_data.order_book import OrderBook from arbitrade.market_data.order_book import OrderBook
@@ -20,19 +21,80 @@ def _normalize_pair_symbol(symbol: str) -> str:
class CycleScore: class CycleScore:
cycle: TriangularCycle cycle: TriangularCycle
gross_rate: float gross_rate: float
net_rate: float
min_profit_threshold: float
updated_pair: str updated_pair: str
scored_at: datetime scored_at: datetime
@property @property
def is_profitable(self) -> bool: def is_profitable(self) -> bool:
return self.gross_rate > 1.0 return (self.net_rate - 1.0) >= self.min_profit_threshold
@dataclass(frozen=True, slots=True)
class OpportunityEvent:
detected_at: datetime
cycle: str
updated_pair: str
gross_rate: float
net_rate: float
gross_pct: float
net_pct: float
est_profit: float
@classmethod
def from_cycle_score(cls, score: CycleScore, base_capital: float = 1.0) -> OpportunityEvent:
gross_pct = (score.gross_rate - 1.0) * 100.0
net_pct = (score.net_rate - 1.0) * 100.0
est_profit = (score.net_rate - 1.0) * base_capital
a, b, c = score.cycle.currencies
cycle = f"{a}->{b}->{c}->{a}"
return cls(
detected_at=score.scored_at,
cycle=cycle,
updated_pair=score.updated_pair,
gross_rate=score.gross_rate,
net_rate=score.net_rate,
gross_pct=gross_pct,
net_pct=net_pct,
est_profit=est_profit,
)
class IncrementalCycleDetector: class IncrementalCycleDetector:
def __init__(self, cycles_by_pair: Mapping[str, list[TriangularCycle]]) -> None: def __init__(
self,
cycles_by_pair: Mapping[str, list[TriangularCycle]],
*,
fee_rate: float = 0.0,
max_depth_levels: int = 10,
min_profit_threshold: float = 0.0,
min_order_size_by_pair: Mapping[str, float] | None = None,
max_book_age_seconds: float | None = None,
) -> None:
self._cycles_by_pair = { self._cycles_by_pair = {
_normalize_pair_symbol(pair): list(cycles) for pair, cycles in cycles_by_pair.items() _normalize_pair_symbol(pair): list(cycles) for pair, cycles in cycles_by_pair.items()
} }
self._fee_multiplier = 1.0 - fee_rate
self._max_depth_levels = max_depth_levels
self._min_profit_threshold = min_profit_threshold
self._max_book_age_seconds = max_book_age_seconds
self._min_order_size_by_pair = {
_normalize_pair_symbol(pair): float(min_size)
for pair, min_size in (min_order_size_by_pair or {}).items()
}
if self._fee_multiplier < 0.0:
raise ValueError("fee_rate must be <= 1.0")
if self._max_depth_levels <= 0:
raise ValueError("max_depth_levels must be > 0")
if self._min_profit_threshold < 0.0:
raise ValueError("min_profit_threshold must be >= 0.0")
if self._max_book_age_seconds is not None and self._max_book_age_seconds <= 0.0:
raise ValueError("max_book_age_seconds must be > 0.0")
for min_size in self._min_order_size_by_pair.values():
if min_size <= 0.0:
raise ValueError("minimum order size must be > 0.0")
def score_updated_pair( def score_updated_pair(
self, self,
@@ -42,19 +104,23 @@ class IncrementalCycleDetector:
normalized_pair = _normalize_pair_symbol(updated_pair) normalized_pair = _normalize_pair_symbol(updated_pair)
impacted_cycles = self._cycles_by_pair.get(normalized_pair, []) impacted_cycles = self._cycles_by_pair.get(normalized_pair, [])
normalized_books = {_normalize_pair_symbol( normalized_books = {_normalize_pair_symbol(symbol): book for symbol, book in books.items()}
symbol): book for symbol, book in books.items()}
scores: list[CycleScore] = [] scores: list[CycleScore] = []
scored_at = datetime.now(UTC) scored_at = datetime.now(UTC)
for cycle in impacted_cycles: for cycle in impacted_cycles:
gross_rate = self._score_cycle(cycle, normalized_books) rates = self._score_cycle(cycle, normalized_books, scored_at)
if gross_rate is None: if rates is None:
continue
gross_rate, net_rate = rates
if (net_rate - 1.0) < self._min_profit_threshold:
continue continue
scores.append( scores.append(
CycleScore( CycleScore(
cycle=cycle, cycle=cycle,
gross_rate=gross_rate, gross_rate=gross_rate,
net_rate=net_rate,
min_profit_threshold=self._min_profit_threshold,
updated_pair=normalized_pair, updated_pair=normalized_pair,
scored_at=scored_at, scored_at=scored_at,
) )
@@ -62,28 +128,74 @@ class IncrementalCycleDetector:
return scores return scores
def opportunities_for_updated_pair(
self,
updated_pair: str,
books: Mapping[str, OrderBook],
*,
base_capital: float = 1.0,
) -> list[OpportunityEvent]:
if base_capital <= 0.0:
raise ValueError("base_capital must be > 0.0")
scores = self.score_updated_pair(updated_pair, books)
return [OpportunityEvent.from_cycle_score(score, base_capital) for score in scores]
def _score_cycle( def _score_cycle(
self, self,
cycle: TriangularCycle, cycle: TriangularCycle,
books: Mapping[str, OrderBook], books: Mapping[str, OrderBook],
) -> float | None: scored_at: datetime,
) -> tuple[float, float] | None:
if not self._is_cycle_fresh(cycle, books, scored_at):
return None
a, b, c = cycle.currencies a, b, c = cycle.currencies
amount = 1.0 gross_amount = 1.0
amount_ab = self._convert(amount, a, b, cycle, books) gross_ab = self._convert(gross_amount, a, b, cycle, books)
if amount_ab is None: if gross_ab is None:
return None return None
amount = amount_ab net_ab = gross_ab * self._fee_multiplier
amount_bc = self._convert(amount, b, c, cycle, books) gross_bc = self._convert(gross_ab, b, c, cycle, books)
if amount_bc is None: if gross_bc is None:
return None return None
amount = amount_bc net_bc = self._convert(net_ab, b, c, cycle, books)
if net_bc is None:
return None
net_bc *= self._fee_multiplier
amount_ca = self._convert(amount, c, a, cycle, books) gross_ca = self._convert(gross_bc, c, a, cycle, books)
if amount_ca is None: if gross_ca is None:
return None return None
return amount_ca net_ca = self._convert(net_bc, c, a, cycle, books)
if net_ca is None:
return None
net_ca *= self._fee_multiplier
return gross_ca, net_ca
def _is_cycle_fresh(
self,
cycle: TriangularCycle,
books: Mapping[str, OrderBook],
scored_at: datetime,
) -> bool:
if self._max_book_age_seconds is None:
return True
for pair in cycle.pairs:
normalized_pair = _normalize_pair_symbol(pair)
book = books.get(normalized_pair)
if book is None:
return False
age_seconds = (scored_at - book.updated_at).total_seconds()
if age_seconds > self._max_book_age_seconds:
return False
return True
@staticmethod @staticmethod
def _pair_for_edge(cycle: TriangularCycle, from_currency: str, to_currency: str) -> str | None: def _pair_for_edge(cycle: TriangularCycle, from_currency: str, to_currency: str) -> str | None:
@@ -113,20 +225,69 @@ class IncrementalCycleDetector:
if book is None: if book is None:
return None return None
bids, asks = book.top_levels(depth=self._max_depth_levels)
base, quote = pair.split("/", 1) base, quote = pair.split("/", 1)
base = base.upper() base = base.upper()
quote = quote.upper() quote = quote.upper()
if from_currency == base and to_currency == quote: if from_currency == base and to_currency == quote:
best_bid = book.best_bid() quote_out = self._sell_base_for_quote(amount, bids)
if best_bid is None: if quote_out is None:
return None return None
return amount * best_bid.price if not self._is_min_order_size_satisfied(pair, amount):
return None
return quote_out
if from_currency == quote and to_currency == base: if from_currency == quote and to_currency == base:
best_ask = book.best_ask() base_out = self._buy_base_with_quote(amount, asks)
if best_ask is None or best_ask.price <= 0.0: if base_out is None:
return None return None
return amount / best_ask.price if not self._is_min_order_size_satisfied(pair, base_out):
return None
return base_out
return None return None
def _is_min_order_size_satisfied(self, pair: str, base_amount: float) -> bool:
min_size = self._min_order_size_by_pair.get(pair)
if min_size is None:
return True
return base_amount >= min_size
@staticmethod
def _sell_base_for_quote(amount_base: float, bids: list[BookLevel]) -> float | None:
remaining = amount_base
quote_out = 0.0
for level in bids:
if remaining <= 0.0:
break
if level.price <= 0.0 or level.volume <= 0.0:
continue
executed = min(remaining, level.volume)
quote_out += executed * level.price
remaining -= executed
if remaining > 0.0:
return None
return quote_out
@staticmethod
def _buy_base_with_quote(amount_quote: float, asks: list[BookLevel]) -> float | None:
remaining_quote = amount_quote
base_out = 0.0
for level in asks:
if remaining_quote <= 0.0:
break
if level.price <= 0.0 or level.volume <= 0.0:
continue
level_quote_capacity = level.volume * level.price
spend = min(remaining_quote, level_quote_capacity)
base_out += spend / level.price
remaining_quote -= spend
if remaining_quote > 0.0:
return None
return base_out
+51 -6
View File
@@ -1,14 +1,16 @@
from __future__ import annotations from __future__ import annotations
import time import time
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime from datetime import UTC, datetime
import structlog import structlog
from arbitrade.detection.engine import IncrementalCycleDetector from arbitrade.detection.engine import IncrementalCycleDetector, OpportunityEvent
from arbitrade.exchange.kraken_ws import KrakenWsClient from arbitrade.exchange.kraken_ws import KrakenWsClient
from arbitrade.market_data.order_book import OrderBook from arbitrade.market_data.order_book import OrderBook
from arbitrade.storage.market_snapshots import AsyncMarketSnapshotWriter, MarketSnapshot from arbitrade.storage.market_snapshots import AsyncMarketSnapshotWriter, MarketSnapshot
from arbitrade.storage.opportunities import AsyncOpportunityWriter
_LOG = structlog.get_logger(__name__) _LOG = structlog.get_logger(__name__)
@@ -19,11 +21,18 @@ class MarketDataFeed:
ws_client: KrakenWsClient, ws_client: KrakenWsClient,
snapshot_writer: AsyncMarketSnapshotWriter, snapshot_writer: AsyncMarketSnapshotWriter,
detector: IncrementalCycleDetector | None = None, detector: IncrementalCycleDetector | None = None,
opportunity_writer: AsyncOpportunityWriter | None = None,
paper_trading_mode: bool = True,
opportunity_executor: Callable[[
OpportunityEvent], Awaitable[None]] | None = None,
) -> None: ) -> None:
self._ws_client = ws_client self._ws_client = ws_client
self._snapshot_writer = snapshot_writer self._snapshot_writer = snapshot_writer
self._books: dict[str, OrderBook] = {} self._books: dict[str, OrderBook] = {}
self._detector = detector self._detector = detector
self._opportunity_writer = opportunity_writer
self._paper_trading_mode = paper_trading_mode
self._opportunity_executor = opportunity_executor
@property @property
def books(self) -> dict[str, OrderBook]: def books(self) -> dict[str, OrderBook]:
@@ -62,12 +71,48 @@ class MarketDataFeed:
) )
if self._detector is not None: if self._detector is not None:
scores = self._detector.score_updated_pair(delta.symbol, self._books) opportunities = self._detector.opportunities_for_updated_pair(
_LOG.debug( delta.symbol,
"incremental_cycle_scores", self._books,
symbol=delta.symbol,
impacted_scores=len(scores),
) )
_LOG.debug(
"incremental_opportunity_scores",
symbol=delta.symbol,
opportunities=len(opportunities),
)
for event in opportunities:
_LOG.info(
"opportunity_detected",
cycle=event.cycle,
updated_pair=event.updated_pair,
gross_pct=event.gross_pct,
net_pct=event.net_pct,
est_profit=event.est_profit,
mode="paper" if self._paper_trading_mode else "live",
)
if self._opportunity_writer is not None:
await self._opportunity_writer.enqueue(event)
if self._paper_trading_mode:
_LOG.info(
"paper_trade_simulated",
cycle=event.cycle,
updated_pair=event.updated_pair,
net_pct=event.net_pct,
)
continue
if self._opportunity_executor is None:
_LOG.warning(
"live_trade_skipped_no_executor",
cycle=event.cycle,
updated_pair=event.updated_pair,
)
continue
await self._opportunity_executor(event)
await self._snapshot_writer.enqueue( await self._snapshot_writer.enqueue(
MarketSnapshot( MarketSnapshot(
+58
View File
@@ -0,0 +1,58 @@
from __future__ import annotations
import asyncio
import structlog
from arbitrade.detection.engine import OpportunityEvent
from arbitrade.storage.repositories import OpportunityRecord, OpportunityRepository
_LOG = structlog.get_logger(__name__)
class AsyncOpportunityWriter:
def __init__(self, repository: OpportunityRepository, max_queue_size: int = 50_000) -> None:
self._repository = repository
self._queue: asyncio.Queue[OpportunityEvent] = asyncio.Queue(maxsize=max_queue_size)
self._task: asyncio.Task[None] | None = None
self._stop = asyncio.Event()
async def start(self) -> None:
if self._task is None or self._task.done():
self._stop.clear()
self._task = asyncio.create_task(self._run(), name="opportunity-writer")
async def stop(self) -> None:
self._stop.set()
if self._task is not None:
await self._task
async def enqueue(self, event: OpportunityEvent) -> None:
await self._queue.put(event)
async def _run(self) -> None:
while not (self._stop.is_set() and self._queue.empty()):
try:
event = await asyncio.wait_for(self._queue.get(), timeout=0.5)
except TimeoutError:
continue
try:
self._repository.insert(
OpportunityRecord(
detected_at=event.detected_at,
cycle=event.cycle,
gross_pct=event.gross_pct,
net_pct=event.net_pct,
est_profit=event.est_profit,
)
)
except Exception as exc:
_LOG.error(
"opportunity_write_failed",
error=str(exc),
cycle=event.cycle,
updated_pair=event.updated_pair,
)
finally:
self._queue.task_done()
+39
View File
@@ -18,6 +18,16 @@ class MarketSnapshotRecord:
latency_ms: float | None latency_ms: float | None
@dataclass(slots=True)
class OpportunityRecord:
detected_at: datetime
cycle: str
gross_pct: float
net_pct: float
est_profit: float
executed: bool = False
class MarketSnapshotRepository: class MarketSnapshotRepository:
def __init__(self, store: DuckDBStore) -> None: def __init__(self, store: DuckDBStore) -> None:
self._store = store self._store = store
@@ -37,3 +47,32 @@ class MarketSnapshotRepository:
record.latency_ms, record.latency_ms,
], ],
) )
class OpportunityRepository:
def __init__(self, store: DuckDBStore) -> None:
self._store = store
def insert(self, record: OpportunityRecord) -> None:
with self._store.connect() as conn:
conn.execute(
"""
INSERT INTO opportunities (
detected_at,
cycle,
gross_pct,
net_pct,
est_profit,
executed
)
VALUES (?, ?, ?, ?, ?, ?)
""",
[
record.detected_at,
record.cycle,
record.gross_pct,
record.net_pct,
record.est_profit,
record.executed,
],
)
+214
View File
@@ -1,3 +1,7 @@
from datetime import UTC, datetime, timedelta
import pytest
from arbitrade.detection.engine import IncrementalCycleDetector from arbitrade.detection.engine import IncrementalCycleDetector
from arbitrade.detection.graph import CurrencyGraph, TriangularCycle from arbitrade.detection.graph import CurrencyGraph, TriangularCycle
from arbitrade.exchange.models import BookLevel from arbitrade.exchange.models import BookLevel
@@ -11,6 +15,15 @@ def _make_book(*, bid: float, ask: float) -> OrderBook:
return book return book
def _make_book_levels(
*, bids: list[tuple[float, float]], asks: list[tuple[float, float]]
) -> OrderBook:
book = OrderBook()
book.apply_bids([BookLevel(price=price, volume=volume) for price, volume in bids])
book.apply_asks([BookLevel(price=price, volume=volume) for price, volume in asks])
return book
def test_incremental_detector_scores_only_cycles_touched_by_pair() -> None: def test_incremental_detector_scores_only_cycles_touched_by_pair() -> None:
cycle_a = TriangularCycle( cycle_a = TriangularCycle(
currencies=("USD", "BTC", "ETH"), currencies=("USD", "BTC", "ETH"),
@@ -63,4 +76,205 @@ def test_incremental_detector_uses_best_bid_ask_for_gross_rate() -> None:
assert len(scores) == 1 assert len(scores) == 1
assert scores[0].gross_rate == 1.04 assert scores[0].gross_rate == 1.04
assert scores[0].net_rate == 1.04
assert scores[0].is_profitable assert scores[0].is_profitable
def test_incremental_detector_applies_fees_to_net_rate() -> None:
cycle = TriangularCycle(
currencies=("USD", "BTC", "ETH"),
pairs=("BTC/USD", "ETH/BTC", "ETH/USD"),
)
detector = IncrementalCycleDetector(
CurrencyGraph.index_cycles_by_pair([cycle]),
fee_rate=0.001,
)
books = {
"BTC/USD": _make_book(bid=99.9, ask=100.0),
"ETH/BTC": _make_book(bid=0.049, ask=0.05),
"ETH/USD": _make_book(bid=5.20, ask=5.21),
}
scores = detector.score_updated_pair("ETH/BTC", books)
assert len(scores) == 1
assert scores[0].gross_rate == 1.04
assert scores[0].net_rate < scores[0].gross_rate
def test_incremental_detector_uses_depth_and_slippage() -> None:
cycle = TriangularCycle(
currencies=("USD", "BTC", "ETH"),
pairs=("BTC/USD", "ETH/BTC", "ETH/USD"),
)
detector = IncrementalCycleDetector(
CurrencyGraph.index_cycles_by_pair([cycle]),
max_depth_levels=2,
)
books = {
"BTC/USD": _make_book_levels(
bids=[(99.9, 5.0)],
asks=[(100.0, 0.002), (101.0, 0.020)],
),
"ETH/BTC": _make_book_levels(
bids=[(0.049, 5.0)],
asks=[(0.05, 0.5)],
),
"ETH/USD": _make_book_levels(
bids=[(5.2, 5.0)],
asks=[(5.21, 5.0)],
),
}
scores = detector.score_updated_pair("BTC/USD", books)
assert len(scores) == 1
assert scores[0].gross_rate < 1.04
def test_incremental_detector_returns_no_score_on_insufficient_depth() -> None:
cycle = TriangularCycle(
currencies=("USD", "BTC", "ETH"),
pairs=("BTC/USD", "ETH/BTC", "ETH/USD"),
)
detector = IncrementalCycleDetector(
CurrencyGraph.index_cycles_by_pair([cycle]),
max_depth_levels=1,
)
books = {
"BTC/USD": _make_book_levels(
bids=[(99.9, 5.0)],
asks=[(100.0, 0.001)],
),
"ETH/BTC": _make_book(bid=0.049, ask=0.05),
"ETH/USD": _make_book(bid=5.2, ask=5.21),
}
scores = detector.score_updated_pair("BTC/USD", books)
assert scores == []
def test_incremental_detector_filters_below_profit_threshold() -> None:
cycle = TriangularCycle(
currencies=("USD", "BTC", "ETH"),
pairs=("BTC/USD", "ETH/BTC", "ETH/USD"),
)
detector = IncrementalCycleDetector(
CurrencyGraph.index_cycles_by_pair([cycle]),
min_profit_threshold=0.05,
)
books = {
"BTC/USD": _make_book(bid=99.9, ask=100.0),
"ETH/BTC": _make_book(bid=0.049, ask=0.05),
"ETH/USD": _make_book(bid=5.20, ask=5.21),
}
scores = detector.score_updated_pair("ETH/BTC", books)
assert scores == []
def test_incremental_detector_enforces_min_order_size_by_pair() -> None:
cycle = TriangularCycle(
currencies=("USD", "BTC", "ETH"),
pairs=("BTC/USD", "ETH/BTC", "ETH/USD"),
)
detector = IncrementalCycleDetector(
CurrencyGraph.index_cycles_by_pair([cycle]),
min_order_size_by_pair={"BTC/USD": 0.02},
)
books = {
"BTC/USD": _make_book_levels(
bids=[(99.9, 5.0)],
asks=[(100.0, 0.005), (101.0, 0.005), (102.0, 0.005)],
),
"ETH/BTC": _make_book(bid=0.049, ask=0.05),
"ETH/USD": _make_book(bid=5.2, ask=5.21),
}
scores = detector.score_updated_pair("BTC/USD", books)
assert scores == []
def test_incremental_detector_rejects_stale_books() -> None:
cycle = TriangularCycle(
currencies=("USD", "BTC", "ETH"),
pairs=("BTC/USD", "ETH/BTC", "ETH/USD"),
)
detector = IncrementalCycleDetector(
CurrencyGraph.index_cycles_by_pair([cycle]),
max_book_age_seconds=1.0,
)
books = {
"BTC/USD": _make_book(bid=99.9, ask=100.0),
"ETH/BTC": _make_book(bid=0.049, ask=0.05),
"ETH/USD": _make_book(bid=5.20, ask=5.21),
}
books["ETH/BTC"]._updated_at = datetime.now(UTC) - timedelta(seconds=5)
scores = detector.score_updated_pair("ETH/BTC", books)
assert scores == []
def test_incremental_detector_accepts_fresh_books_with_staleness_enabled() -> None:
cycle = TriangularCycle(
currencies=("USD", "BTC", "ETH"),
pairs=("BTC/USD", "ETH/BTC", "ETH/USD"),
)
detector = IncrementalCycleDetector(
CurrencyGraph.index_cycles_by_pair([cycle]),
max_book_age_seconds=5.0,
)
books = {
"BTC/USD": _make_book(bid=99.9, ask=100.0),
"ETH/BTC": _make_book(bid=0.049, ask=0.05),
"ETH/USD": _make_book(bid=5.20, ask=5.21),
}
now = datetime.now(UTC)
for book in books.values():
book._updated_at = now - timedelta(seconds=0.2)
scores = detector.score_updated_pair("ETH/BTC", books)
assert len(scores) == 1
def test_incremental_detector_emits_structured_opportunity_event() -> None:
cycle = TriangularCycle(
currencies=("USD", "BTC", "ETH"),
pairs=("BTC/USD", "ETH/BTC", "ETH/USD"),
)
detector = IncrementalCycleDetector(
CurrencyGraph.index_cycles_by_pair([cycle]),
min_profit_threshold=0.01,
)
books = {
"BTC/USD": _make_book(bid=99.9, ask=100.0),
"ETH/BTC": _make_book(bid=0.049, ask=0.05),
"ETH/USD": _make_book(bid=5.20, ask=5.21),
}
opportunities = detector.opportunities_for_updated_pair(
"ETH/BTC",
books,
base_capital=500.0,
)
assert len(opportunities) == 1
event = opportunities[0]
assert event.cycle == "USD->BTC->ETH->USD"
assert event.updated_pair == "ETH/BTC"
assert event.gross_pct == pytest.approx(4.0)
assert event.net_pct == pytest.approx(4.0)
assert event.est_profit == pytest.approx(20.0)
+112
View File
@@ -0,0 +1,112 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime
from types import SimpleNamespace
import pytest
from arbitrade.detection.engine import OpportunityEvent
from arbitrade.exchange.models import BookDelta, BookLevel
from arbitrade.market_data.feed import MarketDataFeed
@dataclass(slots=True)
class _FakeWsClient:
delta: BookDelta
async def connect_stream(self):
yield SimpleNamespace(payload={"channel": "book"})
def parse_book_delta(self, _payload: dict[str, object]) -> BookDelta:
return self.delta
class _FakeSnapshotWriter:
def __init__(self) -> None:
self.items: list[object] = []
async def enqueue(self, snapshot: object) -> None:
self.items.append(snapshot)
class _FakeOpportunityWriter:
def __init__(self) -> None:
self.items: list[OpportunityEvent] = []
async def enqueue(self, event: OpportunityEvent) -> None:
self.items.append(event)
class _FakeDetector:
def __init__(self, event: OpportunityEvent) -> None:
self._event = event
def opportunities_for_updated_pair(self, _updated_pair: str, _books: dict[str, object]):
return [self._event]
class _FakeExecutor:
def __init__(self) -> None:
self.calls: list[OpportunityEvent] = []
async def execute(self, event: OpportunityEvent) -> None:
self.calls.append(event)
def _sample_event() -> OpportunityEvent:
return OpportunityEvent(
detected_at=datetime.now(UTC),
cycle="USD->BTC->ETH->USD",
updated_pair="BTC/USD",
gross_rate=1.04,
net_rate=1.03,
gross_pct=4.0,
net_pct=3.0,
est_profit=0.03,
)
def _sample_delta() -> BookDelta:
return BookDelta(
symbol="BTC/USD",
bids=[BookLevel(price=100.0, volume=1.0)],
asks=[BookLevel(price=100.5, volume=1.0)],
)
@pytest.mark.asyncio
async def test_market_data_feed_dry_run_does_not_execute_orders() -> None:
event = _sample_event()
executor = _FakeExecutor()
feed = MarketDataFeed(
ws_client=_FakeWsClient(_sample_delta()),
snapshot_writer=_FakeSnapshotWriter(),
detector=_FakeDetector(event),
opportunity_writer=_FakeOpportunityWriter(),
paper_trading_mode=True,
opportunity_executor=executor.execute,
)
await feed.run()
assert executor.calls == []
@pytest.mark.asyncio
async def test_market_data_feed_live_mode_executes_orders() -> None:
event = _sample_event()
executor = _FakeExecutor()
feed = MarketDataFeed(
ws_client=_FakeWsClient(_sample_delta()),
snapshot_writer=_FakeSnapshotWriter(),
detector=_FakeDetector(event),
opportunity_writer=_FakeOpportunityWriter(),
paper_trading_mode=False,
opportunity_executor=executor.execute,
)
await feed.run()
assert len(executor.calls) == 1
assert executor.calls[0].cycle == "USD->BTC->ETH->USD"
+48
View File
@@ -0,0 +1,48 @@
from __future__ import annotations
from datetime import UTC, datetime
import pytest
from arbitrade.config.settings import Settings
from arbitrade.detection.engine import OpportunityEvent
from arbitrade.storage.db import DuckDBStore
from arbitrade.storage.opportunities import AsyncOpportunityWriter
from arbitrade.storage.repositories import OpportunityRepository
@pytest.mark.asyncio
async def test_async_opportunity_writer_persists_events(tmp_path) -> None:
settings = Settings(_env_file=None, DUCKDB_PATH=tmp_path / "test.duckdb")
store = DuckDBStore(settings)
store.migrate()
repository = OpportunityRepository(store)
writer = AsyncOpportunityWriter(repository, max_queue_size=10)
await writer.start()
event = OpportunityEvent(
detected_at=datetime.now(UTC),
cycle="USD->BTC->ETH->USD",
updated_pair="BTC/USD",
gross_rate=1.04,
net_rate=1.03,
gross_pct=4.0,
net_pct=3.0,
est_profit=0.03,
)
await writer.enqueue(event)
await writer.stop()
with store.connect() as conn:
rows = conn.execute(
"SELECT cycle, gross_pct, net_pct, est_profit, executed FROM opportunities"
).fetchall()
assert len(rows) == 1
assert rows[0][0] == "USD->BTC->ETH->USD"
assert rows[0][1] == 4.0
assert rows[0][2] == 3.0
assert rows[0][3] == 0.03
assert rows[0][4] is False