feat: Initialize frontend and backend structure with essential configurations
Some checks failed
Backend CI / lint-and-test (push) Failing after 2m15s
Frontend CI / lint-and-build (push) Successful in 1m1s

- Added TypeScript build info for frontend.
- Created Vite configuration for React application.
- Implemented pre-commit hook to run checks before commits.
- Set up PostgreSQL Dockerfile with PostGIS support and initialization scripts.
- Added database creation script for PostgreSQL with necessary extensions.
- Established Python project configuration with dependencies and development tools.
- Developed pre-commit script to enforce code quality checks for backend and frontend.
- Created PowerShell script to set up Git hooks path.
This commit is contained in:
2025-10-11 15:25:32 +02:00
commit fc1e874309
74 changed files with 9477 additions and 0 deletions

0
backend/app/__init__.py Normal file
View File

View File

@@ -0,0 +1,10 @@
from fastapi import APIRouter
from backend.app.api.auth import router as auth_router
from backend.app.api.health import router as health_router
from backend.app.api.network import router as network_router
router = APIRouter()
router.include_router(health_router, tags=["health"])
router.include_router(auth_router)
router.include_router(network_router)

41
backend/app/api/auth.py Normal file
View File

@@ -0,0 +1,41 @@
from __future__ import annotations
from fastapi import APIRouter, Depends, HTTPException, status
from backend.app.api.deps import get_current_user
from backend.app.models import AuthResponse, LoginRequest, RegisterRequest, UserPublic
from backend.app.services.auth import (
authenticate_user,
issue_token_for_user,
register_user,
)
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post("/login", response_model=AuthResponse)
async def login(credentials: LoginRequest) -> AuthResponse:
user = authenticate_user(credentials.username, credentials.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
return issue_token_for_user(user)
@router.post("/register", response_model=AuthResponse, status_code=status.HTTP_201_CREATED)
async def register(payload: RegisterRequest) -> AuthResponse:
try:
user = register_user(payload.username, payload.password, payload.full_name)
except ValueError as exc:
message = str(exc)
status_code = status.HTTP_409_CONFLICT if "exists" in message else status.HTTP_400_BAD_REQUEST
raise HTTPException(status_code=status_code, detail=message) from exc
return issue_token_for_user(user)
@router.get("/me", response_model=UserPublic)
async def read_current_user(current_user: UserPublic = Depends(get_current_user)) -> UserPublic:
return current_user

40
backend/app/api/deps.py Normal file
View File

@@ -0,0 +1,40 @@
from __future__ import annotations
from collections.abc import Generator
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from pydantic import ValidationError
from sqlalchemy.orm import Session
from backend.app.core.security import decode_access_token
from backend.app.db.session import get_db_session
from backend.app.models import TokenPayload, UserPublic
from backend.app.services.auth import get_user, to_public_user
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/auth/login")
async def get_current_user(token: str = Depends(oauth2_scheme)) -> UserPublic:
try:
payload = TokenPayload(**decode_access_token(token))
except (ValueError, ValidationError) as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
) from exc
user = get_user(payload.sub)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
headers={"WWW-Authenticate": "Bearer"},
)
return to_public_user(user)
def get_db() -> Generator[Session, None, None]:
yield from get_db_session()

View File

@@ -0,0 +1,8 @@
from fastapi import APIRouter
router = APIRouter()
@router.get("/health", summary="Service health status")
async def health_check() -> dict[str, str]:
return {"status": "ok"}

View File

@@ -0,0 +1,16 @@
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from backend.app.api.deps import get_current_user, get_db
from backend.app.models import UserPublic
from backend.app.services.network import get_network_snapshot
router = APIRouter(prefix="/network", tags=["network"])
@router.get("", summary="Fetch a snapshot of the railway network")
def read_network_snapshot(
_: UserPublic = Depends(get_current_user),
db: Session = Depends(get_db),
) -> dict[str, list[dict[str, object]]]:
return get_network_snapshot(db)

View File

View File

