diff --git a/.env.example b/.env.example index cf04bea..910ec2f 100644 --- a/.env.example +++ b/.env.example @@ -7,3 +7,11 @@ DUCKDB_PATH=./data/arbitrade.duckdb FERNET_KEY= KRAKEN_API_KEY= KRAKEN_API_SECRET= +KRAKEN_REST_URL=https://api.kraken.com +KRAKEN_WS_URL=wss://ws.kraken.com/v2 +KRAKEN_PRIVATE_RATE_LIMIT_SECONDS=1.0 +KRAKEN_HTTP_TIMEOUT_SECONDS=10.0 +KRAKEN_RETRY_ATTEMPTS=3 +KRAKEN_RETRY_BASE_DELAY_SECONDS=0.25 +WS_HEARTBEAT_TIMEOUT_SECONDS=20.0 +WS_MAX_STALENESS_SECONDS=5.0 diff --git a/pyproject.toml b/pyproject.toml index b9eb060..eb6ad7b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,9 @@ dev = [ "pre-commit>=3.8.0", "pytest>=8.3.0", "pytest-asyncio>=0.24.0", + "respx>=0.21.1", "ruff>=0.6.0", + "vcrpy>=6.0.0", ] [project.scripts] @@ -63,7 +65,7 @@ pretty = true mypy_path = "src" [[tool.mypy.overrides]] -module = ["duckdb", "keyring", "uvloop"] +module = ["duckdb", "keyring", "sortedcontainers"] ignore_missing_imports = true [tool.pytest.ini_options] diff --git a/src/arbitrade/config/settings.py b/src/arbitrade/config/settings.py index 407be65..7987332 100644 --- a/src/arbitrade/config/settings.py +++ b/src/arbitrade/config/settings.py @@ -19,6 +19,21 @@ class Settings(BaseSettings): duckdb_path: Path = Field(default=Path("./data/arbitrade.duckdb"), alias="DUCKDB_PATH") + kraken_rest_url: str = Field(default="https://api.kraken.com", alias="KRAKEN_REST_URL") + kraken_ws_url: str = Field(default="wss://ws.kraken.com/v2", alias="KRAKEN_WS_URL") + kraken_private_rate_limit_seconds: float = Field( + default=1.0, alias="KRAKEN_PRIVATE_RATE_LIMIT_SECONDS" + ) + kraken_http_timeout_seconds: float = Field(default=10.0, alias="KRAKEN_HTTP_TIMEOUT_SECONDS") + kraken_retry_attempts: int = Field(default=3, alias="KRAKEN_RETRY_ATTEMPTS") + kraken_retry_base_delay_seconds: float = Field( + default=0.25, alias="KRAKEN_RETRY_BASE_DELAY_SECONDS" + ) + kraken_api_key: str | None = Field(default=None, alias="KRAKEN_API_KEY") + kraken_api_secret: str | None = 
Field(default=None, alias="KRAKEN_API_SECRET") + ws_heartbeat_timeout_seconds: float = Field(default=20.0, alias="WS_HEARTBEAT_TIMEOUT_SECONDS") + ws_max_staleness_seconds: float = Field(default=5.0, alias="WS_MAX_STALENESS_SECONDS") + fernet_key: str | None = Field(default=None, alias="FERNET_KEY") diff --git a/src/arbitrade/detection/__init__.py b/src/arbitrade/detection/__init__.py new file mode 100644 index 0000000..cac25be --- /dev/null +++ b/src/arbitrade/detection/__init__.py @@ -0,0 +1 @@ +"""Arbitrage detection package.""" diff --git a/src/arbitrade/detection/graph.py b/src/arbitrade/detection/graph.py new file mode 100644 index 0000000..7e6b460 --- /dev/null +++ b/src/arbitrade/detection/graph.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + + +@dataclass(frozen=True, slots=True) +class TriangularCycle: + currencies: tuple[str, str, str] + pairs: tuple[str, str, str] + + +def _canonical_pair(base: str, quote: str) -> str: + return f"{base}/{quote}" + + +class CurrencyGraph: + def __init__(self) -> None: + self._adjacency: dict[str, set[str]] = {} + self._pair_by_direction: dict[tuple[str, str], str] = {} + + @property + def adjacency(self) -> dict[str, set[str]]: + return self._adjacency + + @property + def pair_by_direction(self) -> dict[tuple[str, str], str]: + return self._pair_by_direction + + def add_pair(self, base: str, quote: str, pair_symbol: str | None = None) -> None: + normalized_base = base.upper() + normalized_quote = quote.upper() + symbol = pair_symbol or _canonical_pair( + normalized_base, normalized_quote) + + self._adjacency.setdefault( + normalized_base, set()).add(normalized_quote) + self._adjacency.setdefault( + normalized_quote, set()).add(normalized_base) + + self._pair_by_direction[(normalized_base, normalized_quote)] = symbol + self._pair_by_direction[(normalized_quote, normalized_base)] = symbol + + @classmethod + def from_kraken_asset_pairs(cls, asset_pairs: 
dict[str, Any]) -> CurrencyGraph: + graph = cls() + for value in asset_pairs.values(): + if not isinstance(value, dict): + continue + + wsname = value.get("wsname") + if isinstance(wsname, str) and "/" in wsname: + base, quote = wsname.split("/", 1) + graph.add_pair(base, quote, wsname) + continue + + raw_base = value.get("base") + raw_quote = value.get("quote") + if isinstance(raw_base, str) and isinstance(raw_quote, str): + graph.add_pair(raw_base, raw_quote) + + return graph + + def triangular_cycles(self) -> list[TriangularCycle]: + found: dict[tuple[str, str, str], TriangularCycle] = {} + + for a, neighbors_a in self._adjacency.items(): + for b in neighbors_a: + if a >= b: + continue + neighbors_b = self._adjacency.get(b, set()) + for c in neighbors_b: + if b >= c: + continue + if a not in self._adjacency.get(c, set()): + continue + + p_ab = self._pair_by_direction[(a, b)] + p_bc = self._pair_by_direction[(b, c)] + p_ca = self._pair_by_direction[(c, a)] + + key = (a, b, c) + found[key] = TriangularCycle( + currencies=key, pairs=(p_ab, p_bc, p_ca)) + + return list(found.values()) + + @staticmethod + def index_cycles_by_pair(cycles: list[TriangularCycle]) -> dict[str, list[TriangularCycle]]: + index: dict[str, list[TriangularCycle]] = {} + for cycle in cycles: + for pair in cycle.pairs: + index.setdefault(pair, []).append(cycle) + return index diff --git a/src/arbitrade/exchange/__init__.py b/src/arbitrade/exchange/__init__.py new file mode 100644 index 0000000..26b16d5 --- /dev/null +++ b/src/arbitrade/exchange/__init__.py @@ -0,0 +1 @@ +"""Kraken exchange integration package.""" diff --git a/src/arbitrade/exchange/kraken_rest.py b/src/arbitrade/exchange/kraken_rest.py new file mode 100644 index 0000000..1262cc5 --- /dev/null +++ b/src/arbitrade/exchange/kraken_rest.py @@ -0,0 +1,187 @@ +from __future__ import annotations + +import asyncio +import time +from typing import Any +from urllib.parse import urlencode + +import httpx +import structlog + +from 
def _result_dict(payload: dict[str, Any]) -> dict[str, Any]:
    """Return the ``result`` object of a response, or ``{}`` if absent or not a dict."""
    result = payload.get("result", {})
    return result if isinstance(result, dict) else {}


