feat: implement video job management with retry and delete functionality, enhance video generation status tracking
Co-authored-by: Copilot <copilot@github.com>
@@ -0,0 +1,158 @@
"""Background worker: processes queued/processing video generation jobs."""

import asyncio
import json
import logging
from datetime import datetime, timezone

import duckdb

from . import openrouter
from .models import mark_timed_out_video_jobs

logger = logging.getLogger(__name__)

# Interval between worker ticks (seconds)
WORKER_INTERVAL = 15
# Jobs to process per tick (prevents unbounded bursts)
BATCH_SIZE = 5


async def process_queued_jobs(conn: duckdb.DuckDBPyConnection, lock: asyncio.Lock) -> int:
    """Submit queued jobs to OpenRouter and transition them to 'processing'."""
    rows = conn.execute(
        """SELECT id, generation_type, request_params
           FROM generated_videos
           WHERE status = 'queued' AND request_params IS NOT NULL
           ORDER BY created_at ASC
           LIMIT ?""",
        [BATCH_SIZE],
    ).fetchall()

    processed = 0
    for row in rows:
        db_id, generation_type, raw_params = str(row[0]), row[1], row[2]
        try:
            params = json.loads(raw_params)
        except (json.JSONDecodeError, TypeError):
            # Fail the job instead of skipping: a permanently 'queued' row with bad
            # params would otherwise occupy a batch slot on every tick.
            logger.error("Bad request_params for video job %s", db_id)
            now = datetime.now(timezone.utc).replace(tzinfo=None)
            async with lock:
                conn.execute(
                    "UPDATE generated_videos SET status = 'failed', updated_at = ? WHERE id = ?",
                    [now, db_id],
                )
            continue

        try:
            if generation_type == "image_to_video":
                result = await openrouter.generate_video_from_image(
                    model=params["model"],
                    image_url=params.get("image_url", ""),
                    prompt=params.get("prompt", ""),
                    duration_seconds=params.get("duration_seconds"),
                    aspect_ratio=params.get("aspect_ratio", "16:9"),
                    resolution=params.get("resolution"),
                )
            else:
                result = await openrouter.generate_video(
                    model=params["model"],
                    prompt=params.get("prompt", ""),
                    duration_seconds=params.get("duration_seconds"),
                    aspect_ratio=params.get("aspect_ratio", "16:9"),
                    resolution=params.get("resolution"),
                )
        except Exception as exc:
            logger.warning("OpenRouter call failed for job %s: %s", db_id, exc)
            now = datetime.now(timezone.utc).replace(tzinfo=None)
            async with lock:
                conn.execute(
                    "UPDATE generated_videos SET status = 'failed', updated_at = ? WHERE id = ?",
                    [now, db_id],
                )
            continue

        job_id = result.get("id", "")
        polling_url = result.get("polling_url")
        new_status = result.get("status", "processing")
        # Normalise terminal statuses returned immediately (rare but possible)
        if new_status not in ("queued", "processing", "completed", "failed", "cancelled"):
            new_status = "processing"
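
        # The openrouter helpers are assumed to expose result URLs as 'unsigned_urls'
        # or 'video_urls'; take the first entry, or None if no video exists yet.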
        urls = result.get("unsigned_urls") or result.get("video_urls")
        video_url = (urls or [None])[0]
        now = datetime.now(timezone.utc).replace(tzinfo=None)

        async with lock:
            conn.execute(
                """UPDATE generated_videos
                   SET job_id = ?, polling_url = ?, status = ?, video_url = ?, updated_at = ?
                   WHERE id = ?""",
                [job_id, polling_url, new_status, video_url, now, db_id],
            )
        processed += 1
        logger.info("Video job %s → %s (provider id: %s)",
                    db_id, new_status, job_id)

    return processed


async def process_processing_jobs(conn: duckdb.DuckDBPyConnection, lock: asyncio.Lock) -> int:
    """Poll in-progress jobs and record their terminal status ('completed', 'failed', or 'cancelled')."""
    rows = conn.execute(
        """SELECT id, polling_url
           FROM generated_videos
           WHERE status = 'processing' AND polling_url IS NOT NULL
           ORDER BY updated_at ASC
           LIMIT ?""",
        [BATCH_SIZE],
    ).fetchall()

    updated = 0
    for row in rows:
        db_id, polling_url = str(row[0]), row[1]
        try:
            result = await openrouter.poll_video_status(polling_url)
        except Exception as exc:
            logger.warning("Polling failed for job %s: %s", db_id, exc)
            continue

        job_status = result.get("status", "processing")
        # 'cancelled' is part of the normalised status set above, so treat it as terminal too.
        if job_status not in ("completed", "failed", "cancelled"):
            continue  # still in-progress; check again next tick

        urls = result.get("unsigned_urls") or result.get("video_urls")
        video_url = (urls or [None])[0]
        now = datetime.now(timezone.utc).replace(tzinfo=None)

        async with lock:
            conn.execute(
                """UPDATE generated_videos
                   SET status = ?, video_url = ?, updated_at = ?
                   WHERE id = ?""",
                [job_status, video_url, now, db_id],
            )
        updated += 1
        logger.info("Video job %s → %s", db_id, job_status)

    return updated


async def worker_tick(conn: duckdb.DuckDBPyConnection, lock: asyncio.Lock) -> None:
    """Single worker tick: submit queued, poll processing, expire timed-out."""
    queued = await process_queued_jobs(conn, lock)
    polled = await process_processing_jobs(conn, lock)
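    # mark_timed_out_video_jobs presumably fails any job stuck in a non-terminal
    # status longer than timeout_minutes; two hours is the budget chosen here.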
    async with lock:
        timed_out = mark_timed_out_video_jobs(conn, timeout_minutes=120)
    if queued or polled or timed_out:
        logger.info(
            "Worker tick: submitted=%d polled=%d timed_out=%d",
            queued, polled, timed_out,
        )


async def run_worker(conn: duckdb.DuckDBPyConnection, lock: asyncio.Lock) -> None:
    """Infinite loop: run a worker tick every WORKER_INTERVAL seconds."""
    logger.info("Video worker started (interval=%ds)", WORKER_INTERVAL)
    while True:
        try:
            await worker_tick(conn, lock)
        except asyncio.CancelledError:
            logger.info("Video worker stopped.")
            return
        except Exception as exc:
            logger.exception("Unexpected error in video worker: %s", exc)
        await asyncio.sleep(WORKER_INTERVAL)
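

# Illustrative startup wiring (a sketch; the names and call site below are assumptions,
# the real wiring lives elsewhere in the app):
#
#     lock = asyncio.Lock()
#     conn = duckdb.connect("app.db")
#     worker_task = asyncio.create_task(run_worker(conn, lock))
#     ...
#     worker_task.cancel()  # unwinds via the CancelledError branch above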