@@ -0,0 +1,38 @@
from functools import lru_cache
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Optional
class Settings(BaseSettings):
project_name: str = "Rail Game API"
version: str = "0.1.0"
api_prefix: str = "/api"
jwt_secret_key: str = "insecure-change-me"
jwt_algorithm: str = "HS256"
access_token_expire_minutes: int = 60
database_url: str = "postgresql+psycopg://railgame:railgame@localhost:5432/railgame"
database_echo: bool = False
test_database_url: Optional[str] = None
alembic_database_url: Optional[str] = None
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
@property
def sqlalchemy_database_url(self) -> str:
return self.database_url
@property
def sqlalchemy_test_url(self) -> Optional[str]:
return self.test_database_url
@property
def sqlalchemy_alembic_url(self) -> str:
return self.alembic_database_url or self.database_url
@lru_cache()
def get_settings() -> Settings:
return Settings()

View File

@@ -0,0 +1,36 @@
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any, Dict
from jose import JWTError, jwt
from passlib.context import CryptContext
from backend.app.core.config import get_settings
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(subject: str, expires_delta: timedelta | None = None) -> str:
settings = get_settings()
expire = datetime.now(timezone.utc) + (
expires_delta or timedelta(minutes=settings.access_token_expire_minutes)
)
to_encode: Dict[str, Any] = {"sub": subject, "exp": expire}
return jwt.encode(to_encode, settings.jwt_secret_key, algorithm=settings.jwt_algorithm)
def decode_access_token(token: str) -> Dict[str, Any]:
settings = get_settings()
try:
return jwt.decode(token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm])
except JWTError as exc: # pragma: no cover - specific error mapping handled by caller
raise ValueError("Invalid token") from exc

View File

@@ -0,0 +1,6 @@
"""Database package exposing SQLAlchemy base metadata and session utilities."""
from backend.app.db.models import Base
from backend.app.db.session import SessionLocal, engine, get_db_session
__all__ = ["Base", "SessionLocal", "engine", "get_db_session"]

90
backend/app/db/models.py Normal file
View File

@@ -0,0 +1,90 @@
from __future__ import annotations
import uuid
from geoalchemy2 import Geometry
from sqlalchemy import Boolean, DateTime, Float, ForeignKey, Integer, Numeric, String, Text, UniqueConstraint
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.sql import func
class Base(DeclarativeBase):
"""Base class for all SQLAlchemy models."""
class TimestampMixin:
created_at: Mapped[DateTime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)
updated_at: Mapped[DateTime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False
)
class User(Base, TimestampMixin):
__tablename__ = "users"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
username: Mapped[str] = mapped_column(String(64), unique=True, nullable=False)
email: Mapped[str | None] = mapped_column(String(255), unique=True, nullable=True)
full_name: Mapped[str | None] = mapped_column(String(128), nullable=True)
password_hash: Mapped[str] = mapped_column(String(256), nullable=False)
role: Mapped[str] = mapped_column(String(32), nullable=False, default="player")
preferences: Mapped[str | None] = mapped_column(Text, nullable=True)
class Station(Base, TimestampMixin):
__tablename__ = "stations"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
osm_id: Mapped[str | None] = mapped_column(String(32), nullable=True)
name: Mapped[str] = mapped_column(String(128), nullable=False)
code: Mapped[str | None] = mapped_column(String(16), nullable=True)
location: Mapped[str] = mapped_column(Geometry(geometry_type="POINT", srid=4326), nullable=False)
elevation_m: Mapped[float | None] = mapped_column(Float, nullable=True)
is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
class Track(Base, TimestampMixin):
__tablename__ = "tracks"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name: Mapped[str | None] = mapped_column(String(128), nullable=True)
start_station_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("stations.id", ondelete="RESTRICT"), nullable=False)
end_station_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("stations.id", ondelete="RESTRICT"), nullable=False)
length_meters: Mapped[float | None] = mapped_column(Numeric(10, 2), nullable=True)
max_speed_kph: Mapped[int | None] = mapped_column(Integer, nullable=True)
is_bidirectional: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
status: Mapped[str] = mapped_column(String(32), nullable=False, default="planned")
track_geometry: Mapped[str] = mapped_column(Geometry(geometry_type="LINESTRING", srid=4326), nullable=False)
__table_args__ = (
UniqueConstraint("start_station_id", "end_station_id", name="uq_tracks_station_pair"),
)
class Train(Base, TimestampMixin):
__tablename__ = "trains"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
designation: Mapped[str] = mapped_column(String(64), nullable=False, unique=True)
operator_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"))
home_station_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), ForeignKey("stations.id", ondelete="SET NULL"))
capacity: Mapped[int] = mapped_column(Integer, nullable=False)
max_speed_kph: Mapped[int] = mapped_column(Integer, nullable=False)
consist: Mapped[str | None] = mapped_column(Text, nullable=True)
class TrainSchedule(Base, TimestampMixin):
__tablename__ = "train_schedules"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
train_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("trains.id", ondelete="CASCADE"), nullable=False)
sequence_index: Mapped[int] = mapped_column(Integer, nullable=False)
station_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("stations.id", ondelete="CASCADE"), nullable=False)
scheduled_arrival: Mapped[DateTime | None] = mapped_column(DateTime(timezone=True), nullable=True)
scheduled_departure: Mapped[DateTime | None] = mapped_column(DateTime(timezone=True), nullable=True)
dwell_seconds: Mapped[int | None] = mapped_column(Integer, nullable=True)
__table_args__ = (
UniqueConstraint("train_id", "sequence_index", name="uq_train_schedule_sequence"),
)

