feat: add admin video jobs management endpoints and UI for listing, cancelling, and purging video jobs
Co-authored-by: Copilot <copilot@github.com>
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
"""Admin router: operational endpoints for application management."""
|
||||
from datetime import datetime, timezone
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any
|
||||
|
||||
from fastapi import APIRouter, Depends
|
||||
@@ -7,6 +7,7 @@ from fastapi import APIRouter, Depends
|
||||
from ..db import get_conn, get_write_lock
|
||||
from ..dependencies import require_admin
|
||||
from ..services import models as models_service
|
||||
from ..services.models import mark_timed_out_video_jobs
|
||||
|
||||
router = APIRouter(prefix="/admin", tags=["admin"])
|
||||
|
||||
@@ -104,3 +105,83 @@ async def refresh_models(
|
||||
"total_models": status.get("model_count"),
|
||||
"last_updated": status.get("last_updated"),
|
||||
}
|
||||
|
||||
|
||||
@router.get("/videos")
|
||||
async def admin_list_video_jobs(_: dict = Depends(require_admin)) -> list[dict[str, Any]]:
|
||||
"""Return all video generation jobs across all users."""
|
||||
conn = get_conn()
|
||||
rows = conn.execute(
|
||||
"""
|
||||
SELECT
|
||||
v.id, v.job_id, v.user_id, u.email, v.model_id, v.prompt,
|
||||
v.status, v.video_url, v.created_at, v.updated_at
|
||||
FROM generated_videos v
|
||||
LEFT JOIN users u ON v.user_id = u.id
|
||||
ORDER BY v.created_at DESC
|
||||
"""
|
||||
).fetchall()
|
||||
return [
|
||||
{
|
||||
"id": str(row[0]),
|
||||
"job_id": row[1],
|
||||
"user_id": str(row[2]),
|
||||
"user_email": row[3],
|
||||
"model_id": row[4],
|
||||
"prompt": row[5],
|
||||
"status": row[6],
|
||||
"video_url": row[7],
|
||||
"created_at": row[8].isoformat() if row[8] else None,
|
||||
"updated_at": row[9].isoformat() if row[9] else None,
|
||||
}
|
||||
for row in rows
|
||||
]
|
||||
|
||||
|
||||
@router.post("/videos/{job_id}/cancel", status_code=200)
|
||||
async def admin_cancel_video_job(job_id: str, _: dict = Depends(require_admin)) -> dict[str, str]:
|
||||
"""Mark a video job as 'cancelled'. Does not stop the provider job."""
|
||||
conn = get_conn()
|
||||
lock = get_write_lock()
|
||||
now = datetime.now(timezone.utc)
|
||||
async with lock:
|
||||
conn.execute(
|
||||
"UPDATE generated_videos SET status = 'cancelled', updated_at = ? WHERE id = ?", [
|
||||
now, job_id]
|
||||
)
|
||||
return {"status": "ok", "job_id": job_id}
|
||||
|
||||
|
||||
@router.post("/videos/purge", status_code=200)
|
||||
async def admin_purge_video_jobs(_: dict = Depends(require_admin)) -> dict[str, Any]:
|
||||
"""Delete all completed, failed, or cancelled jobs older than 30 days."""
|
||||
conn = get_conn()
|
||||
lock = get_write_lock()
|
||||
thirty_days_ago = datetime.now(
|
||||
timezone.utc) - timedelta(days=30)
|
||||
|
||||
sql_count = "SELECT COUNT(*) FROM generated_videos"
|
||||
sql_delete = """
|
||||
DELETE FROM generated_videos
|
||||
WHERE status IN ('completed', 'failed', 'cancelled')
|
||||
AND updated_at < ?
|
||||
"""
|
||||
|
||||
async with lock:
|
||||
before_row = conn.execute(sql_count).fetchone()
|
||||
before = before_row[0] if before_row else 0
|
||||
|
||||
conn.execute(sql_delete, [thirty_days_ago])
|
||||
|
||||
after_row = conn.execute(sql_count).fetchone()
|
||||
after = after_row[0] if after_row else 0
|
||||
|
||||
return {"deleted": before - after, "remaining": after}
|
||||
|
||||
|
||||
@router.post("/videos/timed-out", status_code=200)
|
||||
async def admin_mark_timed_out(_: dict = Depends(require_admin)) -> dict[str, int]:
|
||||
"""Mark video jobs that have been in 'queued' or 'processing' status for too long as 'failed'."""
|
||||
conn = get_conn()
|
||||
count = mark_timed_out_video_jobs(conn, timeout_minutes=120)
|
||||
return {"timed_out": count}
|
||||
|
||||
@@ -207,3 +207,40 @@ def get_cache_status(conn: duckdb.DuckDBPyConnection) -> dict[str, Any]:
|
||||
).fetchone()
|
||||
last_updated, model_count = (row[0], row[1]) if row else (None, 0)
|
||||
return {"last_updated": last_updated, "model_count": model_count}
|
||||
|
||||
|
||||
def mark_timed_out_video_jobs(conn: duckdb.DuckDBPyConnection, timeout_minutes: int = 120) -> int:
|
||||
"""Mark video jobs that have been in 'queued' or 'processing' status for too long as 'failed'.
|
||||
|
||||
Returns the number of jobs marked as timed out.
|
||||
"""
|
||||
timeout_threshold = datetime.now(
|
||||
timezone.utc) - timedelta(minutes=timeout_minutes)
|
||||
|
||||
# Find timed out jobs
|
||||
timed_out_rows = conn.execute(
|
||||
"""
|
||||
SELECT id FROM generated_videos
|
||||
WHERE status IN ('queued', 'processing')
|
||||
AND updated_at < ?
|
||||
""",
|
||||
[timeout_threshold]
|
||||
).fetchall()
|
||||
|
||||
if not timed_out_rows:
|
||||
return 0
|
||||
|
||||
job_ids = [row[0] for row in timed_out_rows]
|
||||
placeholders = ",".join(["?"] * len(job_ids))
|
||||
|
||||
# Update them to failed
|
||||
conn.execute(
|
||||
f"""
|
||||
UPDATE generated_videos
|
||||
SET status = 'failed', updated_at = ?
|
||||
WHERE id IN ({placeholders})
|
||||
""",
|
||||
[datetime.now(timezone.utc)] + job_ids
|
||||
)
|
||||
|
||||
return len(job_ids)
|
||||
|
||||
Reference in New Issue
Block a user