Implement Kraken integration with REST and WebSocket clients, add market data handling, and enhance settings configuration
This commit is contained in:
@@ -7,3 +7,11 @@ DUCKDB_PATH=./data/arbitrade.duckdb
|
||||
FERNET_KEY=
|
||||
KRAKEN_API_KEY=
|
||||
KRAKEN_API_SECRET=
|
||||
KRAKEN_REST_URL=https://api.kraken.com
|
||||
KRAKEN_WS_URL=wss://ws.kraken.com/v2
|
||||
KRAKEN_PRIVATE_RATE_LIMIT_SECONDS=1.0
|
||||
KRAKEN_HTTP_TIMEOUT_SECONDS=10.0
|
||||
KRAKEN_RETRY_ATTEMPTS=3
|
||||
KRAKEN_RETRY_BASE_DELAY_SECONDS=0.25
|
||||
WS_HEARTBEAT_TIMEOUT_SECONDS=20.0
|
||||
WS_MAX_STALENESS_SECONDS=5.0
|
||||
|
||||
+3
-1
@@ -33,7 +33,9 @@ dev = [
|
||||
"pre-commit>=3.8.0",
|
||||
"pytest>=8.3.0",
|
||||
"pytest-asyncio>=0.24.0",
|
||||
"respx>=0.21.1",
|
||||
"ruff>=0.6.0",
|
||||
"vcrpy>=6.0.0",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
@@ -63,7 +65,7 @@ pretty = true
|
||||
mypy_path = "src"
|
||||
|
||||
[[tool.mypy.overrides]]
|
||||
module = ["duckdb", "keyring", "uvloop"]
|
||||
module = ["duckdb", "keyring", "sortedcontainers"]
|
||||
ignore_missing_imports = true
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
|
||||
@@ -19,6 +19,21 @@ class Settings(BaseSettings):
|
||||
|
||||
duckdb_path: Path = Field(default=Path("./data/arbitrade.duckdb"), alias="DUCKDB_PATH")
|
||||
|
||||
kraken_rest_url: str = Field(default="https://api.kraken.com", alias="KRAKEN_REST_URL")
|
||||
kraken_ws_url: str = Field(default="wss://ws.kraken.com/v2", alias="KRAKEN_WS_URL")
|
||||
kraken_private_rate_limit_seconds: float = Field(
|
||||
default=1.0, alias="KRAKEN_PRIVATE_RATE_LIMIT_SECONDS"
|
||||
)
|
||||
kraken_http_timeout_seconds: float = Field(default=10.0, alias="KRAKEN_HTTP_TIMEOUT_SECONDS")
|
||||
kraken_retry_attempts: int = Field(default=3, alias="KRAKEN_RETRY_ATTEMPTS")
|
||||
kraken_retry_base_delay_seconds: float = Field(
|
||||
default=0.25, alias="KRAKEN_RETRY_BASE_DELAY_SECONDS"
|
||||
)
|
||||
kraken_api_key: str | None = Field(default=None, alias="KRAKEN_API_KEY")
|
||||
kraken_api_secret: str | None = Field(default=None, alias="KRAKEN_API_SECRET")
|
||||
ws_heartbeat_timeout_seconds: float = Field(default=20.0, alias="WS_HEARTBEAT_TIMEOUT_SECONDS")
|
||||
ws_max_staleness_seconds: float = Field(default=5.0, alias="WS_MAX_STALENESS_SECONDS")
|
||||
|
||||
fernet_key: str | None = Field(default=None, alias="FERNET_KEY")
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
"""Arbitrage detection package."""
|
||||
@@ -0,0 +1,94 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class TriangularCycle:
|
||||
currencies: tuple[str, str, str]
|
||||
pairs: tuple[str, str, str]
|
||||
|
||||
|
||||
def _canonical_pair(base: str, quote: str) -> str:
|
||||
return f"{base}/{quote}"
|
||||
|
||||
|
||||
class CurrencyGraph:
|
||||
def __init__(self) -> None:
|
||||
self._adjacency: dict[str, set[str]] = {}
|
||||
self._pair_by_direction: dict[tuple[str, str], str] = {}
|
||||
|
||||
@property
|
||||
def adjacency(self) -> dict[str, set[str]]:
|
||||
return self._adjacency
|
||||
|
||||
@property
|
||||
def pair_by_direction(self) -> dict[tuple[str, str], str]:
|
||||
return self._pair_by_direction
|
||||
|
||||
def add_pair(self, base: str, quote: str, pair_symbol: str | None = None) -> None:
|
||||
normalized_base = base.upper()
|
||||
normalized_quote = quote.upper()
|
||||
symbol = pair_symbol or _canonical_pair(
|
||||
normalized_base, normalized_quote)
|
||||
|
||||
self._adjacency.setdefault(
|
||||
normalized_base, set()).add(normalized_quote)
|
||||
self._adjacency.setdefault(
|
||||
normalized_quote, set()).add(normalized_base)
|
||||
|
||||
self._pair_by_direction[(normalized_base, normalized_quote)] = symbol
|
||||
self._pair_by_direction[(normalized_quote, normalized_base)] = symbol
|
||||
|
||||
@classmethod
|
||||
def from_kraken_asset_pairs(cls, asset_pairs: dict[str, Any]) -> CurrencyGraph:
|
||||
graph = cls()
|
||||
for value in asset_pairs.values():
|
||||
if not isinstance(value, dict):
|
||||
continue
|
||||
|
||||
wsname = value.get("wsname")
|
||||
if isinstance(wsname, str) and "/" in wsname:
|
||||
base, quote = wsname.split("/", 1)
|
||||
graph.add_pair(base, quote, wsname)
|
||||
continue
|
||||
|
||||
raw_base = value.get("base")
|
||||
raw_quote = value.get("quote")
|
||||
if isinstance(raw_base, str) and isinstance(raw_quote, str):
|
||||
graph.add_pair(raw_base, raw_quote)
|
||||
|
||||
return graph
|
||||
|
||||
def triangular_cycles(self) -> list[TriangularCycle]:
|
||||
found: dict[tuple[str, str, str], TriangularCycle] = {}
|
||||
|
||||
for a, neighbors_a in self._adjacency.items():
|
||||
for b in neighbors_a:
|
||||
if a >= b:
|
||||
continue
|
||||
neighbors_b = self._adjacency.get(b, set())
|
||||
for c in neighbors_b:
|
||||
if b >= c:
|
||||
continue
|
||||
if a not in self._adjacency.get(c, set()):
|
||||
continue
|
||||
|
||||
p_ab = self._pair_by_direction[(a, b)]
|
||||
p_bc = self._pair_by_direction[(b, c)]
|
||||
p_ca = self._pair_by_direction[(c, a)]
|
||||
|
||||
key = (a, b, c)
|
||||
found[key] = TriangularCycle(
|
||||
currencies=key, pairs=(p_ab, p_bc, p_ca))
|
||||
|
||||
return list(found.values())
|
||||
|
||||
@staticmethod
|
||||
def index_cycles_by_pair(cycles: list[TriangularCycle]) -> dict[str, list[TriangularCycle]]:
|
||||
index: dict[str, list[TriangularCycle]] = {}
|
||||
for cycle in cycles:
|
||||
for pair in cycle.pairs:
|
||||
index.setdefault(pair, []).append(cycle)
|
||||
return index
|
||||
@@ -0,0 +1 @@
|
||||
"""Kraken exchange integration package."""
|
||||
@@ -0,0 +1,187 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from typing import Any
|
||||
from urllib.parse import urlencode
|
||||
|
||||
import httpx
|
||||
import structlog
|
||||
|
||||
from arbitrade.config.settings import Settings
|
||||
from arbitrade.exchange.models import KrakenApiResult, LatencySample
|
||||
from arbitrade.exchange.signing import sign_kraken_private_path
|
||||
|
||||
_LOG = structlog.get_logger(__name__)
|
||||
|
||||
|
||||
def _result_dict(payload: dict[str, Any]) -> dict[str, Any]:
|
||||
result = payload.get("result", {})
|
||||
if isinstance(result, dict):
|
||||
return result
|
||||
return {}
|
||||
|
||||
|
||||
class KrakenRestClient:
|
||||
def __init__(self, settings: Settings) -> None:
|
||||
self._settings = settings
|
||||
self._client = httpx.AsyncClient(
|
||||
base_url=settings.kraken_rest_url,
|
||||
timeout=settings.kraken_http_timeout_seconds,
|
||||
limits=httpx.Limits(max_keepalive_connections=10, max_connections=50),
|
||||
headers={"User-Agent": "arbitrade/0.1.0"},
|
||||
)
|
||||
self._private_lock = asyncio.Lock()
|
||||
|
||||
issues = self.validate_compliance()
|
||||
if issues:
|
||||
_LOG.warning("kraken_compliance_issues", issues=issues)
|
||||
else:
|
||||
_LOG.info("kraken_compliance_ok")
|
||||
|
||||
def validate_compliance(self) -> list[str]:
|
||||
issues: list[str] = []
|
||||
|
||||
if not self._settings.kraken_rest_url.startswith("https://"):
|
||||
issues.append("KRAKEN_REST_URL should use https://")
|
||||
|
||||
if self._settings.kraken_private_rate_limit_seconds < 1.0:
|
||||
issues.append("KRAKEN_PRIVATE_RATE_LIMIT_SECONDS below 1.0 may violate Kraken limits")
|
||||
|
||||
if self._settings.kraken_retry_attempts < 1:
|
||||
issues.append("KRAKEN_RETRY_ATTEMPTS must be >= 1")
|
||||
|
||||
if self._settings.kraken_retry_base_delay_seconds < 0:
|
||||
issues.append("KRAKEN_RETRY_BASE_DELAY_SECONDS must be >= 0")
|
||||
|
||||
return issues
|
||||
|
||||
async def close(self) -> None:
|
||||
await self._client.aclose()
|
||||
|
||||
async def warm_connection_pool(self) -> None:
|
||||
await self.server_time()
|
||||
|
||||
async def _request_with_retry(
|
||||
self,
|
||||
endpoint: str,
|
||||
params: dict[str, Any] | None = None,
|
||||
) -> KrakenApiResult:
|
||||
attempts = self._settings.kraken_retry_attempts
|
||||
delay = self._settings.kraken_retry_base_delay_seconds
|
||||
params = params or {}
|
||||
|
||||
for attempt in range(1, attempts + 1):
|
||||
t0 = time.perf_counter()
|
||||
try:
|
||||
response = await self._client.get(endpoint, params=params)
|
||||
response.raise_for_status()
|
||||
payload = response.json()
|
||||
if payload.get("error"):
|
||||
raise RuntimeError(f"Kraken error: {payload['error']}")
|
||||
|
||||
latency = (time.perf_counter() - t0) * 1000
|
||||
_LOG.info(
|
||||
"kraken_rest_request_ok",
|
||||
endpoint=endpoint,
|
||||
attempt=attempt,
|
||||
latency_ms=latency,
|
||||
sample=LatencySample.now("rest_request", latency_ms=latency).latency_ms,
|
||||
)
|
||||
return KrakenApiResult(endpoint=endpoint, payload=payload)
|
||||
except Exception as exc:
|
||||
latency = (time.perf_counter() - t0) * 1000
|
||||
_LOG.warning(
|
||||
"kraken_rest_request_failed",
|
||||
endpoint=endpoint,
|
||||
attempt=attempt,
|
||||
latency_ms=latency,
|
||||
error=str(exc),
|
||||
)
|
||||
if attempt >= attempts:
|
||||
raise
|
||||
await asyncio.sleep(delay * (2 ** (attempt - 1)))
|
||||
|
||||
raise RuntimeError("unreachable retry loop")
|
||||
|
||||
async def _private_post_with_retry(
|
||||
self,
|
||||
endpoint: str,
|
||||
data: dict[str, str] | None = None,
|
||||
) -> KrakenApiResult:
|
||||
api_key = self._settings.kraken_api_key
|
||||
api_secret = self._settings.kraken_api_secret
|
||||
if not api_key or not api_secret:
|
||||
raise RuntimeError("Missing Kraken API credentials for private endpoint")
|
||||
|
||||
attempts = self._settings.kraken_retry_attempts
|
||||
delay = self._settings.kraken_retry_base_delay_seconds
|
||||
|
||||
for attempt in range(1, attempts + 1):
|
||||
t0 = time.perf_counter()
|
||||
try:
|
||||
nonce = str(int(time.time() * 1000))
|
||||
payload = {"nonce": nonce}
|
||||
if data is not None:
|
||||
payload.update(data)
|
||||
|
||||
encoded = urlencode(payload)
|
||||
signature = sign_kraken_private_path(endpoint, nonce, encoded, api_secret)
|
||||
|
||||
response = await self._client.post(
|
||||
endpoint,
|
||||
data=payload,
|
||||
headers={
|
||||
"API-Key": api_key,
|
||||
"API-Sign": signature,
|
||||
},
|
||||
)
|
||||
response.raise_for_status()
|
||||
body = response.json()
|
||||
if body.get("error"):
|
||||
raise RuntimeError(f"Kraken error: {body['error']}")
|
||||
|
||||
latency = (time.perf_counter() - t0) * 1000
|
||||
_LOG.info(
|
||||
"kraken_private_rest_request_ok",
|
||||
endpoint=endpoint,
|
||||
attempt=attempt,
|
||||
latency_ms=latency,
|
||||
sample=LatencySample.now("private_rest_request", latency_ms=latency).latency_ms,
|
||||
)
|
||||
return KrakenApiResult(endpoint=endpoint, payload=body)
|
||||
except Exception as exc:
|
||||
latency = (time.perf_counter() - t0) * 1000
|
||||
_LOG.warning(
|
||||
"kraken_private_rest_request_failed",
|
||||
endpoint=endpoint,
|
||||
attempt=attempt,
|
||||
latency_ms=latency,
|
||||
error=str(exc),
|
||||
)
|
||||
if attempt >= attempts:
|
||||
raise
|
||||
await asyncio.sleep(delay * (2 ** (attempt - 1)))
|
||||
|
||||
raise RuntimeError("unreachable retry loop")
|
||||
|
||||
async def server_time(self) -> dict[str, Any]:
|
||||
result = await self._request_with_retry("/0/public/Time")
|
||||
return _result_dict(result.payload)
|
||||
|
||||
async def assets(self) -> dict[str, Any]:
|
||||
result = await self._request_with_retry("/0/public/Assets")
|
||||
return _result_dict(result.payload)
|
||||
|
||||
async def asset_pairs(self) -> dict[str, Any]:
|
||||
result = await self._request_with_retry("/0/public/AssetPairs")
|
||||
return _result_dict(result.payload)
|
||||
|
||||
async def _throttled_private_call(self, endpoint: str) -> dict[str, Any]:
|
||||
async with self._private_lock:
|
||||
result = await self._private_post_with_retry(endpoint)
|
||||
await asyncio.sleep(self._settings.kraken_private_rate_limit_seconds)
|
||||
return _result_dict(result.payload)
|
||||
|
||||
async def balances(self) -> dict[str, Any]:
|
||||
return await self._throttled_private_call("/0/private/Balance")
|
||||
@@ -0,0 +1,119 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from collections.abc import AsyncIterator
|
||||
from dataclasses import dataclass
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
import orjson
|
||||
import structlog
|
||||
import websockets
|
||||
|
||||
from arbitrade.config.settings import Settings
|
||||
from arbitrade.exchange.models import BookDelta, BookLevel
|
||||
|
||||
_LOG = structlog.get_logger(__name__)
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class WsMessage:
|
||||
received_at: datetime
|
||||
payload: dict[str, Any]
|
||||
|
||||
|
||||
class KrakenWsClient:
|
||||
def __init__(self, settings: Settings) -> None:
|
||||
self._settings = settings
|
||||
self._last_message_at: datetime | None = None
|
||||
self._stop = asyncio.Event()
|
||||
|
||||
@property
|
||||
def is_stale(self) -> bool:
|
||||
if self._last_message_at is None:
|
||||
return True
|
||||
return (
|
||||
datetime.now(UTC) - self._last_message_at
|
||||
).total_seconds() > self._settings.ws_max_staleness_seconds
|
||||
|
||||
async def stop(self) -> None:
|
||||
self._stop.set()
|
||||
|
||||
async def connect_stream(self) -> AsyncIterator[WsMessage]:
|
||||
delay = 1.0
|
||||
while not self._stop.is_set():
|
||||
try:
|
||||
async with websockets.connect(
|
||||
self._settings.kraken_ws_url, max_size=2_000_000
|
||||
) as ws:
|
||||
_LOG.info("kraken_ws_connected", url=self._settings.kraken_ws_url)
|
||||
delay = 1.0
|
||||
async for raw in self._recv_loop(ws):
|
||||
yield raw
|
||||
except Exception as exc:
|
||||
_LOG.warning("kraken_ws_disconnected", error=str(exc), reconnect_in=delay)
|
||||
await asyncio.sleep(delay)
|
||||
delay = min(delay * 2, 30.0)
|
||||
|
||||
async def _recv_loop(self, ws: Any) -> AsyncIterator[WsMessage]:
|
||||
while not self._stop.is_set():
|
||||
t0 = time.perf_counter()
|
||||
raw = await asyncio.wait_for(
|
||||
ws.recv(), timeout=self._settings.ws_heartbeat_timeout_seconds
|
||||
)
|
||||
parse_start = time.perf_counter()
|
||||
payload = orjson.loads(raw)
|
||||
self._last_message_at = datetime.now(UTC)
|
||||
|
||||
_LOG.debug(
|
||||
"kraken_ws_message",
|
||||
recv_latency_ms=(parse_start - t0) * 1000,
|
||||
parse_latency_ms=(time.perf_counter() - parse_start) * 1000,
|
||||
)
|
||||
if isinstance(payload, dict):
|
||||
yield WsMessage(received_at=self._last_message_at, payload=payload)
|
||||
|
||||
@staticmethod
|
||||
def parse_book_delta(message: dict[str, Any]) -> BookDelta | None:
|
||||
# Kraken v2 book update shape can vary by channel; keep parser defensive.
|
||||
channel = str(message.get("channel", ""))
|
||||
if "book" not in channel:
|
||||
return None
|
||||
|
||||
symbol = str(message.get("symbol", ""))
|
||||
data = message.get("data")
|
||||
if not isinstance(data, list) or not data:
|
||||
return None
|
||||
|
||||
first = data[0]
|
||||
if not isinstance(first, dict):
|
||||
return None
|
||||
|
||||
bids = [
|
||||
BookLevel(price=float(level["price"]), volume=float(level["qty"]))
|
||||
for level in first.get("bids", [])
|
||||
if isinstance(level, dict) and "price" in level and "qty" in level
|
||||
]
|
||||
asks = [
|
||||
BookLevel(price=float(level["price"]), volume=float(level["qty"]))
|
||||
for level in first.get("asks", [])
|
||||
if isinstance(level, dict) and "price" in level and "qty" in level
|
||||
]
|
||||
|
||||
checksum: int | None = None
|
||||
raw_checksum = first.get("checksum")
|
||||
if isinstance(raw_checksum, int):
|
||||
checksum = raw_checksum
|
||||
|
||||
source_timestamp_ms: int | None = None
|
||||
if isinstance(first.get("timestamp"), int):
|
||||
source_timestamp_ms = first["timestamp"]
|
||||
|
||||
return BookDelta(
|
||||
symbol=symbol,
|
||||
bids=bids,
|
||||
asks=asks,
|
||||
checksum=checksum,
|
||||
source_timestamp_ms=source_timestamp_ms,
|
||||
)
|
||||
@@ -0,0 +1,37 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class KrakenApiResult:
|
||||
endpoint: str
|
||||
payload: dict[str, Any]
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class LatencySample:
|
||||
stage: str
|
||||
at: datetime
|
||||
latency_ms: float
|
||||
|
||||
@classmethod
|
||||
def now(cls, stage: str, latency_ms: float) -> LatencySample:
|
||||
return cls(stage=stage, at=datetime.now(UTC), latency_ms=latency_ms)
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class BookLevel:
|
||||
price: float
|
||||
volume: float
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class BookDelta:
|
||||
symbol: str
|
||||
bids: list[BookLevel]
|
||||
asks: list[BookLevel]
|
||||
checksum: int | None = None
|
||||
source_timestamp_ms: int | None = None
|
||||
@@ -0,0 +1,14 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
from functools import lru_cache
|
||||
|
||||
|
||||
@lru_cache(maxsize=2048)
|
||||
def sign_kraken_private_path(path: str, nonce: str, post_data: str, api_secret: str) -> str:
|
||||
message = nonce.encode("utf-8") + post_data.encode("utf-8")
|
||||
sha256 = hashlib.sha256(message).digest()
|
||||
mac = hmac.new(base64.b64decode(api_secret), path.encode("utf-8") + sha256, hashlib.sha512)
|
||||
return base64.b64encode(mac.digest()).decode("utf-8")
|
||||
@@ -0,0 +1 @@
|
||||
"""Market data ingestion and book cache package."""
|
||||
@@ -0,0 +1,69 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from datetime import UTC, datetime
|
||||
|
||||
import structlog
|
||||
|
||||
from arbitrade.exchange.kraken_ws import KrakenWsClient
|
||||
from arbitrade.market_data.order_book import OrderBook
|
||||
from arbitrade.storage.market_snapshots import AsyncMarketSnapshotWriter, MarketSnapshot
|
||||
|
||||
_LOG = structlog.get_logger(__name__)
|
||||
|
||||
|
||||
class MarketDataFeed:
|
||||
def __init__(
|
||||
self,
|
||||
ws_client: KrakenWsClient,
|
||||
snapshot_writer: AsyncMarketSnapshotWriter,
|
||||
) -> None:
|
||||
self._ws_client = ws_client
|
||||
self._snapshot_writer = snapshot_writer
|
||||
self._books: dict[str, OrderBook] = {}
|
||||
|
||||
@property
|
||||
def books(self) -> dict[str, OrderBook]:
|
||||
return self._books
|
||||
|
||||
async def run(self) -> None:
|
||||
async for message in self._ws_client.connect_stream():
|
||||
parse_start = time.perf_counter()
|
||||
delta = self._ws_client.parse_book_delta(message.payload)
|
||||
if delta is None:
|
||||
continue
|
||||
|
||||
book = self._books.setdefault(delta.symbol, OrderBook())
|
||||
book.apply_bids(delta.bids)
|
||||
book.apply_asks(delta.asks)
|
||||
|
||||
checksum_ok = True
|
||||
if delta.checksum is not None:
|
||||
checksum_ok = book.compute_checksum() == delta.checksum
|
||||
|
||||
apply_latency_ms = (time.perf_counter() - parse_start) * 1000
|
||||
source_latency_ms: float | None = None
|
||||
if delta.source_timestamp_ms is not None:
|
||||
source_latency_ms = datetime.now(UTC).timestamp() * 1000 - float(
|
||||
delta.source_timestamp_ms
|
||||
)
|
||||
|
||||
_LOG.info(
|
||||
"book_delta_applied",
|
||||
symbol=delta.symbol,
|
||||
bids=len(delta.bids),
|
||||
asks=len(delta.asks),
|
||||
checksum_ok=checksum_ok,
|
||||
apply_latency_ms=apply_latency_ms,
|
||||
source_latency_ms=source_latency_ms,
|
||||
)
|
||||
|
||||
await self._snapshot_writer.enqueue(
|
||||
MarketSnapshot(
|
||||
snapshot_at=datetime.now(UTC),
|
||||
symbol=delta.symbol,
|
||||
source="kraken_ws",
|
||||
payload=message.payload,
|
||||
latency_ms=source_latency_ms,
|
||||
)
|
||||
)
|
||||
@@ -0,0 +1,104 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from collections.abc import Iterable
|
||||
from dataclasses import dataclass
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from sortedcontainers import SortedDict
|
||||
|
||||
from arbitrade.exchange.models import BookLevel
|
||||
|
||||
ZERO_CLEAN_RE = re.compile(r"^0+", re.ASCII)
|
||||
|
||||
|
||||
def _normalize_price_for_checksum(value: float) -> str:
|
||||
text = f"{value:.10f}".replace(".", "")
|
||||
text = text.rstrip("0")
|
||||
stripped = ZERO_CLEAN_RE.sub("", text)
|
||||
return stripped or "0"
|
||||
|
||||
|
||||
def _normalize_volume_for_checksum(value: float) -> str:
|
||||
text = f"{value:.10f}".replace(".", "")
|
||||
text = text.rstrip("0")
|
||||
stripped = ZERO_CLEAN_RE.sub("", text)
|
||||
return stripped or "0"
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class BookView:
|
||||
best_bid: BookLevel | None
|
||||
best_ask: BookLevel | None
|
||||
updated_at: datetime
|
||||
|
||||
|
||||
class OrderBook:
|
||||
def __init__(self) -> None:
|
||||
self._bids: SortedDict[float, float] = SortedDict()
|
||||
self._asks: SortedDict[float, float] = SortedDict()
|
||||
self._updated_at: datetime = datetime.now(UTC)
|
||||
|
||||
@property
|
||||
def updated_at(self) -> datetime:
|
||||
return self._updated_at
|
||||
|
||||
def apply_bids(self, updates: Iterable[BookLevel]) -> None:
|
||||
for level in updates:
|
||||
if level.volume <= 0:
|
||||
self._bids.pop(level.price, None)
|
||||
else:
|
||||
self._bids[level.price] = level.volume
|
||||
self._updated_at = datetime.now(UTC)
|
||||
|
||||
def apply_asks(self, updates: Iterable[BookLevel]) -> None:
|
||||
for level in updates:
|
||||
if level.volume <= 0:
|
||||
self._asks.pop(level.price, None)
|
||||
else:
|
||||
self._asks[level.price] = level.volume
|
||||
self._updated_at = datetime.now(UTC)
|
||||
|
||||
def best_bid(self) -> BookLevel | None:
|
||||
if not self._bids:
|
||||
return None
|
||||
price = self._bids.peekitem(-1)[0]
|
||||
return BookLevel(price=price, volume=self._bids[price])
|
||||
|
||||
def best_ask(self) -> BookLevel | None:
|
||||
if not self._asks:
|
||||
return None
|
||||
price = self._asks.peekitem(0)[0]
|
||||
return BookLevel(price=price, volume=self._asks[price])
|
||||
|
||||
def snapshot(self) -> BookView:
|
||||
return BookView(
|
||||
best_bid=self.best_bid(),
|
||||
best_ask=self.best_ask(),
|
||||
updated_at=self._updated_at,
|
||||
)
|
||||
|
||||
def top_levels(self, depth: int = 10) -> tuple[list[BookLevel], list[BookLevel]]:
|
||||
bid_keys = list(self._bids.keys())
|
||||
ask_keys = list(self._asks.keys())
|
||||
|
||||
bids = [
|
||||
BookLevel(price=price, volume=self._bids[price])
|
||||
for price in reversed(bid_keys[-depth:])
|
||||
]
|
||||
asks = [BookLevel(price=price, volume=self._asks[price]) for price in ask_keys[:depth]]
|
||||
return bids, asks
|
||||
|
||||
def compute_checksum(self, depth: int = 10) -> int:
|
||||
bids, asks = self.top_levels(depth)
|
||||
combined: list[str] = []
|
||||
for level in bids:
|
||||
combined.append(_normalize_price_for_checksum(level.price))
|
||||
combined.append(_normalize_volume_for_checksum(level.volume))
|
||||
for level in asks:
|
||||
combined.append(_normalize_price_for_checksum(level.price))
|
||||
combined.append(_normalize_volume_for_checksum(level.volume))
|
||||
|
||||
import zlib
|
||||
|
||||
return zlib.crc32("".join(combined).encode("utf-8"))
|
||||
@@ -38,6 +38,14 @@ CREATE TABLE IF NOT EXISTS portfolio_snapshots (
|
||||
balances JSON,
|
||||
total_value_usd DOUBLE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS market_snapshots (
|
||||
snapshot_at TIMESTAMP NOT NULL,
|
||||
symbol VARCHAR NOT NULL,
|
||||
source VARCHAR NOT NULL,
|
||||
payload JSON NOT NULL,
|
||||
latency_ms DOUBLE
|
||||
);
|
||||
"""
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,64 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
import structlog
|
||||
|
||||
from arbitrade.storage.repositories import MarketSnapshotRecord, MarketSnapshotRepository
|
||||
|
||||
_LOG = structlog.get_logger(__name__)
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class MarketSnapshot:
|
||||
snapshot_at: datetime
|
||||
symbol: str
|
||||
source: str
|
||||
payload: dict[str, Any]
|
||||
latency_ms: float | None
|
||||
|
||||
|
||||
class AsyncMarketSnapshotWriter:
|
||||
def __init__(self, repository: MarketSnapshotRepository, max_queue_size: int = 50_000) -> None:
|
||||
self._repository = repository
|
||||
self._queue: asyncio.Queue[MarketSnapshot] = asyncio.Queue(maxsize=max_queue_size)
|
||||
self._task: asyncio.Task[None] | None = None
|
||||
self._stop = asyncio.Event()
|
||||
|
||||
async def start(self) -> None:
|
||||
if self._task is None or self._task.done():
|
||||
self._stop.clear()
|
||||
self._task = asyncio.create_task(self._run(), name="market-snapshot-writer")
|
||||
|
||||
async def stop(self) -> None:
|
||||
self._stop.set()
|
||||
if self._task is not None:
|
||||
await self._task
|
||||
|
||||
async def enqueue(self, snapshot: MarketSnapshot) -> None:
|
||||
await self._queue.put(snapshot)
|
||||
|
||||
async def _run(self) -> None:
|
||||
while not (self._stop.is_set() and self._queue.empty()):
|
||||
try:
|
||||
item = await asyncio.wait_for(self._queue.get(), timeout=0.5)
|
||||
except TimeoutError:
|
||||
continue
|
||||
|
||||
try:
|
||||
self._repository.insert(
|
||||
MarketSnapshotRecord(
|
||||
snapshot_at=item.snapshot_at,
|
||||
symbol=item.symbol,
|
||||
source=item.source,
|
||||
payload=item.payload,
|
||||
latency_ms=item.latency_ms,
|
||||
)
|
||||
)
|
||||
except Exception as exc:
|
||||
_LOG.error("market_snapshot_write_failed", error=str(exc), symbol=item.symbol)
|
||||
finally:
|
||||
self._queue.task_done()
|
||||
@@ -0,0 +1,39 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
import orjson
|
||||
|
||||
from arbitrade.storage.db import DuckDBStore
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class MarketSnapshotRecord:
|
||||
snapshot_at: datetime
|
||||
symbol: str
|
||||
source: str
|
||||
payload: dict[str, Any]
|
||||
latency_ms: float | None
|
||||
|
||||
|
||||
class MarketSnapshotRepository:
|
||||
def __init__(self, store: DuckDBStore) -> None:
|
||||
self._store = store
|
||||
|
||||
def insert(self, record: MarketSnapshotRecord) -> None:
|
||||
with self._store.connect() as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO market_snapshots (snapshot_at, symbol, source, payload, latency_ms)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
""",
|
||||
[
|
||||
record.snapshot_at,
|
||||
record.symbol,
|
||||
record.source,
|
||||
orjson.dumps(record.payload).decode("utf-8"),
|
||||
record.latency_ms,
|
||||
],
|
||||
)
|
||||
@@ -0,0 +1,48 @@
|
||||
from arbitrade.detection.graph import CurrencyGraph
|
||||
|
||||
|
||||
def test_currency_graph_from_kraken_pairs_builds_adjacency() -> None:
|
||||
asset_pairs = {
|
||||
"XXBTZUSD": {"wsname": "BTC/USD"},
|
||||
"XETHXXBT": {"wsname": "ETH/BTC"},
|
||||
"XETHZUSD": {"wsname": "ETH/USD"},
|
||||
}
|
||||
|
||||
graph = CurrencyGraph.from_kraken_asset_pairs(asset_pairs)
|
||||
|
||||
assert "USD" in graph.adjacency
|
||||
assert "BTC" in graph.adjacency["USD"]
|
||||
assert "ETH" in graph.adjacency["USD"]
|
||||
|
||||
|
||||
def test_triangular_cycles_detected_once() -> None:
|
||||
asset_pairs = {
|
||||
"XXBTZUSD": {"wsname": "BTC/USD"},
|
||||
"XETHXXBT": {"wsname": "ETH/BTC"},
|
||||
"XETHZUSD": {"wsname": "ETH/USD"},
|
||||
}
|
||||
|
||||
graph = CurrencyGraph.from_kraken_asset_pairs(asset_pairs)
|
||||
cycles = graph.triangular_cycles()
|
||||
|
||||
assert len(cycles) == 1
|
||||
cycle = cycles[0]
|
||||
assert cycle.currencies == ("BTC", "ETH", "USD")
|
||||
assert set(cycle.pairs) == {"BTC/USD", "ETH/BTC", "ETH/USD"}
|
||||
|
||||
|
||||
def test_cycles_indexed_by_pair() -> None:
|
||||
asset_pairs = {
|
||||
"XXBTZUSD": {"wsname": "BTC/USD"},
|
||||
"XETHXXBT": {"wsname": "ETH/BTC"},
|
||||
"XETHZUSD": {"wsname": "ETH/USD"},
|
||||
}
|
||||
|
||||
graph = CurrencyGraph.from_kraken_asset_pairs(asset_pairs)
|
||||
cycles = graph.triangular_cycles()
|
||||
index = graph.index_cycles_by_pair(cycles)
|
||||
|
||||
assert "BTC/USD" in index
|
||||
assert "ETH/BTC" in index
|
||||
assert "ETH/USD" in index
|
||||
assert len(index["BTC/USD"]) == 1
|
||||
@@ -0,0 +1,112 @@
|
||||
import httpx
|
||||
import pytest
|
||||
import respx
|
||||
|
||||
from arbitrade.config.settings import Settings
|
||||
from arbitrade.exchange.kraken_rest import KrakenRestClient
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_server_time_success() -> None:
|
||||
settings = Settings(_env_file=None)
|
||||
client = KrakenRestClient(settings)
|
||||
|
||||
with respx.mock(base_url=settings.kraken_rest_url) as mock_router:
|
||||
mock_router.get("/0/public/Time").respond(
|
||||
200,
|
||||
json={"error": [], "result": {"unixtime": 1}},
|
||||
)
|
||||
|
||||
payload = await client.server_time()
|
||||
|
||||
await client.close()
|
||||
assert payload["unixtime"] == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_retry_then_success() -> None:
|
||||
settings = Settings(
|
||||
_env_file=None,
|
||||
kraken_retry_attempts=2,
|
||||
kraken_retry_base_delay_seconds=0.0,
|
||||
)
|
||||
client = KrakenRestClient(settings)
|
||||
|
||||
with respx.mock(base_url=settings.kraken_rest_url) as mock_router:
|
||||
route = mock_router.get("/0/public/Time")
|
||||
route.side_effect = [
|
||||
httpx.ConnectError("boom"),
|
||||
httpx.Response(200, json={"error": [], "result": {"unixtime": 2}}),
|
||||
]
|
||||
|
||||
payload = await client.server_time()
|
||||
|
||||
await client.close()
|
||||
assert payload["unixtime"] == 2
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_balances_private_call_uses_headers() -> None:
|
||||
settings = Settings(
|
||||
_env_file=None,
|
||||
KRAKEN_API_KEY="key",
|
||||
KRAKEN_API_SECRET="c2VjcmV0", # base64("secret")
|
||||
kraken_private_rate_limit_seconds=0.0,
|
||||
)
|
||||
client = KrakenRestClient(settings)
|
||||
|
||||
with respx.mock(base_url=settings.kraken_rest_url) as mock_router:
|
||||
route = mock_router.post("/0/private/Balance").respond(
|
||||
200,
|
||||
json={"error": [], "result": {"ZUSD": "10.0"}},
|
||||
)
|
||||
payload = await client.balances()
|
||||
|
||||
await client.close()
|
||||
request = route.calls.last.request
|
||||
assert request.headers.get("API-Key") == "key"
|
||||
assert request.headers.get("API-Sign")
|
||||
assert payload["ZUSD"] == "10.0"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_balances_requires_credentials() -> None:
|
||||
settings = Settings(
|
||||
_env_file=None,
|
||||
KRAKEN_API_KEY=None,
|
||||
KRAKEN_API_SECRET=None,
|
||||
kraken_private_rate_limit_seconds=0.0,
|
||||
)
|
||||
client = KrakenRestClient(settings)
|
||||
|
||||
with pytest.raises(RuntimeError, match="Missing Kraken API credentials"):
|
||||
await client.balances()
|
||||
|
||||
await client.close()
|
||||
|
||||
|
||||
def test_compliance_default_ok() -> None:
|
||||
settings = Settings(_env_file=None)
|
||||
client = KrakenRestClient(settings)
|
||||
|
||||
issues = client.validate_compliance()
|
||||
|
||||
assert issues == []
|
||||
|
||||
|
||||
def test_compliance_detects_insecure_config() -> None:
|
||||
settings = Settings(
|
||||
_env_file=None,
|
||||
KRAKEN_REST_URL="http://api.kraken.com",
|
||||
KRAKEN_PRIVATE_RATE_LIMIT_SECONDS=0.0,
|
||||
KRAKEN_RETRY_ATTEMPTS=0,
|
||||
KRAKEN_RETRY_BASE_DELAY_SECONDS=-1.0,
|
||||
)
|
||||
client = KrakenRestClient(settings)
|
||||
|
||||
issues = client.validate_compliance()
|
||||
|
||||
assert any("https://" in issue for issue in issues)
|
||||
assert any("below 1.0" in issue for issue in issues)
|
||||
assert any("ATTEMPTS" in issue for issue in issues)
|
||||
assert any("BASE_DELAY" in issue for issue in issues)
|
||||
@@ -0,0 +1,26 @@
|
||||
from arbitrade.config.settings import Settings
|
||||
from arbitrade.exchange.kraken_ws import KrakenWsClient
|
||||
|
||||
|
||||
def test_parse_book_delta() -> None:
|
||||
client = KrakenWsClient(Settings())
|
||||
message = {
|
||||
"channel": "book",
|
||||
"symbol": "BTC/USD",
|
||||
"data": [
|
||||
{
|
||||
"bids": [{"price": "100.0", "qty": "1.2"}],
|
||||
"asks": [{"price": "100.5", "qty": "0.8"}],
|
||||
"checksum": 123,
|
||||
"timestamp": 1717232000000,
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
delta = client.parse_book_delta(message)
|
||||
|
||||
assert delta is not None
|
||||
assert delta.symbol == "BTC/USD"
|
||||
assert len(delta.bids) == 1
|
||||
assert len(delta.asks) == 1
|
||||
assert delta.checksum == 123
|
||||
@@ -0,0 +1,27 @@
|
||||
from arbitrade.exchange.models import BookLevel
|
||||
from arbitrade.market_data.order_book import OrderBook
|
||||
|
||||
|
||||
def test_order_book_apply_and_best_levels() -> None:
|
||||
book = OrderBook()
|
||||
book.apply_bids([BookLevel(price=100.0, volume=1.0), BookLevel(price=99.5, volume=2.0)])
|
||||
book.apply_asks([BookLevel(price=100.5, volume=1.1), BookLevel(price=101.0, volume=0.9)])
|
||||
|
||||
best_bid = book.best_bid()
|
||||
best_ask = book.best_ask()
|
||||
|
||||
assert best_bid is not None
|
||||
assert best_ask is not None
|
||||
assert best_bid.price == 100.0
|
||||
assert best_ask.price == 100.5
|
||||
|
||||
|
||||
def test_order_book_checksum_matches_self() -> None:
|
||||
book = OrderBook()
|
||||
book.apply_bids([BookLevel(price=100.0, volume=1.0)])
|
||||
book.apply_asks([BookLevel(price=100.5, volume=1.0)])
|
||||
|
||||
checksum = book.compute_checksum()
|
||||
|
||||
assert isinstance(checksum, int)
|
||||
assert checksum == book.compute_checksum()
|
||||
Reference in New Issue
Block a user