24
backend/app/db/session.py Normal file
View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from collections.abc import Generator
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from backend.app.core.config import get_settings
settings = get_settings()
engine = create_engine(settings.sqlalchemy_database_url, echo=settings.database_echo, future=True)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, expire_on_commit=False)
def get_db_session() -> Generator[Session, None, None]:
session = SessionLocal()
try:
yield session
finally:
session.close()
__all__ = ["engine", "SessionLocal", "get_db_session"]

14
backend/app/main.py Normal file
View File

@@ -0,0 +1,14 @@
from fastapi import FastAPI
from backend.app.api import router as api_router
from backend.app.core.config import get_settings
def create_app() -> FastAPI:
settings = get_settings()
application = FastAPI(title=settings.project_name, version=settings.version)
application.include_router(api_router, prefix=settings.api_prefix)
return application
app = create_app()

View File

@@ -0,0 +1,35 @@
from .auth import (
AuthResponse,
LoginRequest,
RegisterRequest,
TokenPayload,
TokenResponse,
UserInDB,
UserPublic,
)
from .base import (
StationCreate,
StationModel,
TrackCreate,
TrackModel,
TrainCreate,
TrainModel,
to_camel,
)
__all__ = [
"LoginRequest",
"RegisterRequest",
"AuthResponse",
"TokenPayload",
"TokenResponse",
"UserInDB",
"UserPublic",
"StationCreate",
"StationModel",
"TrackCreate",
"TrackModel",
"TrainCreate",
"TrainModel",
"to_camel",
]

View File

@@ -0,0 +1,45 @@
from __future__ import annotations
from pydantic import BaseModel, ConfigDict
from backend.app.models.base import to_camel
class LoginRequest(BaseModel):
username: str
password: str
class RegisterRequest(BaseModel):
username: str
password: str
full_name: str | None = None
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)
class TokenPayload(BaseModel):
sub: str
exp: int
class UserPublic(BaseModel):
username: str
full_name: str | None = None
model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)
class UserInDB(UserPublic):
hashed_password: str
class AuthResponse(TokenResponse):
user: UserPublic
model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)

View File

