feat: Add OSM refresh script and update loading scripts for improved database handling
This commit is contained in:
207
backend/scripts/osm_refresh.py
Normal file
207
backend/scripts/osm_refresh.py
Normal file
@@ -0,0 +1,207 @@
|
||||
from __future__ import annotations
|
||||
|
||||
"""Orchestrate the OSM station/track import and load pipeline."""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Callable, Sequence
|
||||
|
||||
from backend.app.core.osm_config import DEFAULT_REGIONS
|
||||
from backend.scripts import (
|
||||
stations_import,
|
||||
stations_load,
|
||||
tracks_import,
|
||||
tracks_load,
|
||||
)
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class Stage:
|
||||
label: str
|
||||
runner: Callable[[list[str] | None], int]
|
||||
args: list[str]
|
||||
input_path: Path | None = None
|
||||
output_path: Path | None = None
|
||||
|
||||
|
||||
def build_argument_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Run the station and track import/load workflow in sequence.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--region",
|
||||
choices=[region.name for region in DEFAULT_REGIONS] + ["all"],
|
||||
default="all",
|
||||
help="Region selector forwarded to the import scripts (default: all).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output-dir",
|
||||
type=Path,
|
||||
default=Path("data"),
|
||||
help="Directory where intermediate JSON payloads are stored (default: data/).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--stations-json",
|
||||
type=Path,
|
||||
help="Existing station JSON file to load; defaults to <output-dir>/osm_stations.json.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--tracks-json",
|
||||
type=Path,
|
||||
help="Existing track JSON file to load; defaults to <output-dir>/osm_tracks.json.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--skip-station-import",
|
||||
action="store_true",
|
||||
help="Skip the station import step (expects --stations-json to point to data).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--skip-station-load",
|
||||
action="store_true",
|
||||
help="Skip loading stations into PostGIS.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--skip-track-import",
|
||||
action="store_true",
|
||||
help="Skip the track import step (expects --tracks-json to point to data).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--skip-track-load",
|
||||
action="store_true",
|
||||
help="Skip loading tracks into PostGIS.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dry-run",
|
||||
action="store_true",
|
||||
help="Print the planned stages without invoking Overpass or mutating the database.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--commit",
|
||||
dest="commit",
|
||||
action="store_true",
|
||||
default=True,
|
||||
help="Commit database changes produced by the load steps (default).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--no-commit",
|
||||
dest="commit",
|
||||
action="store_false",
|
||||
help="Rollback database changes after load steps (dry run).",
|
||||
)
|
||||
return parser
|
||||
|
||||
|
||||
def _build_stage_plan(args: argparse.Namespace) -> tuple[list[Stage], Path, Path]:
|
||||
station_json = args.stations_json or args.output_dir / "osm_stations.json"
|
||||
track_json = args.tracks_json or args.output_dir / "osm_tracks.json"
|
||||
|
||||
stages: list[Stage] = []
|
||||
|
||||
if not args.skip_station_import:
|
||||
stages.append(
|
||||
Stage(
|
||||
label="Import stations",
|
||||
runner=stations_import.main,
|
||||
args=["--output", str(station_json), "--region", args.region],
|
||||
output_path=station_json,
|
||||
)
|
||||
)
|
||||
|
||||
if not args.skip_station_load:
|
||||
load_args = [str(station_json)]
|
||||
if not args.commit:
|
||||
load_args.append("--no-commit")
|
||||
stages.append(
|
||||
Stage(
|
||||
label="Load stations",
|
||||
runner=stations_load.main,
|
||||
args=load_args,
|
||||
input_path=station_json,
|
||||
)
|
||||
)
|
||||
|
||||
if not args.skip_track_import:
|
||||
stages.append(
|
||||
Stage(
|
||||
label="Import tracks",
|
||||
runner=tracks_import.main,
|
||||
args=["--output", str(track_json), "--region", args.region],
|
||||
output_path=track_json,
|
||||
)
|
||||
)
|
||||
|
||||
if not args.skip_track_load:
|
||||
load_args = [str(track_json)]
|
||||
if not args.commit:
|
||||
load_args.append("--no-commit")
|
||||
stages.append(
|
||||
Stage(
|
||||
label="Load tracks",
|
||||
runner=tracks_load.main,
|
||||
args=load_args,
|
||||
input_path=track_json,
|
||||
)
|
||||
)
|
||||
|
||||
return stages, station_json, track_json
|
||||
|
||||
|
||||
def _describe_plan(stages: Sequence[Stage]) -> None:
|
||||
if not stages:
|
||||
print("No stages selected; nothing to do.")
|
||||
return
|
||||
|
||||
print("Selected stages:")
|
||||
for stage in stages:
|
||||
detail = " ".join(stage.args) if stage.args else "<no args>"
|
||||
print(f" - {stage.label}: {detail}")
|
||||
|
||||
|
||||
def _execute_stage(stage: Stage) -> None:
|
||||
print(f"\n>>> {stage.label}")
|
||||
|
||||
if stage.output_path is not None:
|
||||
stage.output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if stage.input_path is not None and not stage.input_path.exists():
|
||||
raise RuntimeError(
|
||||
f"Expected input file {stage.input_path} for {stage.label}; run the import step first or provide an existing file."
|
||||
)
|
||||
|
||||
try:
|
||||
exit_code = stage.runner(stage.args)
|
||||
except SystemExit as exc: # argparse.error exits via SystemExit
|
||||
exit_code = int(exc.code or 0)
|
||||
|
||||
if exit_code:
|
||||
raise RuntimeError(f"{stage.label} failed with exit code {exit_code}.")
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
parser = build_argument_parser()
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
stages, station_json, track_json = _build_stage_plan(args)
|
||||
|
||||
if args.dry_run:
|
||||
print("Dry run: the following stages would run in order.")
|
||||
_describe_plan(stages)
|
||||
return 0
|
||||
|
||||
# Ensure parent directories exist when we plan to write files.
|
||||
if not args.skip_station_import:
|
||||
station_json.parent.mkdir(parents=True, exist_ok=True)
|
||||
if not args.skip_track_import:
|
||||
track_json.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for stage in stages:
|
||||
_execute_stage(stage)
|
||||
|
||||
print("\nOSM refresh pipeline completed successfully.")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user