class KrakenRestClient:
    """Async Kraken REST client with retry, latency logging and private-call throttling."""

    def __init__(self, settings: Settings) -> None:
        """Create the HTTP client and log the outcome of ``validate_compliance``."""
        self._settings = settings
        self._client = httpx.AsyncClient(
            base_url=settings.kraken_rest_url,
            timeout=settings.kraken_http_timeout_seconds,
            limits=httpx.Limits(max_keepalive_connections=10, max_connections=50),
            headers={"User-Agent": "arbitrade/0.1.0"},
        )
        self._private_lock = asyncio.Lock()
        # Last nonce handed out; see ``_next_nonce``.
        self._last_nonce = 0

        issues = self.validate_compliance()
        if issues:
            _LOG.warning("kraken_compliance_issues", issues=issues)
        else:
            _LOG.info("kraken_compliance_ok")

    def validate_compliance(self) -> list[str]:
        """Return human-readable warnings for risky settings (empty list when none)."""
        settings = self._settings
        issues: list[str] = []

        if not settings.kraken_rest_url.startswith("https://"):
            issues.append("KRAKEN_REST_URL should use https://")

        if settings.kraken_private_rate_limit_seconds < 1.0:
            issues.append("KRAKEN_PRIVATE_RATE_LIMIT_SECONDS below 1.0 may violate Kraken limits")

        if settings.kraken_retry_attempts < 1:
            issues.append("KRAKEN_RETRY_ATTEMPTS must be >= 1")

        if settings.kraken_retry_base_delay_seconds < 0:
            issues.append("KRAKEN_RETRY_BASE_DELAY_SECONDS must be >= 0")

        return issues

    async def close(self) -> None:
        """Close the underlying HTTP client."""
        await self._client.aclose()

    async def warm_connection_pool(self) -> None:
        """Issue one public request so a connection is established up front."""
        await self.server_time()

    def _next_nonce(self) -> str:
        """Return a strictly increasing millisecond nonce as a string.

        Two calls inside the same millisecond (or a clock step backwards)
        would otherwise repeat or decrease the nonce, which Kraken rejects.
        """
        now_ms = int(time.time() * 1000)
        # getattr keeps this usable on instances built without __init__.
        self._last_nonce = max(now_ms, getattr(self, "_last_nonce", 0) + 1)
        return str(self._last_nonce)

    async def _execute_with_retry(
        self,
        endpoint: str,
        send: Any,
        *,
        ok_event: str,
        failed_event: str,
        stage: str,
    ) -> KrakenApiResult:
        """Run ``send`` with exponential backoff and latency logging.

        ``send`` is a zero-argument coroutine function returning the HTTP
        response. An attempt fails on any exception, on a non-2xx status, or
        on a non-empty ``error`` field in the JSON body (``RuntimeError``).
        The last failure is re-raised. At least one attempt is always made,
        even if ``kraken_retry_attempts`` is configured below 1.
        """
        attempts = max(1, self._settings.kraken_retry_attempts)
        delay = self._settings.kraken_retry_base_delay_seconds

        for attempt in range(1, attempts + 1):
            started = time.perf_counter()
            try:
                response = await send()
                response.raise_for_status()
                payload = response.json()
                if payload.get("error"):
                    raise RuntimeError(f"Kraken error: {payload['error']}")
            except Exception as exc:
                latency = (time.perf_counter() - started) * 1000
                _LOG.warning(
                    failed_event,
                    endpoint=endpoint,
                    attempt=attempt,
                    latency_ms=latency,
                    error=str(exc),
                )
                if attempt >= attempts:
                    raise
                # Backoff doubles per attempt: delay, 2*delay, 4*delay, ...
                await asyncio.sleep(delay * (2 ** (attempt - 1)))
            else:
                latency = (time.perf_counter() - started) * 1000
                _LOG.info(
                    ok_event,
                    endpoint=endpoint,
                    attempt=attempt,
                    latency_ms=latency,
                    sample=LatencySample.now(stage, latency_ms=latency).latency_ms,
                )
                return KrakenApiResult(endpoint=endpoint, payload=payload)

        raise RuntimeError("unreachable retry loop")

    async def _request_with_retry(
        self,
        endpoint: str,
        params: dict[str, Any] | None = None,
    ) -> KrakenApiResult:
        """GET a public endpoint with retries; see ``_execute_with_retry``."""
        query = params or {}

        async def send() -> Any:
            return await self._client.get(endpoint, params=query)

        return await self._execute_with_retry(
            endpoint,
            send,
            ok_event="kraken_rest_request_ok",
            failed_event="kraken_rest_request_failed",
            stage="rest_request",
        )

    async def _private_post_with_retry(
        self,
        endpoint: str,
        data: dict[str, str] | None = None,
    ) -> KrakenApiResult:
        """POST a signed private request with retries.

        Raises:
            RuntimeError: if the API key or secret is not configured, or if
                Kraken reports an error on the final attempt.
        """
        api_key = self._settings.kraken_api_key
        api_secret = self._settings.kraken_api_secret
        if not api_key or not api_secret:
            raise RuntimeError("Missing Kraken API credentials for private endpoint")

        async def send() -> Any:
            # A fresh nonce and signature per attempt: a retried request must
            # not reuse the nonce of the attempt that failed.
            nonce = self._next_nonce()
            payload = {"nonce": nonce}
            if data is not None:
                payload.update(data)

            encoded = urlencode(payload)
            signature = sign_kraken_private_path(endpoint, nonce, encoded, api_secret)
            return await self._client.post(
                endpoint,
                data=payload,
                headers={
                    "API-Key": api_key,
                    "API-Sign": signature,
                },
            )

        return await self._execute_with_retry(
            endpoint,
            send,
            ok_event="kraken_private_rest_request_ok",
            failed_event="kraken_private_rest_request_failed",
            stage="private_rest_request",
        )

    async def server_time(self) -> dict[str, Any]:
        """Return the ``result`` of ``/0/public/Time``."""
        result = await self._request_with_retry("/0/public/Time")
        return _result_dict(result.payload)

    async def assets(self) -> dict[str, Any]:
        """Return the ``result`` of ``/0/public/Assets``."""
        result = await self._request_with_retry("/0/public/Assets")
        return _result_dict(result.payload)

    async def asset_pairs(self) -> dict[str, Any]:
        """Return the ``result`` of ``/0/public/AssetPairs``."""
        result = await self._request_with_retry("/0/public/AssetPairs")
        return _result_dict(result.payload)

    async def _throttled_private_call(self, endpoint: str) -> dict[str, Any]:
        """Serialise private calls and space them by the configured rate limit."""
        async with self._private_lock:
            result = await self._private_post_with_retry(endpoint)
            # Sleep while still holding the lock so the next caller waits too.
            await asyncio.sleep(self._settings.kraken_private_rate_limit_seconds)
            return _result_dict(result.payload)

    async def balances(self) -> dict[str, Any]:
        """Return the ``result`` of ``/0/private/Balance``."""
        return await self._throttled_private_call("/0/private/Balance")