@@ -0,0 +1,87 @@
from __future__ import annotations
from datetime import datetime
from typing import Generic, Sequence, TypeVar
from pydantic import BaseModel, ConfigDict
def to_camel(string: str) -> str:
head, *tail = string.split("_")
return head + "".join(part.capitalize() for part in tail)
IdT = TypeVar("IdT", bound=str)
class CamelModel(BaseModel):
model_config = ConfigDict(
from_attributes=True,
populate_by_name=True,
alias_generator=to_camel,
)
class TimestampedModel(CamelModel):
created_at: datetime
updated_at: datetime
model_config = ConfigDict(
frozen=True,
from_attributes=True,
populate_by_name=True,
alias_generator=to_camel,
)
class IdentifiedModel(TimestampedModel, Generic[IdT]):
id: IdT
class StationModel(IdentifiedModel[str]):
name: str
latitude: float
longitude: float
class TrackModel(IdentifiedModel[str]):
start_station_id: str
end_station_id: str
length_meters: float
max_speed_kph: float
class TrainModel(IdentifiedModel[str]):
designation: str
capacity: int
max_speed_kph: float
operating_track_ids: list[str]
class StationCreate(CamelModel):
name: str
latitude: float
longitude: float
osm_id: str | None = None
code: str | None = None
elevation_m: float | None = None
is_active: bool = True
class TrackCreate(CamelModel):
start_station_id: str
end_station_id: str
coordinates: Sequence[tuple[float, float]]
name: str | None = None
length_meters: float | None = None
max_speed_kph: int | None = None
is_bidirectional: bool = True
status: str = "planned"
class TrainCreate(CamelModel):
designation: str
capacity: int
max_speed_kph: int
operator_id: str | None = None
home_station_id: str | None = None
consist: str | None = None

View File

@@ -0,0 +1,11 @@
"""Repository abstractions for database access."""
from backend.app.repositories.stations import StationRepository
from backend.app.repositories.tracks import TrackRepository
from backend.app.repositories.trains import TrainRepository
__all__ = [
"StationRepository",
"TrackRepository",
"TrainRepository",
]

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
import sqlalchemy as sa
from typing import Generic, Iterable, Optional, Sequence, Type, TypeVar
from sqlalchemy.orm import Session
from backend.app.db.models import Base
ModelT = TypeVar("ModelT", bound=Base)
class BaseRepository(Generic[ModelT]):
"""Provide common CRUD helpers for SQLAlchemy models."""
model: Type[ModelT]
def __init__(self, session: Session) -> None:
self.session = session
def get(self, identifier: object) -> Optional[ModelT]:
return self.session.get(self.model, identifier)
def list(self) -> Sequence[ModelT]:
statement = sa.select(self.model)
return list(self.session.scalars(statement))
def add(self, instance: ModelT) -> ModelT:
self.session.add(instance)
return instance
def add_all(self, instances: Iterable[ModelT]) -> None:
self.session.add_all(instances)
def delete(self, instance: ModelT) -> None:
self.session.delete(instance)
def flush(self) -> None:
self.session.flush()

View File

@@ -0,0 +1,37 @@
from __future__ import annotations
import sqlalchemy as sa
from sqlalchemy.orm import Session
from geoalchemy2.elements import WKTElement
from backend.app.db.models import Station
from backend.app.repositories.base import BaseRepository
from backend.app.models import StationCreate
class StationRepository(BaseRepository[Station]):
model = Station
def __init__(self, session: Session) -> None:
super().__init__(session)
def list_active(self) -> list[Station]:
statement = sa.select(self.model).where(self.model.is_active.is_(True))
return list(self.session.scalars(statement))
@staticmethod
def _point(latitude: float, longitude: float) -> WKTElement:
return WKTElement(f"POINT({longitude} {latitude})", srid=4326)
def create(self, data: StationCreate) -> Station:
station = Station(
name=data.name,
osm_id=data.osm_id,
code=data.code,
location=self._point(data.latitude, data.longitude),
elevation_m=data.elevation_m,
is_active=data.is_active,
)
self.session.add(station)
return station

View File

