"""Background worker: processes queued/processing video generation jobs."""

import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import Any, Optional

import duckdb

from . import openrouter
from .models import mark_timed_out_video_jobs

logger = logging.getLogger(__name__)

# Interval between worker ticks (seconds)
WORKER_INTERVAL = 15

# Jobs to process per tick (prevents unbounded bursts)
BATCH_SIZE = 5

# Statuses this module is willing to write into generated_videos.status.
_KNOWN_STATUSES = frozenset(
    {"queued", "processing", "completed", "failed", "cancelled"}
)

# Statuses after which a job no longer needs to be polled.
_TERMINAL_STATUSES = frozenset({"completed", "failed", "cancelled"})


def _utcnow_naive() -> datetime:
    """Return the current UTC time as a naive datetime (tzinfo stripped)."""
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _first_video_url(result: dict) -> Optional[Any]:
    """Return the first entry of ``unsigned_urls`` / ``video_urls``, or None.

    ``unsigned_urls`` wins when it is present and non-empty; otherwise
    ``video_urls`` is used. A missing or empty list yields None.
    """
    urls = result.get("unsigned_urls") or result.get("video_urls")
    return (urls or [None])[0]


async def _mark_failed(
    conn: duckdb.DuckDBPyConnection,
    lock: asyncio.Lock,
    db_id: str,
    error: str,
) -> None:
    """Set a job's status to 'failed' and store *error*, holding *lock*."""
    now = _utcnow_naive()
    async with lock:
        conn.execute(
            "UPDATE generated_videos SET status = 'failed', error = ?, updated_at = ? "
            "WHERE id = ?",
            [error, now, db_id],
        )


async def _submit_job(generation_type: Any, params: dict) -> Any:
    """Send one job to OpenRouter and return whatever the client returns.

    ``image_to_video`` jobs go through ``generate_video_from_image``; every
    other generation type goes through ``generate_video``.

    Raises:
        KeyError: if ``params`` has no ``"model"`` entry.
        Exception: anything raised by the OpenRouter client call.
    """
    if generation_type == "image_to_video":
        return await openrouter.generate_video_from_image(
            model=params["model"],
            image_url=params.get("image_url", ""),
            prompt=params.get("prompt", ""),
            duration_seconds=params.get("duration_seconds"),
            aspect_ratio=params.get("aspect_ratio", "16:9"),
            resolution=params.get("resolution"),
        )
    return await openrouter.generate_video(
        model=params["model"],
        prompt=params.get("prompt", ""),
        duration_seconds=params.get("duration_seconds"),
        aspect_ratio=params.get("aspect_ratio", "16:9"),
        resolution=params.get("resolution"),
    )


async def process_queued_jobs(conn: duckdb.DuckDBPyConnection, lock: asyncio.Lock) -> int:
    """Submit queued jobs to OpenRouter and transition them to 'processing'.

    Selects up to ``BATCH_SIZE`` rows with status 'queued' and non-null
    ``request_params``, oldest ``created_at`` first, and submits each one.

    Args:
        conn: DuckDB connection holding the ``generated_videos`` table.
        lock: Lock held around every write issued through ``conn``.

    Returns:
        The number of jobs that were submitted successfully. Jobs that were
        marked 'failed' (bad parameters or a failed provider call) are not
        counted.
    """
    rows = conn.execute(
        """SELECT id, generation_type, request_params
           FROM generated_videos
           WHERE status = 'queued' AND request_params IS NOT NULL
           ORDER BY created_at ASC
           LIMIT ?""",
        [BATCH_SIZE],
    ).fetchall()

    processed = 0
    for row in rows:
        db_id, generation_type, raw_params = str(row[0]), row[1], row[2]

        try:
            params = json.loads(raw_params)
        except (ValueError, TypeError):
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            params = None

        if not isinstance(params, dict):
            # A row that can never be parsed must leave the 'queued' state:
            # otherwise it is re-selected on every tick and, because the
            # batch is the oldest BATCH_SIZE rows, enough of them would
            # block all newer jobs from ever being submitted.
            logger.error("Bad request_params for video job %s", db_id)
            await _mark_failed(conn, lock, db_id, "Invalid request_params")
            continue

        try:
            result = await _submit_job(generation_type, params)
        except Exception as exc:
            logger.warning("OpenRouter call failed for job %s: %s", db_id, exc)
            await _mark_failed(conn, lock, db_id, str(exc))
            continue

        job_id = result.get("id", "")
        polling_url = result.get("polling_url")
        new_status = result.get("status", "processing")
        # Normalise terminal statuses returned immediately (rare but possible);
        # anything unrecognised is treated as still processing.
        if new_status not in _KNOWN_STATUSES:
            new_status = "processing"
        video_url = _first_video_url(result)

        now = _utcnow_naive()
        async with lock:
            conn.execute(
                """UPDATE generated_videos
                   SET job_id = ?, polling_url = ?, status = ?, video_url = ?, updated_at = ?
                   WHERE id = ?""",
                [job_id, polling_url, new_status, video_url, now, db_id],
            )
        processed += 1
        logger.info("Video job %s → %s (provider id: %s)", db_id, new_status, job_id)

    return processed


