Files
arbitrade/build/lib/arbitrade/detection/engine.py
T
zwitschi 1df4b11aef
CI / lint-test-build (push) Failing after 1m7s
Add HTML templates for dashboard, metrics, overview, and backtesting
- Introduced new HTML templates for the dashboard, metrics, overview, and backtesting functionalities.
- Implemented partial templates for metrics, overview, audit, controls, and charts to enhance modularity.
- Updated the Jinja2 template resolution logic to support different deployment environments.
- Added a health check template to display the service status.
- Included a test suite to verify the template resolution logic.
- Updated `pyproject.toml` to include new HTML templates in the package data.
2026-06-02 14:16:42 +02:00

296 lines
9.4 KiB
Python

from __future__ import annotations
from collections.abc import Mapping
from dataclasses import dataclass
from datetime import UTC, datetime
from arbitrade.detection.graph import TriangularCycle
from arbitrade.exchange.models import BookLevel
from arbitrade.market_data.order_book import OrderBook
def _normalize_pair_symbol(symbol: str) -> str:
if "/" not in symbol:
return symbol.upper()
base, quote = symbol.split("/", 1)
return f"{base.upper()}/{quote.upper()}"
@dataclass(frozen=True, slots=True)
class CycleScore:
cycle: TriangularCycle
gross_rate: float
net_rate: float
min_profit_threshold: float
updated_pair: str
scored_at: datetime
@property
def is_profitable(self) -> bool:
return (self.net_rate - 1.0) >= self.min_profit_threshold
@dataclass(frozen=True, slots=True)
class OpportunityEvent:
detected_at: datetime
cycle: str
updated_pair: str
gross_rate: float
net_rate: float
gross_pct: float
net_pct: float
est_profit: float
allocated_capital: float = 1.0
@classmethod
def from_cycle_score(cls, score: CycleScore, base_capital: float = 1.0) -> OpportunityEvent:
gross_pct = (score.gross_rate - 1.0) * 100.0
net_pct = (score.net_rate - 1.0) * 100.0
est_profit = (score.net_rate - 1.0) * base_capital
a, b, c = score.cycle.currencies
cycle = f"{a}->{b}->{c}->{a}"
return cls(
detected_at=score.scored_at,
cycle=cycle,
updated_pair=score.updated_pair,
gross_rate=score.gross_rate,
net_rate=score.net_rate,
gross_pct=gross_pct,
net_pct=net_pct,
est_profit=est_profit,
allocated_capital=base_capital,
)
class IncrementalCycleDetector:
def __init__(
self,
cycles_by_pair: Mapping[str, list[TriangularCycle]],
*,
fee_rate: float = 0.0,
max_depth_levels: int = 10,
min_profit_threshold: float = 0.0,
min_order_size_by_pair: Mapping[str, float] | None = None,
max_book_age_seconds: float | None = None,
) -> None:
self._cycles_by_pair = {
_normalize_pair_symbol(pair): list(cycles) for pair, cycles in cycles_by_pair.items()
}
self._fee_multiplier = 1.0 - fee_rate
self._max_depth_levels = max_depth_levels
self._min_profit_threshold = min_profit_threshold
self._max_book_age_seconds = max_book_age_seconds
self._min_order_size_by_pair = {
_normalize_pair_symbol(pair): float(min_size)
for pair, min_size in (min_order_size_by_pair or {}).items()
}
if self._fee_multiplier < 0.0:
raise ValueError("fee_rate must be <= 1.0")
if self._max_depth_levels <= 0:
raise ValueError("max_depth_levels must be > 0")
if self._min_profit_threshold < 0.0:
raise ValueError("min_profit_threshold must be >= 0.0")
if self._max_book_age_seconds is not None and self._max_book_age_seconds <= 0.0:
raise ValueError("max_book_age_seconds must be > 0.0")
for min_size in self._min_order_size_by_pair.values():
if min_size <= 0.0:
raise ValueError("minimum order size must be > 0.0")
def score_updated_pair(
self,
updated_pair: str,
books: Mapping[str, OrderBook],
) -> list[CycleScore]:
normalized_pair = _normalize_pair_symbol(updated_pair)
impacted_cycles = self._cycles_by_pair.get(normalized_pair, [])
normalized_books = {_normalize_pair_symbol(symbol): book for symbol, book in books.items()}
scores: list[CycleScore] = []
scored_at = datetime.now(UTC)
for cycle in impacted_cycles:
rates = self._score_cycle(cycle, normalized_books, scored_at)
if rates is None:
continue
gross_rate, net_rate = rates
if (net_rate - 1.0) < self._min_profit_threshold:
continue
scores.append(
CycleScore(
cycle=cycle,
gross_rate=gross_rate,
net_rate=net_rate,
min_profit_threshold=self._min_profit_threshold,
updated_pair=normalized_pair,
scored_at=scored_at,
)
)
return scores
def opportunities_for_updated_pair(
self,
updated_pair: str,
books: Mapping[str, OrderBook],
*,
base_capital: float = 1.0,
) -> list[OpportunityEvent]:
if base_capital <= 0.0:
raise ValueError("base_capital must be > 0.0")
scores = self.score_updated_pair(updated_pair, books)
return [OpportunityEvent.from_cycle_score(score, base_capital) for score in scores]
def _score_cycle(
self,
cycle: TriangularCycle,
books: Mapping[str, OrderBook],
scored_at: datetime,
) -> tuple[float, float] | None:
if not self._is_cycle_fresh(cycle, books, scored_at):
return None
a, b, c = cycle.currencies
gross_amount = 1.0
gross_ab = self._convert(gross_amount, a, b, cycle, books)
if gross_ab is None:
return None
net_ab = gross_ab * self._fee_multiplier
gross_bc = self._convert(gross_ab, b, c, cycle, books)
if gross_bc is None:
return None
net_bc = self._convert(net_ab, b, c, cycle, books)
if net_bc is None:
return None
net_bc *= self._fee_multiplier
gross_ca = self._convert(gross_bc, c, a, cycle, books)
if gross_ca is None:
return None
net_ca = self._convert(net_bc, c, a, cycle, books)
if net_ca is None:
return None
net_ca *= self._fee_multiplier
return gross_ca, net_ca
def _is_cycle_fresh(
self,
cycle: TriangularCycle,
books: Mapping[str, OrderBook],
scored_at: datetime,
) -> bool:
if self._max_book_age_seconds is None:
return True
for pair in cycle.pairs:
normalized_pair = _normalize_pair_symbol(pair)
book = books.get(normalized_pair)
if book is None:
return False
age_seconds = (scored_at - book.updated_at).total_seconds()
if age_seconds > self._max_book_age_seconds:
return False
return True
@staticmethod
def _pair_for_edge(cycle: TriangularCycle, from_currency: str, to_currency: str) -> str | None:
for pair in cycle.pairs:
if "/" not in pair:
continue
base, quote = pair.split("/", 1)
base = base.upper()
quote = quote.upper()
if {base, quote} == {from_currency, to_currency}:
return f"{base}/{quote}"
return None
def _convert(
self,
amount: float,
from_currency: str,
to_currency: str,
cycle: TriangularCycle,
books: Mapping[str, OrderBook],
) -> float | None:
pair = self._pair_for_edge(cycle, from_currency, to_currency)
if pair is None:
return None
book = books.get(pair)
if book is None:
return None
bids, asks = book.top_levels(depth=self._max_depth_levels)
base, quote = pair.split("/", 1)
base = base.upper()
quote = quote.upper()
if from_currency == base and to_currency == quote:
quote_out = self._sell_base_for_quote(amount, bids)
if quote_out is None:
return None
if not self._is_min_order_size_satisfied(pair, amount):
return None
return quote_out
if from_currency == quote and to_currency == base:
base_out = self._buy_base_with_quote(amount, asks)
if base_out is None:
return None
if not self._is_min_order_size_satisfied(pair, base_out):
return None
return base_out
return None
def _is_min_order_size_satisfied(self, pair: str, base_amount: float) -> bool:
min_size = self._min_order_size_by_pair.get(pair)
if min_size is None:
return True
return base_amount >= min_size
@staticmethod
def _sell_base_for_quote(amount_base: float, bids: list[BookLevel]) -> float | None:
remaining = amount_base
quote_out = 0.0
for level in bids:
if remaining <= 0.0:
break
if level.price <= 0.0 or level.volume <= 0.0:
continue
executed = min(remaining, level.volume)
quote_out += executed * level.price
remaining -= executed
if remaining > 0.0:
return None
return quote_out
@staticmethod
def _buy_base_with_quote(amount_quote: float, asks: list[BookLevel]) -> float | None:
remaining_quote = amount_quote
base_out = 0.0
for level in asks:
if remaining_quote <= 0.0:
break
if level.price <= 0.0 or level.volume <= 0.0:
continue
level_quote_capacity = level.volume * level.price
spend = min(remaining_quote, level_quote_capacity)
base_out += spend / level.price
remaining_quote -= spend
if remaining_quote > 0.0:
return None
return base_out