@@ -0,0 +1,50 @@
from __future__ import annotations
import sqlalchemy as sa
from geoalchemy2.elements import WKTElement
from uuid import UUID
from sqlalchemy.orm import Session
from backend.app.db.models import Track
from backend.app.repositories.base import BaseRepository
from backend.app.models import TrackCreate
class TrackRepository(BaseRepository[Track]):
model = Track
def __init__(self, session: Session) -> None:
super().__init__(session)
def list_all(self) -> list[Track]:
statement = sa.select(self.model)
return list(self.session.scalars(statement))
@staticmethod
def _ensure_uuid(value: UUID | str) -> UUID:
if isinstance(value, UUID):
return value
return UUID(str(value))
@staticmethod
def _line_string(coordinates: list[tuple[float, float]]) -> WKTElement:
if len(coordinates) < 2:
raise ValueError("Track geometry requires at least two coordinate pairs")
parts = [f"{lon} {lat}" for lat, lon in coordinates]
return WKTElement(f"LINESTRING({', '.join(parts)})", srid=4326)
def create(self, data: TrackCreate) -> Track:
coordinates = list(data.coordinates)
geometry = self._line_string(coordinates)
track = Track(
name=data.name,
start_station_id=self._ensure_uuid(data.start_station_id),
end_station_id=self._ensure_uuid(data.end_station_id),
length_meters=data.length_meters,
max_speed_kph=data.max_speed_kph,
is_bidirectional=data.is_bidirectional,
status=data.status,
track_geometry=geometry,
)
self.session.add(track)
return track

View File

@@ -0,0 +1,40 @@
from __future__ import annotations
import sqlalchemy as sa
from uuid import UUID
from sqlalchemy.orm import Session
from backend.app.db.models import Train
from backend.app.repositories.base import BaseRepository
from backend.app.models import TrainCreate
class TrainRepository(BaseRepository[Train]):
model = Train
def __init__(self, session: Session) -> None:
super().__init__(session)
def list_all(self) -> list[Train]:
statement = sa.select(self.model)
return list(self.session.scalars(statement))
@staticmethod
def _optional_uuid(value: UUID | str | None) -> UUID | None:
if value is None:
return None
if isinstance(value, UUID):
return value
return UUID(str(value))
def create(self, data: TrainCreate) -> Train:
train = Train(
designation=data.designation,
operator_id=self._optional_uuid(data.operator_id),
home_station_id=self._optional_uuid(data.home_station_id),
capacity=data.capacity,
max_speed_kph=data.max_speed_kph,
consist=data.consist,
)
self.session.add(train)
return train

View File

View File

@@ -0,0 +1,59 @@
from __future__ import annotations
from typing import Dict, Optional
from backend.app.core.security import (
create_access_token,
get_password_hash,
verify_password,
)
from backend.app.models import AuthResponse, UserInDB, UserPublic
_DEMO_USER = UserInDB(
username="demo",
full_name="Demo Engineer",
hashed_password=get_password_hash("railgame123"),
)
_FAKE_USERS: Dict[str, UserInDB] = {_DEMO_USER.username: _DEMO_USER}
def get_user(username: str) -> Optional[UserInDB]:
return _FAKE_USERS.get(username)
def authenticate_user(username: str, password: str) -> Optional[UserInDB]:
user = get_user(username)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def issue_token_for_user(user: UserInDB) -> AuthResponse:
return AuthResponse(
access_token=create_access_token(subject=user.username),
token_type="bearer",
user=to_public_user(user),
)
def to_public_user(user: UserInDB) -> UserPublic:
return UserPublic(username=user.username, full_name=user.full_name)
def register_user(username: str, password: str, full_name: Optional[str] = None) -> UserInDB:
normalized_username = username.strip()
if not normalized_username:
raise ValueError("Username must not be empty")
if normalized_username in _FAKE_USERS:
raise ValueError("Username already exists")
user = UserInDB(
username=normalized_username,
full_name=full_name.strip() if full_name else None,
hashed_password=get_password_hash(password),
)
_FAKE_USERS[normalized_username] = user
return user

View File