async def process_processing_jobs(conn: duckdb.DuckDBPyConnection, lock: asyncio.Lock) -> int:
    """Poll in-progress jobs and record any that reached a terminal status.

    Selects up to ``BATCH_SIZE`` rows with status 'processing' and a non-null
    ``polling_url``, oldest ``updated_at`` first. A job whose polled status is
    'completed', 'failed' or 'cancelled' is written back; any other status
    leaves the row untouched until a later tick.

    Args:
        conn: DuckDB connection holding the ``generated_videos`` table.
        lock: Lock held around every write issued through ``conn``.

    Returns:
        The number of jobs moved to a terminal status.
    """
    # NOTE(review): rows are ordered by updated_at, which this function never
    # touches for jobs that are still in progress or whose poll failed, so the
    # same oldest BATCH_SIZE rows are polled on every tick until they finish
    # or are timed out. Confirm that this is acceptable under load.
    rows = conn.execute(
        """SELECT id, polling_url
           FROM generated_videos
           WHERE status = 'processing' AND polling_url IS NOT NULL
           ORDER BY updated_at ASC
           LIMIT ?""",
        [BATCH_SIZE],
    ).fetchall()

    updated = 0
    for row in rows:
        db_id, polling_url = str(row[0]), row[1]

        try:
            result = await openrouter.poll_video_status(polling_url)
        except Exception as exc:
            logger.warning("Polling failed for job %s: %s", db_id, exc)
            continue

        job_status = result.get("status", "processing")
        if job_status not in _TERMINAL_STATUSES:
            continue  # still in-progress — check again next tick

        video_url = _first_video_url(result)
        error_msg = result.get("error")

        now = _utcnow_naive()
        async with lock:
            conn.execute(
                """UPDATE generated_videos
                   SET status = ?, video_url = ?, error = ?, updated_at = ?
                   WHERE id = ?""",
                [job_status, video_url, error_msg, now, db_id],
            )
        updated += 1
        logger.info("Video job %s → %s", db_id, job_status)

    return updated


async def worker_tick(conn: duckdb.DuckDBPyConnection, lock: asyncio.Lock) -> None:
    """Single worker tick: submit queued, poll processing, expire timed-out.

    The three phases run in that order. A summary line is logged only when
    at least one phase reported a non-zero count.
    """
    queued = await process_queued_jobs(conn, lock)
    polled = await process_processing_jobs(conn, lock)
    async with lock:
        timed_out = mark_timed_out_video_jobs(conn, timeout_minutes=120)

    if queued or polled or timed_out:
        logger.info(
            "Worker tick: submitted=%d polled=%d timed_out=%d",
            queued,
            polled,
            timed_out,
        )


async def run_worker(conn: duckdb.DuckDBPyConnection, lock: asyncio.Lock) -> None:
    """Infinite loop: run a worker tick every WORKER_INTERVAL seconds.

    Any exception raised by a tick is logged and the loop carries on, so a
    single bad tick does not stop the worker.
    """
    logger.info("Video worker started (interval=%ds)", WORKER_INTERVAL)
    while True:
        try:
            await worker_tick(conn, lock)
        except asyncio.CancelledError:
            # NOTE(review): cancellation during a tick ends the coroutine
            # normally, while cancellation during the sleep below propagates
            # as CancelledError. Confirm callers cope with both.
            logger.info("Video worker stopped.")
            return
        except Exception as exc:
            logger.exception("Unexpected error in video worker: %s", exc)
        await asyncio.sleep(WORKER_INTERVAL)