From 37edef716ab34d6843c4972825f8c7463bdd5352 Mon Sep 17 00:00:00 2001 From: zwitschi Date: Wed, 29 Apr 2026 18:27:59 +0200 Subject: [PATCH] feat: implement video job management with retry and delete functionality, enhance video generation status tracking Co-authored-by: Copilot --- backend/app/db.py | 7 + backend/app/main.py | 10 +- backend/app/routers/admin.py | 40 ++++ backend/app/routers/generate.py | 103 +++-------- backend/app/services/video_worker.py | 158 ++++++++++++++++ frontend/app/main.py | 9 + frontend/app/static/app.js | 34 ++-- frontend/app/templates/admin.html | 203 +++++++++++++++++++++ frontend/app/templates/generate_video.html | 6 +- frontend/app/templates/video_detail.html | 4 +- 10 files changed, 479 insertions(+), 95 deletions(-) create mode 100644 backend/app/services/video_worker.py diff --git a/backend/app/db.py b/backend/app/db.py index 51bceac..b83fec9 100644 --- a/backend/app/db.py +++ b/backend/app/db.py @@ -114,6 +114,13 @@ def _run_migrations(conn: duckdb.DuckDBPyConnection) -> None: conn.execute(""" ALTER TABLE models_cache ADD COLUMN IF NOT EXISTS output_modalities VARCHAR """) + # Migration: add video job request params + generation type + conn.execute(""" + ALTER TABLE generated_videos ADD COLUMN IF NOT EXISTS request_params VARCHAR + """) + conn.execute(""" + ALTER TABLE generated_videos ADD COLUMN IF NOT EXISTS generation_type VARCHAR DEFAULT 'text_to_video' + """) _seed_admin(conn) diff --git a/backend/app/main.py b/backend/app/main.py index 24457c8..3f5997e 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -5,7 +5,9 @@ from .routers import ai from .routers import generate from .routers import images from .routers import models -from .db import close_db, init_db +from .db import close_db, get_conn, get_write_lock, init_db +from .services.video_worker import run_worker +import asyncio import os from contextlib import asynccontextmanager @@ -19,7 +21,13 @@ load_dotenv() @asynccontextmanager async def lifespan(app: 
FastAPI): init_db() + worker_task = asyncio.create_task(run_worker(get_conn(), get_write_lock())) yield + worker_task.cancel() + try: + await worker_task + except asyncio.CancelledError: + pass close_db() diff --git a/backend/app/routers/admin.py b/backend/app/routers/admin.py index 983a7bf..c5a7525 100644 --- a/backend/app/routers/admin.py +++ b/backend/app/routers/admin.py @@ -185,3 +185,43 @@ async def admin_mark_timed_out(_: dict = Depends(require_admin)) -> dict[str, in conn = get_conn() count = mark_timed_out_video_jobs(conn, timeout_minutes=120) return {"timed_out": count} + + +@router.post("/videos/{job_id}/retry", status_code=200) +async def admin_retry_video_job(job_id: str, _: dict = Depends(require_admin)) -> dict[str, str]: + """Reset a failed or cancelled video job back to 'queued' for reprocessing.""" + conn = get_conn() + lock = get_write_lock() + now = datetime.now(timezone.utc) + async with lock: + row = conn.execute( + "SELECT status FROM generated_videos WHERE id = ?", [job_id] + ).fetchone() + if row is None: + from fastapi import HTTPException + raise HTTPException(status_code=404, detail="Job not found") + if row[0] not in ("failed", "cancelled"): + from fastapi import HTTPException + raise HTTPException( + status_code=400, detail=f"Cannot retry job with status '{row[0]}'") + conn.execute( + "UPDATE generated_videos SET status = 'queued', updated_at = ? 
WHERE id = ?", + [now, job_id], + ) + return {"status": "ok", "job_id": job_id} + + +@router.delete("/videos/{job_id}", status_code=200) +async def admin_delete_video_job(job_id: str, _: dict = Depends(require_admin)) -> dict[str, str]: + """Permanently delete a video job record.""" + conn = get_conn() + lock = get_write_lock() + async with lock: + row = conn.execute( + "SELECT id FROM generated_videos WHERE id = ?", [job_id] + ).fetchone() + if row is None: + from fastapi import HTTPException + raise HTTPException(status_code=404, detail="Job not found") + conn.execute("DELETE FROM generated_videos WHERE id = ?", [job_id]) + return {"status": "ok", "job_id": job_id} diff --git a/backend/app/routers/generate.py b/backend/app/routers/generate.py index 687af4f..59af0b0 100644 --- a/backend/app/routers/generate.py +++ b/backend/app/routers/generate.py @@ -1,4 +1,5 @@ """Generate router: text, image, video, and image-to-video generation.""" +import json from datetime import datetime, timezone import httpx @@ -209,54 +210,32 @@ async def generate_video( body: VideoRequest, current_user: dict = Depends(get_current_user), ) -> VideoResponse: - """Generate a video from a text prompt.""" - try: - result = await openrouter.generate_video( - model=body.model, - prompt=body.prompt, - duration_seconds=body.duration_seconds, - aspect_ratio=body.aspect_ratio, - resolution=body.resolution, - ) - except httpx.HTTPStatusError as exc: - detail = ( - f"OpenRouter API error: {exc.response.status_code} - {exc.response.text}" - ) - raise HTTPException( - status_code=status.HTTP_502_BAD_GATEWAY, detail=detail) - except Exception as exc: - raise HTTPException( - status_code=status.HTTP_502_BAD_GATEWAY, detail=f"OpenRouter error: {exc}" - ) - + """Queue a text-to-video generation job for background processing.""" user_id = current_user.get("id") or current_user.get("sub") - job_id = result.get("id", "") - polling_url = result.get("polling_url") - job_status = result.get("status", "pending") 
now = datetime.now(timezone.utc).replace(tzinfo=None) + request_params = json.dumps({ + "model": body.model, + "prompt": body.prompt, + "duration_seconds": body.duration_seconds, + "aspect_ratio": body.aspect_ratio, + "resolution": body.resolution, + }) db_id = None async with get_write_lock(): conn = get_conn() row = conn.execute( - """INSERT INTO generated_videos (user_id, job_id, model_id, prompt, polling_url, status, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id""", - [user_id, job_id, body.model, body.prompt, - polling_url, job_status, now, now], + """INSERT INTO generated_videos + (user_id, job_id, model_id, prompt, status, request_params, generation_type, created_at, updated_at) + VALUES (?, ?, ?, ?, 'queued', ?, 'text_to_video', ?, ?) RETURNING id""", + [user_id, "", body.model, body.prompt, request_params, now, now], ).fetchone() if row: db_id = str(row[0]) - - urls = result.get("unsigned_urls") or result.get("video_urls") return VideoResponse( - id=job_id, + id="", db_id=db_id, model=body.model, - status=job_status, - polling_url=polling_url, - video_urls=urls, - video_url=(urls or [None])[0], - error=result.get("error"), - metadata=result.get("metadata"), + status="queued", ) @@ -265,55 +244,33 @@ async def generate_video_from_image( body: VideoFromImageRequest, current_user: dict = Depends(get_current_user), ) -> VideoResponse: - """Generate a video from an image and a text prompt.""" - try: - result = await openrouter.generate_video_from_image( - model=body.model, - image_url=body.image_url, - prompt=body.prompt, - duration_seconds=body.duration_seconds, - aspect_ratio=body.aspect_ratio, - resolution=body.resolution, - ) - except httpx.HTTPStatusError as exc: - detail = ( - f"OpenRouter API error: {exc.response.status_code} - {exc.response.text}" - ) - raise HTTPException( - status_code=status.HTTP_502_BAD_GATEWAY, detail=detail) - except Exception as exc: - raise HTTPException( - status_code=status.HTTP_502_BAD_GATEWAY, 
detail=f"OpenRouter error: {exc}" - ) - + """Queue an image-to-video generation job for background processing.""" user_id = current_user.get("id") or current_user.get("sub") - job_id = result.get("id", "") - polling_url = result.get("polling_url") - job_status = result.get("status", "pending") now = datetime.now(timezone.utc).replace(tzinfo=None) + request_params = json.dumps({ + "model": body.model, + "image_url": body.image_url, + "prompt": body.prompt, + "duration_seconds": body.duration_seconds, + "aspect_ratio": body.aspect_ratio, + "resolution": body.resolution, + }) db_id = None async with get_write_lock(): conn = get_conn() row = conn.execute( - """INSERT INTO generated_videos (user_id, job_id, model_id, prompt, polling_url, status, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id""", - [user_id, job_id, body.model, body.prompt, - polling_url, job_status, now, now], + """INSERT INTO generated_videos + (user_id, job_id, model_id, prompt, status, request_params, generation_type, created_at, updated_at) + VALUES (?, ?, ?, ?, 'queued', ?, 'image_to_video', ?, ?) RETURNING id""", + [user_id, "", body.model, body.prompt, request_params, now, now], ).fetchone() if row: db_id = str(row[0]) - - urls = result.get("unsigned_urls") or result.get("video_urls") return VideoResponse( - id=job_id, + id="", db_id=db_id, model=body.model, - status=job_status, - polling_url=polling_url, - video_urls=urls, - video_url=(urls or [None])[0], - error=result.get("error"), - metadata=result.get("metadata"), + status="queued", ) diff --git a/backend/app/services/video_worker.py b/backend/app/services/video_worker.py new file mode 100644 index 0000000..5435d64 --- /dev/null +++ b/backend/app/services/video_worker.py @@ -0,0 +1,158 @@ +"""Background worker: processes queued/processing video generation jobs.""" +import asyncio +import json +import logging +from datetime import datetime, timezone + +import duckdb + +from . 
import openrouter +from .models import mark_timed_out_video_jobs + +logger = logging.getLogger(__name__) + +# Interval between worker ticks (seconds) +WORKER_INTERVAL = 15 +# Jobs to process per tick (prevents unbounded bursts) +BATCH_SIZE = 5 + + +async def process_queued_jobs(conn: duckdb.DuckDBPyConnection, lock: asyncio.Lock) -> int: + """Submit queued jobs to OpenRouter and transition them to 'processing'.""" + rows = conn.execute( + """SELECT id, generation_type, request_params + FROM generated_videos + WHERE status = 'queued' AND request_params IS NOT NULL + ORDER BY created_at ASC + LIMIT ?""", + [BATCH_SIZE], + ).fetchall() + + processed = 0 + for row in rows: + db_id, generation_type, raw_params = str(row[0]), row[1], row[2] + try: + params = json.loads(raw_params) + except (json.JSONDecodeError, TypeError): + logger.error("Bad request_params for video job %s", db_id) + continue + + try: + if generation_type == "image_to_video": + result = await openrouter.generate_video_from_image( + model=params["model"], + image_url=params.get("image_url", ""), + prompt=params.get("prompt", ""), + duration_seconds=params.get("duration_seconds"), + aspect_ratio=params.get("aspect_ratio", "16:9"), + resolution=params.get("resolution"), + ) + else: + result = await openrouter.generate_video( + model=params["model"], + prompt=params.get("prompt", ""), + duration_seconds=params.get("duration_seconds"), + aspect_ratio=params.get("aspect_ratio", "16:9"), + resolution=params.get("resolution"), + ) + except Exception as exc: + logger.warning("OpenRouter call failed for job %s: %s", db_id, exc) + now = datetime.now(timezone.utc).replace(tzinfo=None) + async with lock: + conn.execute( + "UPDATE generated_videos SET status = 'failed', updated_at = ? 
WHERE id = ?", + [now, db_id], + ) + continue + + job_id = result.get("id", "") + polling_url = result.get("polling_url") + new_status = result.get("status", "processing") + # Normalise terminal statuses returned immediately (rare but possible) + if new_status not in ("queued", "processing", "completed", "failed", "cancelled"): + new_status = "processing" + + urls = result.get("unsigned_urls") or result.get("video_urls") + video_url = (urls or [None])[0] + now = datetime.now(timezone.utc).replace(tzinfo=None) + + async with lock: + conn.execute( + """UPDATE generated_videos + SET job_id = ?, polling_url = ?, status = ?, video_url = ?, updated_at = ? + WHERE id = ?""", + [job_id, polling_url, new_status, video_url, now, db_id], + ) + processed += 1 + logger.info("Video job %s → %s (provider id: %s)", + db_id, new_status, job_id) + + return processed + + +async def process_processing_jobs(conn: duckdb.DuckDBPyConnection, lock: asyncio.Lock) -> int: + """Poll in-progress jobs and update to 'completed' or 'failed'.""" + rows = conn.execute( + """SELECT id, polling_url + FROM generated_videos + WHERE status = 'processing' AND polling_url IS NOT NULL + ORDER BY updated_at ASC + LIMIT ?""", + [BATCH_SIZE], + ).fetchall() + + updated = 0 + for row in rows: + db_id, polling_url = str(row[0]), row[1] + try: + result = await openrouter.poll_video_status(polling_url) + except Exception as exc: + logger.warning("Polling failed for job %s: %s", db_id, exc) + continue + + job_status = result.get("status", "processing") + if job_status not in ("completed", "failed"): + continue # still in-progress — check again next tick + + urls = result.get("unsigned_urls") or result.get("video_urls") + video_url = (urls or [None])[0] + now = datetime.now(timezone.utc).replace(tzinfo=None) + + async with lock: + conn.execute( + """UPDATE generated_videos + SET status = ?, video_url = ?, updated_at = ? 
+ WHERE id = ?""", + [job_status, video_url, now, db_id], + ) + updated += 1 + logger.info("Video job %s → %s", db_id, job_status) + + return updated + + +async def worker_tick(conn: duckdb.DuckDBPyConnection, lock: asyncio.Lock) -> None: + """Single worker tick: submit queued, poll processing, expire timed-out.""" + queued = await process_queued_jobs(conn, lock) + polled = await process_processing_jobs(conn, lock) + async with lock: + timed_out = mark_timed_out_video_jobs(conn, timeout_minutes=120) + if queued or polled or timed_out: + logger.info( + "Worker tick: submitted=%d polled=%d timed_out=%d", + queued, polled, timed_out, + ) + + +async def run_worker(conn: duckdb.DuckDBPyConnection, lock: asyncio.Lock) -> None: + """Infinite loop: run a worker tick every WORKER_INTERVAL seconds.""" + logger.info("Video worker started (interval=%ds)", WORKER_INTERVAL) + while True: + try: + await worker_tick(conn, lock) + except asyncio.CancelledError: + logger.info("Video worker stopped.") + return + except Exception as exc: + logger.exception("Unexpected error in video worker: %s", exc) + await asyncio.sleep(WORKER_INTERVAL) diff --git a/frontend/app/main.py b/frontend/app/main.py index 162f42f..3251e72 100644 --- a/frontend/app/main.py +++ b/frontend/app/main.py @@ -469,6 +469,15 @@ def generate_video_status(): return jsonify(resp.json()), resp.status_code +@app.get("/generate/video/<video_id>/status") +@login_required +def generate_video_db_status(video_id: str): + """Return current DB status for a video job (polled by frontend JS).""" + resp = _api( + "GET", f"/generate/videos/{video_id}", token=session["access_token"]) + return jsonify(resp.json()), resp.status_code + + # ── Admin ───────────────────────────────────────────────────────────────── @app.get("/admin") diff --git a/frontend/app/static/app.js b/frontend/app/static/app.js index 97cffdd..725918d 100644 --- a/frontend/app/static/app.js +++ b/frontend/app/static/app.js @@ -63,15 +63,14 @@ 
document.addEventListener("DOMContentLoaded", () => { // ── Video status polling ─────────────────────────────── const pollDiv = document.getElementById("video-poll-status"); if (pollDiv) { - const pollingUrl = pollDiv.dataset.pollingUrl; + const videoId = pollDiv.dataset.videoId; const statusText = document.getElementById("poll-status-text"); const videoContainer = document.getElementById("poll-video-container"); const interval = setInterval(async () => { try { const resp = await fetch( - "/generate/video/status?polling_url=" + - encodeURIComponent(pollingUrl), + "/generate/video/" + encodeURIComponent(videoId) + "/status", ); if (!resp.ok) return; const data = await resp.json(); @@ -82,25 +81,28 @@ document.addEventListener("DOMContentLoaded", () => { if (data.status === "completed") { clearInterval(interval); - if (data.video_url && videoContainer) { - const vid = document.createElement("video"); - vid.src = data.video_url; - vid.controls = true; - vid.className = "generated-video"; - videoContainer.appendChild(vid); - const msg = pollDiv.querySelector("p"); - if (msg) msg.textContent = "Video ready!"; + if (data.video_url) { + if (videoContainer) { + const vid = document.createElement("video"); + vid.src = data.video_url; + vid.controls = true; + vid.className = "generated-video"; + videoContainer.appendChild(vid); + const msg = pollDiv.querySelector("p"); + if (msg) msg.textContent = "Video ready!"; + } else { + // video_detail page: reload to show the video element + window.location.reload(); + } } - } else if (data.status === "failed") { + } else if (data.status === "failed" || data.status === "cancelled") { clearInterval(interval); pollDiv.innerHTML = - '
Generation failed: ' + - (data.error || "Unknown error") + - "
"; + '
Generation failed or was cancelled.
'; } } catch (e) { console.error("Video polling error:", e); } - }, 12016); + }, 5000); } }); diff --git a/frontend/app/templates/admin.html b/frontend/app/templates/admin.html index b9d3a6c..638ed69 100644 --- a/frontend/app/templates/admin.html +++ b/frontend/app/templates/admin.html @@ -76,5 +76,208 @@ + + +

Video Jobs

+ +
+ + + + + + +
+ +
+ + + + + + + + + + + + + + + + + +
UserStatusModelPromptCreatedUpdatedActions
Loading…
+
+ + {% endblock %} diff --git a/frontend/app/templates/generate_video.html b/frontend/app/templates/generate_video.html index 2247c82..2a8a530 100644 --- a/frontend/app/templates/generate_video.html +++ b/frontend/app/templates/generate_video.html @@ -155,9 +155,9 @@ AI{% endblock %} {% block content %} {% endif %} {% if result %}

Video job

-

Job ID: {{ result.id }}

- {% if result.status in ('queued', 'processing') and result.polling_url %} -
+

Job ID: {{ result.db_id or result.id }}

+ {% if result.status in ('queued', 'processing') and result.db_id %} +

Status: {{ result.status }} {% if video.status == 'completed' and video.video_url %} - {% elif video.status in ('queued', 'processing') and video.polling_url %} + {% elif video.status in ('queued', 'processing') %}

Status: {{ video.status }}