@@ -0,0 +1,157 @@
"""Domain services for railway network aggregation."""
from datetime import datetime, timezone
from decimal import Decimal
from typing import Iterable, cast
from geoalchemy2.elements import WKBElement, WKTElement
from geoalchemy2.shape import to_shape
try: # pragma: no cover - optional dependency guard
from shapely.geometry import Point # type: ignore
except ImportError: # pragma: no cover - allow running without shapely at import time
Point = None # type: ignore[assignment]
from sqlalchemy.orm import Session
from backend.app.models import StationModel, TrackModel, TrainModel
from backend.app.repositories import StationRepository, TrackRepository, TrainRepository
def _timestamp() -> datetime:
return datetime.now(timezone.utc)
def _fallback_snapshot() -> dict[str, list[dict[str, object]]]:
now = _timestamp()
stations = [
StationModel(
id="station-1",
name="Central",
latitude=52.520008,
longitude=13.404954,
created_at=now,
updated_at=now,
),
StationModel(
id="station-2",
name="Harbor",
latitude=53.551086,
longitude=9.993682,
created_at=now,
updated_at=now,
),
]
tracks = [
TrackModel(
id="track-1",
start_station_id="station-1",
end_station_id="station-2",
length_meters=289000.0,
max_speed_kph=230.0,
created_at=now,
updated_at=now,
)
]
trains = [
TrainModel(
id="train-1",
designation="ICE 123",
capacity=400,
max_speed_kph=300.0,
operating_track_ids=[track.id for track in tracks],
created_at=now,
updated_at=now,
)
]
return _serialize_snapshot(stations, tracks, trains)
def _serialize_snapshot(
stations: Iterable[StationModel],
tracks: Iterable[TrackModel],
trains: Iterable[TrainModel],
) -> dict[str, list[dict[str, object]]]:
return {
"stations": [station.model_dump(by_alias=True) for station in stations],
"tracks": [track.model_dump(by_alias=True) for track in tracks],
"trains": [train.model_dump(by_alias=True) for train in trains],
}
def _to_float(value: Decimal | float | int | None, default: float = 0.0) -> float:
if value is None:
return default
if isinstance(value, Decimal):
return float(value)
return float(value)
def get_network_snapshot(session: Session) -> dict[str, list[dict[str, object]]]:
station_repo = StationRepository(session)
track_repo = TrackRepository(session)
train_repo = TrainRepository(session)
stations_entities = station_repo.list_active()
tracks_entities = track_repo.list_all()
trains_entities = train_repo.list_all()
if not stations_entities and not tracks_entities and not trains_entities:
return _fallback_snapshot()
station_models: list[StationModel] = []
for station in stations_entities:
location = station.location
geom = (
to_shape(cast(WKBElement | WKTElement, location))
if location is not None and Point is not None
else None
)
if Point is not None and geom is not None and isinstance(geom, Point):
latitude = float(geom.y)
longitude = float(geom.x)
else:
latitude = 0.0
longitude = 0.0
station_models.append(
StationModel(
id=str(station.id),
name=station.name,
latitude=latitude,
longitude=longitude,
created_at=cast(datetime, station.created_at),
updated_at=cast(datetime, station.updated_at),
)
)
track_models: list[TrackModel] = []
for track in tracks_entities:
track_models.append(
TrackModel(
id=str(track.id),
start_station_id=str(track.start_station_id),
end_station_id=str(track.end_station_id),
length_meters=_to_float(track.length_meters),
max_speed_kph=_to_float(track.max_speed_kph),
created_at=cast(datetime, track.created_at),
updated_at=cast(datetime, track.updated_at),
)
)
train_models: list[TrainModel] = []
for train in trains_entities:
train_models.append(
TrainModel(
id=str(train.id),
designation=train.designation,
capacity=train.capacity,
max_speed_kph=_to_float(train.max_speed_kph),
operating_track_ids=[],
created_at=cast(datetime, train.created_at),
updated_at=cast(datetime, train.updated_at),
)
)
return _serialize_snapshot(station_models, track_models, train_models)

View File