Files
rail-game/backend/app/services/network.py
zwitschi 1099a738a3 feat: add Playwright configuration and initial e2e test for authentication
- Created Playwright configuration file to set up testing environment.
- Added a new e2e test for user authentication in login.spec.ts.
- Updated tsconfig.node.json to include playwright.config.ts.
- Enhanced vite.config.ts to include API proxying for backend integration.
- Added a placeholder for last run test results in .last-run.json.
2025-10-11 17:25:38 +02:00

163 lines
5.0 KiB
Python

"""Domain services for railway network aggregation."""
from datetime import datetime, timezone
from decimal import Decimal
from typing import Iterable, cast
from geoalchemy2.elements import WKBElement, WKTElement
from geoalchemy2.shape import to_shape
try: # pragma: no cover - optional dependency guard
from shapely.geometry import Point # type: ignore
except ImportError: # pragma: no cover - allow running without shapely at import time
Point = None # type: ignore[assignment]
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from backend.app.models import StationModel, TrackModel, TrainModel
from backend.app.repositories import StationRepository, TrackRepository, TrainRepository
def _timestamp() -> datetime:
return datetime.now(timezone.utc)
def _fallback_snapshot() -> dict[str, list[dict[str, object]]]:
now = _timestamp()
stations = [
StationModel(
id="station-1",
name="Central",
latitude=52.520008,
longitude=13.404954,
created_at=now,
updated_at=now,
),
StationModel(
id="station-2",
name="Harbor",
latitude=53.551086,
longitude=9.993682,
created_at=now,
updated_at=now,
),
]
tracks = [
TrackModel(
id="track-1",
start_station_id="station-1",
end_station_id="station-2",
length_meters=289000.0,
max_speed_kph=230.0,
created_at=now,
updated_at=now,
)
]
trains = [
TrainModel(
id="train-1",
designation="ICE 123",
capacity=400,
max_speed_kph=300.0,
operating_track_ids=[track.id for track in tracks],
created_at=now,
updated_at=now,
)
]
return _serialize_snapshot(stations, tracks, trains)
def _serialize_snapshot(
stations: Iterable[StationModel],
tracks: Iterable[TrackModel],
trains: Iterable[TrainModel],
) -> dict[str, list[dict[str, object]]]:
return {
"stations": [station.model_dump(by_alias=True) for station in stations],
"tracks": [track.model_dump(by_alias=True) for track in tracks],
"trains": [train.model_dump(by_alias=True) for train in trains],
}
def _to_float(value: Decimal | float | int | None, default: float = 0.0) -> float:
if value is None:
return default
if isinstance(value, Decimal):
return float(value)
return float(value)
def get_network_snapshot(session: Session) -> dict[str, list[dict[str, object]]]:
station_repo = StationRepository(session)
track_repo = TrackRepository(session)
train_repo = TrainRepository(session)
try:
stations_entities = station_repo.list_active()
tracks_entities = track_repo.list_all()
trains_entities = train_repo.list_all()
except SQLAlchemyError:
session.rollback()
return _fallback_snapshot()
if not stations_entities and not tracks_entities and not trains_entities:
return _fallback_snapshot()
station_models: list[StationModel] = []
for station in stations_entities:
location = station.location
geom = (
to_shape(cast(WKBElement | WKTElement, location))
if location is not None and Point is not None
else None
)
if Point is not None and geom is not None and isinstance(geom, Point):
latitude = float(geom.y)
longitude = float(geom.x)
else:
latitude = 0.0
longitude = 0.0
station_models.append(
StationModel(
id=str(station.id),
name=station.name,
latitude=latitude,
longitude=longitude,
created_at=cast(datetime, station.created_at),
updated_at=cast(datetime, station.updated_at),
)
)
track_models: list[TrackModel] = []
for track in tracks_entities:
track_models.append(
TrackModel(
id=str(track.id),
start_station_id=str(track.start_station_id),
end_station_id=str(track.end_station_id),
length_meters=_to_float(track.length_meters),
max_speed_kph=_to_float(track.max_speed_kph),
created_at=cast(datetime, track.created_at),
updated_at=cast(datetime, track.updated_at),
)
)
train_models: list[TrainModel] = []
for train in trains_entities:
train_models.append(
TrainModel(
id=str(train.id),
designation=train.designation,
capacity=train.capacity,
max_speed_kph=_to_float(train.max_speed_kph),
operating_track_ids=[],
created_at=cast(datetime, train.created_at),
updated_at=cast(datetime, train.updated_at),
)
)
return _serialize_snapshot(station_models, track_models, train_models)