Files
arbitrade/build/lib/arbitrade/exchange/kraken_rest.py
T
zwitschi 1df4b11aef
CI / lint-test-build (push) Failing after 1m7s
Add HTML templates for dashboard, metrics, overview, and backtesting
- Introduced new HTML templates for the dashboard, metrics, overview, and backtesting functionalities.
- Implemented partial templates for metrics, overview, audit, controls, and charts to enhance modularity.
- Updated the Jinja2 template resolution logic to support different deployment environments.
- Added a health check template to display the service status.
- Included a test suite to verify the template resolution logic.
- Updated `pyproject.toml` to include new HTML templates in the package data.
2026-06-02 14:16:42 +02:00

282 lines
9.4 KiB
Python

from __future__ import annotations
import asyncio
import time
from typing import Any
from urllib.parse import urlencode
import httpx
import structlog
from arbitrade.config.settings import Settings
from arbitrade.exchange.models import KrakenApiResult, LatencySample
from arbitrade.exchange.signing import sign_kraken_private_path
_LOG = structlog.get_logger(__name__)
def _result_dict(payload: dict[str, Any]) -> dict[str, Any]:
result = payload.get("result", {})
if isinstance(result, dict):
return result
return {}
class KrakenRestClient:
def __init__(self, settings: Settings) -> None:
self._settings = settings
self._client = httpx.AsyncClient(
base_url=settings.kraken_rest_url,
timeout=settings.kraken_http_timeout_seconds,
limits=httpx.Limits(max_keepalive_connections=10, max_connections=50),
headers={"User-Agent": "arbitrade/0.1.0"},
)
self._private_lock = asyncio.Lock()
issues = self.validate_compliance()
if issues:
_LOG.warning("kraken_compliance_issues", issues=issues)
else:
_LOG.info("kraken_compliance_ok")
def validate_compliance(self) -> list[str]:
issues: list[str] = []
if not self._settings.kraken_rest_url.startswith("https://"):
issues.append("KRAKEN_REST_URL should use https://")
if self._settings.kraken_private_rate_limit_seconds < 1.0:
issues.append("KRAKEN_PRIVATE_RATE_LIMIT_SECONDS below 1.0 may violate Kraken limits")
if self._settings.kraken_retry_attempts < 1:
issues.append("KRAKEN_RETRY_ATTEMPTS must be >= 1")
if self._settings.kraken_retry_base_delay_seconds < 0:
issues.append("KRAKEN_RETRY_BASE_DELAY_SECONDS must be >= 0")
return issues
async def close(self) -> None:
await self._client.aclose()
async def warm_connection_pool(self) -> None:
await self.server_time()
async def _request_with_retry(
self,
endpoint: str,
params: dict[str, Any] | None = None,
) -> KrakenApiResult:
attempts = self._settings.kraken_retry_attempts
delay = self._settings.kraken_retry_base_delay_seconds
params = params or {}
for attempt in range(1, attempts + 1):
t0 = time.perf_counter()
try:
response = await self._client.get(endpoint, params=params)
response.raise_for_status()
payload = response.json()
if payload.get("error"):
raise RuntimeError(f"Kraken error: {payload['error']}")
latency = (time.perf_counter() - t0) * 1000
_LOG.info(
"kraken_rest_request_ok",
endpoint=endpoint,
attempt=attempt,
latency_ms=latency,
sample=LatencySample.now("rest_request", latency_ms=latency).latency_ms,
)
return KrakenApiResult(endpoint=endpoint, payload=payload)
except Exception as exc:
latency = (time.perf_counter() - t0) * 1000
_LOG.warning(
"kraken_rest_request_failed",
endpoint=endpoint,
attempt=attempt,
latency_ms=latency,
error=str(exc),
)
if attempt >= attempts:
raise
await asyncio.sleep(delay * (2 ** (attempt - 1)))
raise RuntimeError("unreachable retry loop")
async def _private_post_with_retry(
self,
endpoint: str,
data: dict[str, str] | None = None,
) -> KrakenApiResult:
api_key = self._settings.kraken_api_key
api_secret = self._settings.kraken_api_secret
if not api_key or not api_secret:
raise RuntimeError("Missing Kraken API credentials for private endpoint")
attempts = self._settings.kraken_retry_attempts
delay = self._settings.kraken_retry_base_delay_seconds
for attempt in range(1, attempts + 1):
t0 = time.perf_counter()
try:
nonce = str(int(time.time() * 1000))
payload = {"nonce": nonce}
if data is not None:
payload.update(data)
encoded = urlencode(payload)
signature = sign_kraken_private_path(endpoint, nonce, encoded, api_secret)
response = await self._client.post(
endpoint,
data=payload,
headers={
"API-Key": api_key,
"API-Sign": signature,
},
)
response.raise_for_status()
body = response.json()
if body.get("error"):
raise RuntimeError(f"Kraken error: {body['error']}")
latency = (time.perf_counter() - t0) * 1000
_LOG.info(
"kraken_private_rest_request_ok",
endpoint=endpoint,
attempt=attempt,
latency_ms=latency,
sample=LatencySample.now("private_rest_request", latency_ms=latency).latency_ms,
)
return KrakenApiResult(endpoint=endpoint, payload=body)
except Exception as exc:
latency = (time.perf_counter() - t0) * 1000
_LOG.warning(
"kraken_private_rest_request_failed",
endpoint=endpoint,
attempt=attempt,
latency_ms=latency,
error=str(exc),
)
if attempt >= attempts:
raise
await asyncio.sleep(delay * (2 ** (attempt - 1)))
raise RuntimeError("unreachable retry loop")
async def server_time(self) -> dict[str, Any]:
result = await self._request_with_retry("/0/public/Time")
return _result_dict(result.payload)
async def assets(self) -> dict[str, Any]:
result = await self._request_with_retry("/0/public/Assets")
return _result_dict(result.payload)
async def asset_pairs(self) -> dict[str, Any]:
result = await self._request_with_retry("/0/public/AssetPairs")
return _result_dict(result.payload)
async def _throttled_private_call(
self,
endpoint: str,
data: dict[str, str] | None = None,
) -> dict[str, Any]:
async with self._private_lock:
result = await self._private_post_with_retry(endpoint, data=data)
await asyncio.sleep(self._settings.kraken_private_rate_limit_seconds)
return _result_dict(result.payload)
async def balances(self) -> dict[str, Any]:
return await self._throttled_private_call("/0/private/Balance")
async def place_market_order(
self,
*,
pair: str,
side: str,
volume: float,
user_ref: int | None = None,
) -> dict[str, Any]:
normalized_side = side.lower()
if normalized_side not in {"buy", "sell"}:
raise ValueError("side must be 'buy' or 'sell'")
if volume <= 0.0:
raise ValueError("volume must be > 0.0")
if user_ref is not None and user_ref < 0:
raise ValueError("user_ref must be >= 0")
data = {
"pair": pair,
"type": normalized_side,
"ordertype": "market",
"volume": str(volume),
}
if user_ref is not None:
data["userref"] = str(user_ref)
return await self._throttled_private_call(
"/0/private/AddOrder",
data=data,
)
async def place_limit_order(
self,
*,
pair: str,
side: str,
volume: float,
price: float,
user_ref: int | None = None,
) -> dict[str, Any]:
normalized_side = side.lower()
if normalized_side not in {"buy", "sell"}:
raise ValueError("side must be 'buy' or 'sell'")
if volume <= 0.0:
raise ValueError("volume must be > 0.0")
if price <= 0.0:
raise ValueError("price must be > 0.0")
if user_ref is not None and user_ref < 0:
raise ValueError("user_ref must be >= 0")
data = {
"pair": pair,
"type": normalized_side,
"ordertype": "limit",
"price": str(price),
"volume": str(volume),
}
if user_ref is not None:
data["userref"] = str(user_ref)
return await self._throttled_private_call(
"/0/private/AddOrder",
data=data,
)
async def query_order(
self,
*,
order_id: str,
include_trades: bool = True,
) -> dict[str, Any]:
if not order_id.strip():
raise ValueError("order_id must be non-empty")
return await self._throttled_private_call(
"/0/private/QueryOrders",
data={
"txid": order_id,
"trades": "true" if include_trades else "false",
},
)
async def cancel_order(self, *, order_id: str) -> dict[str, Any]:
if not order_id.strip():
raise ValueError("order_id must be non-empty")
return await self._throttled_private_call(
"/0/private/CancelOrder",
data={"txid": order_id},
)