From ef22e217c7dfd44526430e0c7e25e0a623610971 Mon Sep 17 00:00:00 2001 From: zwitschi Date: Sun, 7 Jun 2026 14:50:55 +0200 Subject: [PATCH] feat: update environment configuration and improve repository handling - Added PG_PASSWORD to .env.example for database connection. - Removed unnecessary imports and streamlined code in various modules. - Enhanced error handling in ConfigSettingRepository and ConfigPairingRepository. - Updated test files to remove unused imports and improve clarity. --- .env.example | 5 +++ README.md | 1 - src/arbitrade/api/app.py | 9 ++++- src/arbitrade/execution/sequencer.py | 25 ++++++++----- src/arbitrade/market_data/feed.py | 37 +++++++++++-------- src/arbitrade/storage/repositories.py | 20 ++++++++-- tests/integration/test_audit_repository.py | 1 - .../integration/test_execution_persistence.py | 2 +- tests/integration/test_metrics.py | 1 - tests/integration/test_opportunity_writer.py | 2 +- tests/integration/test_postgresql_schema.py | 3 +- 11 files changed, 69 insertions(+), 37 deletions(-) diff --git a/.env.example b/.env.example index a0185df..c8a78fc 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,11 @@ APP_ENV=dev APP_HOST=0.0.0.0 APP_PORT=8000 +PG_HOST=192.168.88.35 +PG_PORT=5432 +PG_DATABASE=arbitrade +PG_USER=arbitrade +PG_PASSWORD=arbitrade LOG_LEVEL=INFO LOG_JSON=true ALERTS_ENABLED=true diff --git a/README.md b/README.md index 1c34304..c92dbf6 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,6 @@ Current stack: - Native Kraken WebSocket planned for market-data hot path - Gitea Actions + Gitea container registry -Project plan lives in [PLAN.md](PLAN.md). Task checklist lives in [.github/instructions/TODO.md](.github/instructions/TODO.md). Coolify deployment runbooks live in [docs/DEPLOYMENT.md](docs/DEPLOYMENT.md). diff --git a/src/arbitrade/api/app.py b/src/arbitrade/api/app.py index 02f8ff0..e63232b 100644 --- a/src/arbitrade/api/app.py +++ b/src/arbitrade/api/app.py @@ -25,10 +25,15 @@ from arbitrade.market_data.feed_builder import ( ) from arbitrade.metrics import MetricsCalculator from arbitrade.runtime.lifecycle import graceful_shutdown, restore_runtime_state -from arbitrade.storage.pg_store import PgStore from arbitrade.storage.market_snapshots import AsyncMarketSnapshotWriter from arbitrade.storage.opportunities import AsyncOpportunityWriter -from arbitrade.storage.repositories import AuditRepository, RuntimeStateRepository, MarketSnapshotRepository, OpportunityRepository +from arbitrade.storage.pg_store import PgStore +from arbitrade.storage.repositories import ( + AuditRepository, + MarketSnapshotRepository, + OpportunityRepository, + RuntimeStateRepository, +) _LOG = structlog.get_logger(__name__) diff --git a/src/arbitrade/execution/sequencer.py b/src/arbitrade/execution/sequencer.py index 35f7236..9d9fc2f 100644 --- a/src/arbitrade/execution/sequencer.py +++ b/src/arbitrade/execution/sequencer.py @@ -47,13 +47,15 @@ class TriangularExecutionSequencer: rest_client: SupportsOrderPlacement, *, available_pairs: Sequence[str], - volume_for_leg: Callable[[OpportunityEvent, ExecutionLeg, int], float] | None = None, + volume_for_leg: Callable[[OpportunityEvent, + ExecutionLeg, int], float] | None = None, execution_writer: AsyncExecutionWriter | None = None, alert_notifier: SupportsAlerts | None = None, audit_repository: AuditRepository | None = None, ) -> None: self._rest_client = rest_client - self._available_pairs = {self._normalize_pair(pair) for pair in available_pairs} + self._available_pairs = {self._normalize_pair( + pair) for pair in available_pairs} self._volume_for_leg = volume_for_leg or self._default_volume_for_leg self._execution_writer = execution_writer self._alert_notifier = alert_notifier @@ -100,12 +102,15 @@ class TriangularExecutionSequencer: raise ValueError(f"No tradable pair for leg {from_cur}->{to_cur}") def _build_legs(self, event: OpportunityEvent) -> tuple[ExecutionLeg, ...]: - currencies = [part.strip().upper() for part in event.cycle.split("->") if part.strip()] + currencies = [part.strip().upper() + for part in event.cycle.split("->") if part.strip()] if len(currencies) < 4 or currencies[0] != currencies[-1]: - raise ValueError("cycle must be a closed triangular path like A->B->C->A") + raise ValueError( + "cycle must be a closed triangular path like A->B->C->A") if len(currencies) != 4: - raise ValueError("cycle must contain exactly three unique currencies") + raise ValueError( + "cycle must contain exactly three unique currencies") legs: list[ExecutionLeg] = [] for idx in range(3): @@ -120,7 +125,8 @@ class TriangularExecutionSequencer: ) volume = self._volume_for_leg(event, placeholder_leg, idx) if volume <= 0.0: - raise ValueError("volume_for_leg must return a positive volume") + raise ValueError( + "volume_for_leg must return a positive volume") legs.append(self._resolve_leg(from_currency, to_currency, volume)) return tuple(legs) @@ -158,7 +164,7 @@ class TriangularExecutionSequencer: ) except Exception as exc: if self._audit_repository is not None: - self._audit_repository.insert( + await self._audit_repository.insert( AuditRecord( occurred_at=datetime.now(UTC), actor="execution_engine", @@ -209,7 +215,8 @@ class TriangularExecutionSequencer: responses.append(response) if self._execution_writer is not None: - order_ref = self._order_ref_from_response(response, f"leg-{idx}") + order_ref = self._order_ref_from_response( + response, f"leg-{idx}") await self._execution_writer.enqueue( OrderRecord( trade_ref=trade_ref, @@ -265,7 +272,7 @@ class TriangularExecutionSequencer: ) if self._audit_repository is not None: - self._audit_repository.insert( + await self._audit_repository.insert( AuditRecord( occurred_at=datetime.now(UTC), actor="execution_engine", diff --git a/src/arbitrade/market_data/feed.py b/src/arbitrade/market_data/feed.py index a633789..9cd5f66 100644 --- a/src/arbitrade/market_data/feed.py +++ b/src/arbitrade/market_data/feed.py @@ -38,7 +38,8 @@ class MarketDataFeed: opportunity_writer: AsyncOpportunityWriter | None = None, paper_trading_mode: bool = True, opportunity_executor: ( - Callable[[OpportunityEvent], Awaitable[ExecutionOutcome | float | None]] | None + Callable[[OpportunityEvent], + Awaitable[ExecutionOutcome | float | None]] | None ) = None, trade_capital: float = 1.0, max_trade_capital: float | None = None, @@ -92,7 +93,8 @@ class MarketDataFeed: return {} start = currencies[0] - exposure_assets = {currency for currency in currencies[1:] if currency != start} + exposure_assets = { + currency for currency in currencies[1:] if currency != start} return {asset: event.allocated_capital for asset in exposure_assets} async def run(self) -> None: @@ -144,7 +146,7 @@ class MarketDataFeed: symbol=delta.symbol, ) if self._audit_repository is not None: - self._audit_repository.insert( + await self._audit_repository.insert( AuditRecord( occurred_at=datetime.now(UTC), actor="risk_manager", @@ -172,7 +174,7 @@ class MarketDataFeed: for event in opportunities: if self._audit_repository is not None: - self._audit_repository.insert( + await self._audit_repository.insert( AuditRecord( occurred_at=datetime.now(UTC), actor="detector", @@ -207,7 +209,7 @@ class MarketDataFeed: net_pct=event.net_pct, ) if self._audit_repository is not None: - self._audit_repository.insert( + await self._audit_repository.insert( AuditRecord( occurred_at=datetime.now(UTC), actor="execution_engine", @@ -228,7 +230,7 @@ class MarketDataFeed: updated_pair=event.updated_pair, ) if self._audit_repository is not None: - self._audit_repository.insert( + await self._audit_repository.insert( AuditRecord( occurred_at=datetime.now(UTC), actor="execution_engine", @@ -250,7 +252,7 @@ class MarketDataFeed: reason=self._kill_switch.reason, ) if self._audit_repository is not None: - self._audit_repository.insert( + await self._audit_repository.insert( AuditRecord( occurred_at=datetime.now(UTC), actor="risk_manager", @@ -275,7 +277,7 @@ class MarketDataFeed: reason=self._stop_conditions_guard.halted_reason, ) if self._audit_repository is not None: - self._audit_repository.insert( + await self._audit_repository.insert( AuditRecord( occurred_at=datetime.now(UTC), actor="risk_manager", @@ -298,7 +300,7 @@ class MarketDataFeed: reason=self._loss_limit_guard.halted_reason, ) if self._audit_repository is not None: - self._audit_repository.insert( + await self._audit_repository.insert( AuditRecord( occurred_at=datetime.now(UTC), actor="risk_manager", @@ -313,7 +315,8 @@ class MarketDataFeed: continue if self._pre_trade_validator is not None and self._balance_provider is not None: - required_balances = {self._quote_balance_asset: event.allocated_capital} + required_balances = { + self._quote_balance_asset: event.allocated_capital} balances = { asset.upper(): amount for asset, amount in self._balance_provider().items() @@ -329,7 +332,7 @@ class MarketDataFeed: required_by_asset=required_balances, ) if self._audit_repository is not None: - self._audit_repository.insert( + await self._audit_repository.insert( AuditRecord( occurred_at=datetime.now(UTC), actor="risk_manager", @@ -358,7 +361,7 @@ class MarketDataFeed: exposure_by_asset=exposure_by_asset, ) if self._audit_repository is not None: - self._audit_repository.insert( + await self._audit_repository.insert( AuditRecord( occurred_at=datetime.now(UTC), actor="risk_manager", @@ -381,7 +384,8 @@ class MarketDataFeed: outcome = await self._opportunity_executor(event) except Exception as exc: if self._trade_limits_guard is not None: - self._trade_limits_guard.close_trade(exposure_by_asset) + self._trade_limits_guard.close_trade( + exposure_by_asset) dispatch_alert_nowait( self._alert_notifier, @@ -420,7 +424,7 @@ class MarketDataFeed: updated_pair=event.updated_pair, ) if self._audit_repository is not None: - self._audit_repository.insert( + await self._audit_repository.insert( AuditRecord( occurred_at=datetime.now(UTC), actor="execution_engine", @@ -447,7 +451,8 @@ class MarketDataFeed: realized_pnl = outcome if realized_pnl is not None and self._loss_limit_guard is not None: - self._loss_limit_guard.register_realized_pnl(realized_pnl) + self._loss_limit_guard.register_realized_pnl( + realized_pnl) if self._loss_limit_guard.is_halted: _LOG.warning( "loss_limit_halt_triggered", @@ -459,7 +464,7 @@ class MarketDataFeed: self._trade_limits_guard.close_trade(exposure_by_asset) if self._audit_repository is not None: - self._audit_repository.insert( + await self._audit_repository.insert( AuditRecord( occurred_at=datetime.now(UTC), actor="execution_engine", diff --git a/src/arbitrade/storage/repositories.py b/src/arbitrade/storage/repositories.py index 9ba472f..ad37aa6 100644 --- a/src/arbitrade/storage/repositories.py +++ b/src/arbitrade/storage/repositories.py @@ -521,7 +521,11 @@ class ConfigSettingRepository: """, key, ) - return result != "DELETE 0" + if result is None: + return False + elif isinstance(result, str): + return result != "DELETE 0" + return False async def list_settings(self, section: str | None = None) -> list[ConfigSetting]: """List all configuration settings, optionally filtered by section.""" @@ -567,7 +571,7 @@ class ConfigSettingRepository: ts = row["latest_updated_at"] if isinstance(ts, str): return datetime.fromisoformat(ts.replace("Z", "+00:00")) - return ts + return ts # type: ignore[no-any-return] return None @@ -663,7 +667,11 @@ class ConfigPairingRepository: """, base_asset, quote_asset, ) - return result != "DELETE 0" + if result is None: + return False + elif isinstance(result, str): + return result != "DELETE 0" + return False async def upsert_pairing(self, pairing: ConfigPairing) -> ConfigPairing: """Insert or update a currency pairing (upsert on base_asset, quote_asset).""" @@ -996,4 +1004,8 @@ class BacktestJobRepository: "DELETE FROM backtest_jobs WHERE id = $1", job_id, ) - return result != "DELETE 0" + if result is None: + return False + elif isinstance(result, str): + return result != "DELETE 0" + return False diff --git a/tests/integration/test_audit_repository.py b/tests/integration/test_audit_repository.py index 2109b7c..7e138c8 100644 --- a/tests/integration/test_audit_repository.py +++ b/tests/integration/test_audit_repository.py @@ -5,7 +5,6 @@ from contextlib import asynccontextmanager from datetime import UTC, datetime import pytest -import pytest_asyncio from arbitrade.config.settings import get_settings from arbitrade.storage.pg_store import PgStore diff --git a/tests/integration/test_execution_persistence.py b/tests/integration/test_execution_persistence.py index c3aa9ac..c28d880 100644 --- a/tests/integration/test_execution_persistence.py +++ b/tests/integration/test_execution_persistence.py @@ -10,8 +10,8 @@ import pytest from arbitrade.config.settings import get_settings from arbitrade.detection.engine import OpportunityEvent from arbitrade.execution.sequencer import TriangularExecutionSequencer -from arbitrade.storage.pg_store import PgStore from arbitrade.storage.executions import AsyncExecutionWriter +from arbitrade.storage.pg_store import PgStore from arbitrade.storage.repositories import OrderRepository, PnLRepository, TradeRepository pytestmark = pytest.mark.integration diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py index 61525c1..4e7336a 100644 --- a/tests/integration/test_metrics.py +++ b/tests/integration/test_metrics.py @@ -5,7 +5,6 @@ from contextlib import asynccontextmanager from datetime import UTC, datetime, timedelta import pytest -import pytest_asyncio from arbitrade.config.settings import get_settings from arbitrade.metrics import MetricsCalculator diff --git a/tests/integration/test_opportunity_writer.py b/tests/integration/test_opportunity_writer.py index 705bce2..aca6a6d 100644 --- a/tests/integration/test_opportunity_writer.py +++ b/tests/integration/test_opportunity_writer.py @@ -8,8 +8,8 @@ import pytest from arbitrade.config.settings import get_settings from arbitrade.detection.engine import OpportunityEvent -from arbitrade.storage.pg_store import PgStore from arbitrade.storage.opportunities import AsyncOpportunityWriter +from arbitrade.storage.pg_store import PgStore from arbitrade.storage.repositories import OpportunityRepository pytestmark = pytest.mark.integration diff --git a/tests/integration/test_postgresql_schema.py b/tests/integration/test_postgresql_schema.py index d87ee52..c9c7c9a 100644 --- a/tests/integration/test_postgresql_schema.py +++ b/tests/integration/test_postgresql_schema.py @@ -13,7 +13,7 @@ from contextlib import asynccontextmanager import pytest import pytest_asyncio -from arbitrade.config.settings import Settings, get_settings +from arbitrade.config.settings import get_settings from arbitrade.storage.pg_store import PgStore pytestmark = pytest.mark.integration @@ -332,6 +332,7 @@ async def test_audit_list_recent(pg: PgStore) -> None: """AuditRepository.list_recent returns records in desc order.""" await pg.migrate() from datetime import UTC, datetime + from arbitrade.storage.repositories import AuditRecord, AuditRepository repo = AuditRepository(pg)