received_at: datetime + payload: dict[str, Any] + + +class KrakenWsClient: + def __init__(self, settings: Settings) -> None: + self._settings = settings + self._last_message_at: datetime | None = None + self._stop = asyncio.Event() + + @property + def is_stale(self) -> bool: + if self._last_message_at is None: + return True + return ( + datetime.now(UTC) - self._last_message_at + ).total_seconds() > self._settings.ws_max_staleness_seconds + + async def stop(self) -> None: + self._stop.set() + + async def connect_stream(self) -> AsyncIterator[WsMessage]: + delay = 1.0 + while not self._stop.is_set(): + try: + async with websockets.connect( + self._settings.kraken_ws_url, max_size=2_000_000 + ) as ws: + _LOG.info("kraken_ws_connected", url=self._settings.kraken_ws_url) + delay = 1.0 + async for raw in self._recv_loop(ws): + yield raw + except Exception as exc: + _LOG.warning("kraken_ws_disconnected", error=str(exc), reconnect_in=delay) + await asyncio.sleep(delay) + delay = min(delay * 2, 30.0) + + async def _recv_loop(self, ws: Any) -> AsyncIterator[WsMessage]: + while not self._stop.is_set(): + t0 = time.perf_counter() + raw = await asyncio.wait_for( + ws.recv(), timeout=self._settings.ws_heartbeat_timeout_seconds + ) + parse_start = time.perf_counter() + payload = orjson.loads(raw) + self._last_message_at = datetime.now(UTC) + + _LOG.debug( + "kraken_ws_message", + recv_latency_ms=(parse_start - t0) * 1000, + parse_latency_ms=(time.perf_counter() - parse_start) * 1000, + ) + if isinstance(payload, dict): + yield WsMessage(received_at=self._last_message_at, payload=payload) + + @staticmethod + def parse_book_delta(message: dict[str, Any]) -> BookDelta | None: + # Kraken v2 book update shape can vary by channel; keep parser defensive. 
+ channel = str(message.get("channel", "")) + if "book" not in channel: + return None + + symbol = str(message.get("symbol", "")) + data = message.get("data") + if not isinstance(data, list) or not data: + return None + + first = data[0] + if not isinstance(first, dict): + return None + + bids = [ + BookLevel(price=float(level["price"]), volume=float(level["qty"])) + for level in first.get("bids", []) + if isinstance(level, dict) and "price" in level and "qty" in level + ] + asks = [ + BookLevel(price=float(level["price"]), volume=float(level["qty"])) + for level in first.get("asks", []) + if isinstance(level, dict) and "price" in level and "qty" in level + ] + + checksum: int | None = None + raw_checksum = first.get("checksum") + if isinstance(raw_checksum, int): + checksum = raw_checksum + + source_timestamp_ms: int | None = None + if isinstance(first.get("timestamp"), int): + source_timestamp_ms = first["timestamp"] + + return BookDelta( + symbol=symbol, + bids=bids, + asks=asks, + checksum=checksum, + source_timestamp_ms=source_timestamp_ms, + ) diff --git a/src/arbitrade/exchange/models.py b/src/arbitrade/exchange/models.py new file mode 100644 index 0000000..27b414d --- /dev/null +++ b/src/arbitrade/exchange/models.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import UTC, datetime +from typing import Any + + +@dataclass(slots=True) +class KrakenApiResult: + endpoint: str + payload: dict[str, Any] + + +@dataclass(slots=True) +class LatencySample: + stage: str + at: datetime + latency_ms: float + + @classmethod + def now(cls, stage: str, latency_ms: float) -> LatencySample: + return cls(stage=stage, at=datetime.now(UTC), latency_ms=latency_ms) + + +@dataclass(slots=True) +class BookLevel: + price: float + volume: float + + +@dataclass(slots=True) +class BookDelta: + symbol: str + bids: list[BookLevel] + asks: list[BookLevel] + checksum: int | None = None + source_timestamp_ms: int | None = None diff 
def sign_kraken_private_path(path: str, nonce: str, post_data: str, api_secret: str) -> str:
    """Compute the ``API-Sign`` header value for a Kraken private request.

    The signature is ``base64(HMAC-SHA512(secret, path + SHA256(nonce + post_data)))``
    where ``secret`` is the base64-decoded ``api_secret``.

    Deliberately not memoised: the nonce differs on every request, so a cache
    would never hit and would only keep API secrets and request payloads
    alive in memory.

    Args:
        path: URI path of the endpoint, e.g. ``/0/private/Balance``.
        nonce: Nonce that is also present in ``post_data``.
        post_data: URL-encoded request body exactly as it will be sent.
        api_secret: Base64-encoded API secret.

    Raises:
        binascii.Error: if ``api_secret`` is not valid base64.
    """
    body_digest = hashlib.sha256((nonce + post_data).encode("utf-8")).digest()
    secret = base64.b64decode(api_secret)
    mac = hmac.new(secret, path.encode("utf-8") + body_digest, hashlib.sha512)
    return base64.b64encode(mac.digest()).decode("utf-8")
self._ws_client.parse_book_delta(message.payload) + if delta is None: + continue + + book = self._books.setdefault(delta.symbol, OrderBook()) + book.apply_bids(delta.bids) + book.apply_asks(delta.asks) + + checksum_ok = True + if delta.checksum is not None: + checksum_ok = book.compute_checksum() == delta.checksum + + apply_latency_ms = (time.perf_counter() - parse_start) * 1000 + source_latency_ms: float | None = None + if delta.source_timestamp_ms is not None: + source_latency_ms = datetime.now(UTC).timestamp() * 1000 - float( + delta.source_timestamp_ms + ) + + _LOG.info( + "book_delta_applied", + symbol=delta.symbol, + bids=len(delta.bids), + asks=len(delta.asks), + checksum_ok=checksum_ok, + apply_latency_ms=apply_latency_ms, + source_latency_ms=source_latency_ms, + ) + + await self._snapshot_writer.enqueue( + MarketSnapshot( + snapshot_at=datetime.now(UTC), + symbol=delta.symbol, + source="kraken_ws", + payload=message.payload, + latency_ms=source_latency_ms, + ) + ) diff --git a/src/arbitrade/market_data/order_book.py b/src/arbitrade/market_data/order_book.py new file mode 100644 index 0000000..a4ba86a --- /dev/null +++ b/src/arbitrade/market_data/order_book.py @@ -0,0 +1,104 @@ +from __future__ import annotations + +import re +from collections.abc import Iterable +from dataclasses import dataclass +from datetime import UTC, datetime + +from sortedcontainers import SortedDict + +from arbitrade.exchange.models import BookLevel + +ZERO_CLEAN_RE = re.compile(r"^0+", re.ASCII) + + +def _normalize_price_for_checksum(value: float) -> str: + text = f"{value:.10f}".replace(".", "") + text = text.rstrip("0") + stripped = ZERO_CLEAN_RE.sub("", text) + return stripped or "0" + + +def _normalize_volume_for_checksum(value: float) -> str: + text = f"{value:.10f}".replace(".", "") + text = text.rstrip("0") + stripped = ZERO_CLEAN_RE.sub("", text) + return stripped or "0" + + +@dataclass(slots=True) +class BookView: + best_bid: BookLevel | None + best_ask: BookLevel | None 
+ updated_at: datetime + + +class OrderBook: + def __init__(self) -> None: + self._bids: SortedDict[float, float] = SortedDict() + self._asks: SortedDict[float, float] = SortedDict() + self._updated_at: datetime = datetime.now(UTC) + + @property + def updated_at(self) -> datetime: + return self._updated_at + + def apply_bids(self, updates: Iterable[BookLevel]) -> None: + for level in updates: + if level.volume <= 0: + self._bids.pop(level.price, None) + else: + self._bids[level.price] = level.volume + self._updated_at = datetime.now(UTC) + + def apply_asks(self, updates: Iterable[BookLevel]) -> None: + for level in updates: + if level.volume <= 0: + self._asks.pop(level.price, None) + else: + self._asks[level.price] = level.volume + self._updated_at = datetime.now(UTC) + + def best_bid(self) -> BookLevel | None: + if not self._bids: + return None + price = self._bids.peekitem(-1)[0] + return BookLevel(price=price, volume=self._bids[price]) + + def best_ask(self) -> BookLevel | None: + if not self._asks: + return None + price = self._asks.peekitem(0)[0] + return BookLevel(price=price, volume=self._asks[price]) + + def snapshot(self) -> BookView: + return BookView( + best_bid=self.best_bid(), + best_ask=self.best_ask(), + updated_at=self._updated_at, + ) + + def top_levels(self, depth: int = 10) -> tuple[list[BookLevel], list[BookLevel]]: + bid_keys = list(self._bids.keys()) + ask_keys = list(self._asks.keys()) + + bids = [ + BookLevel(price=price, volume=self._bids[price]) + for price in reversed(bid_keys[-depth:]) + ] + asks = [BookLevel(price=price, volume=self._asks[price]) for price in ask_keys[:depth]] + return bids, asks + + def compute_checksum(self, depth: int = 10) -> int: + bids, asks = self.top_levels(depth) + combined: list[str] = [] + for level in bids: + combined.append(_normalize_price_for_checksum(level.price)) + combined.append(_normalize_volume_for_checksum(level.volume)) + for level in asks: + 
combined.append(_normalize_price_for_checksum(level.price)) + combined.append(_normalize_volume_for_checksum(level.volume)) + + import zlib + + return zlib.crc32("".join(combined).encode("utf-8")) diff --git a/src/arbitrade/storage/db.py b/src/arbitrade/storage/db.py index 84dc5c3..93d4268 100644 --- a/src/arbitrade/storage/db.py +++ b/src/arbitrade/storage/db.py @@ -38,6 +38,14 @@ CREATE TABLE IF NOT EXISTS portfolio_snapshots ( balances JSON, total_value_usd DOUBLE ); + +CREATE TABLE IF NOT EXISTS market_snapshots ( + snapshot_at TIMESTAMP NOT NULL, + symbol VARCHAR NOT NULL, + source VARCHAR NOT NULL, + payload JSON NOT NULL, + latency_ms DOUBLE +); """ diff --git a/src/arbitrade/storage/market_snapshots.py b/src/arbitrade/storage/market_snapshots.py new file mode 100644 index 0000000..c87c529 --- /dev/null +++ b/src/arbitrade/storage/market_snapshots.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +import asyncio +from dataclasses import dataclass +from datetime import datetime +from typing import Any + +import structlog + +from arbitrade.storage.repositories import MarketSnapshotRecord, MarketSnapshotRepository + +_LOG = structlog.get_logger(__name__) + + +@dataclass(slots=True) +class MarketSnapshot: + snapshot_at: datetime + symbol: str + source: str + payload: dict[str, Any] + latency_ms: float | None + + +class AsyncMarketSnapshotWriter: + def __init__(self, repository: MarketSnapshotRepository, max_queue_size: int = 50_000) -> None: + self._repository = repository + self._queue: asyncio.Queue[MarketSnapshot] = asyncio.Queue(maxsize=max_queue_size) + self._task: asyncio.Task[None] | None = None + self._stop = asyncio.Event() + + async def start(self) -> None: + if self._task is None or self._task.done(): + self._stop.clear() + self._task = asyncio.create_task(self._run(), name="market-snapshot-writer") + + async def stop(self) -> None: + self._stop.set() + if self._task is not None: + await self._task + + async def enqueue(self, snapshot: 
async def _run(self) -> None:
    """Drain the queue into the repository until stopped and empty.

    The queue is polled with a 0.5s timeout so the stop flag is re-checked
    regularly. A failed write is logged and the snapshot dropped; the loop
    keeps running.
    """
    while not (self._stop.is_set() and self._queue.empty()):
        try:
            item = await asyncio.wait_for(self._queue.get(), timeout=0.5)
        except TimeoutError:
            continue

        try:
            record = MarketSnapshotRecord(
                snapshot_at=item.snapshot_at,
                symbol=item.symbol,
                source=item.source,
                payload=item.payload,
                latency_ms=item.latency_ms,
            )
            # The repository insert is synchronous; run it in a worker thread
            # so a slow write does not stall the event loop (and with it the
            # market-data feed). This task awaits each insert, so writes stay
            # sequential.
            # NOTE(review): assumes the store's connection may be used from a
            # worker thread - confirm against DuckDBStore.connect().
            await asyncio.to_thread(self._repository.insert, record)
        except Exception as exc:
            _LOG.error("market_snapshot_write_failed", error=str(exc), symbol=item.symbol)
        finally:
            self._queue.task_done()
def _triangle_graph() -> CurrencyGraph:
    """Build the BTC/ETH/USD triangle shared by the tests below."""
    return CurrencyGraph.from_kraken_asset_pairs(
        {
            "XXBTZUSD": {"wsname": "BTC/USD"},
            "XETHXXBT": {"wsname": "ETH/BTC"},
            "XETHZUSD": {"wsname": "ETH/USD"},
        }
    )


def test_currency_graph_from_kraken_pairs_builds_adjacency() -> None:
    """USD must be linked to both currencies it is quoted against."""
    adjacency = _triangle_graph().adjacency

    assert "USD" in adjacency
    assert "BTC" in adjacency["USD"]
    assert "ETH" in adjacency["USD"]


def test_triangular_cycles_detected_once() -> None:
    """A single triangle is reported exactly once, with sorted currencies."""
    cycles = _triangle_graph().triangular_cycles()

    assert len(cycles) == 1
    (only,) = cycles
    assert only.currencies == ("BTC", "ETH", "USD")
    assert set(only.pairs) == {"BTC/USD", "ETH/BTC", "ETH/USD"}


def test_cycles_indexed_by_pair() -> None:
    """Every pair of the triangle indexes back to that one cycle."""
    graph = _triangle_graph()
    index = graph.index_cycles_by_pair(graph.triangular_cycles())

    assert "BTC/USD" in index
    assert "ETH/BTC" in index
    assert "ETH/USD" in index
    assert len(index["BTC/USD"]) == 1
@pytest.mark.asyncio
async def test_server_time_success() -> None:
    """A clean response is unwrapped to its ``result`` object."""
    settings = Settings(_env_file=None)
    client = KrakenRestClient(settings)

    # try/finally: the HTTP client must be closed even when the call raises.
    try:
        with respx.mock(base_url=settings.kraken_rest_url) as mock_router:
            mock_router.get("/0/public/Time").respond(
                200,
                json={"error": [], "result": {"unixtime": 1}},
            )

            payload = await client.server_time()
    finally:
        await client.close()

    assert payload["unixtime"] == 1


@pytest.mark.asyncio
async def test_retry_then_success() -> None:
    """A transport error on the first attempt is retried and then succeeds."""
    settings = Settings(
        _env_file=None,
        kraken_retry_attempts=2,
        kraken_retry_base_delay_seconds=0.0,
    )
    client = KrakenRestClient(settings)

    try:
        with respx.mock(base_url=settings.kraken_rest_url) as mock_router:
            route = mock_router.get("/0/public/Time")
            route.side_effect = [
                httpx.ConnectError("boom"),
                httpx.Response(200, json={"error": [], "result": {"unixtime": 2}}),
            ]

            payload = await client.server_time()
    finally:
        await client.close()

    assert payload["unixtime"] == 2


@pytest.mark.asyncio
async def test_balances_private_call_uses_headers() -> None:
    """Private calls carry the API key and a non-empty signature header."""
    settings = Settings(
        _env_file=None,
        KRAKEN_API_KEY="key",
        KRAKEN_API_SECRET="c2VjcmV0",  # base64("secret")
        kraken_private_rate_limit_seconds=0.0,
    )
    client = KrakenRestClient(settings)

    try:
        with respx.mock(base_url=settings.kraken_rest_url) as mock_router:
            route = mock_router.post("/0/private/Balance").respond(
                200,
                json={"error": [], "result": {"ZUSD": "10.0"}},
            )
            payload = await client.balances()
    finally:
        await client.close()

    request = route.calls.last.request
    assert request.headers.get("API-Key") == "key"
    assert request.headers.get("API-Sign")
    assert payload["ZUSD"] == "10.0"
def test_parse_book_delta() -> None:
    """A book message is parsed into levels, checksum and source timestamp."""
    # _env_file=None keeps a developer's local .env from leaking into the
    # test, matching the other test modules.
    client = KrakenWsClient(Settings(_env_file=None))
    message = {
        "channel": "book",
        "symbol": "BTC/USD",
        "data": [
            {
                "bids": [{"price": "100.0", "qty": "1.2"}],
                "asks": [{"price": "100.5", "qty": "0.8"}],
                "checksum": 123,
                "timestamp": 1717232000000,
            }
        ],
    }

    delta = client.parse_book_delta(message)

    assert delta is not None
    assert delta.symbol == "BTC/USD"
    assert len(delta.bids) == 1
    assert len(delta.asks) == 1
    assert delta.bids[0].price == 100.0
    assert delta.bids[0].volume == 1.2
    assert delta.asks[0].price == 100.5
    assert delta.asks[0].volume == 0.8
    assert delta.checksum == 123
    assert delta.source_timestamp_ms == 1717232000000
def test_order_book_checksum_matches_self() -> None:
    """The checksum is a stable 32-bit value that reacts to book changes."""
    book = OrderBook()
    book.apply_bids([BookLevel(price=100.0, volume=1.0)])
    book.apply_asks([BookLevel(price=100.5, volume=1.0)])

    checksum = book.compute_checksum()

    assert isinstance(checksum, int)
    assert 0 <= checksum <= 0xFFFFFFFF
    assert checksum == book.compute_checksum()

    # Comparing a checksum only with itself proves nothing about its inputs:
    # adding a level inside the default depth must change the value.
    book.apply_bids([BookLevel(price=99.5, volume=2.0)])
    assert book.compute_checksum() != checksum