diff --git a/backend/app/routers/admin.py b/backend/app/routers/admin.py index c4ea8da..983a7bf 100644 --- a/backend/app/routers/admin.py +++ b/backend/app/routers/admin.py @@ -1,5 +1,5 @@ """Admin router: operational endpoints for application management.""" -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from typing import Any from fastapi import APIRouter, Depends @@ -7,6 +7,7 @@ from fastapi import APIRouter, Depends from ..db import get_conn, get_write_lock from ..dependencies import require_admin from ..services import models as models_service +from ..services.models import mark_timed_out_video_jobs router = APIRouter(prefix="/admin", tags=["admin"]) @@ -104,3 +105,83 @@ async def refresh_models( "total_models": status.get("model_count"), "last_updated": status.get("last_updated"), } + + +@router.get("/videos") +async def admin_list_video_jobs(_: dict = Depends(require_admin)) -> list[dict[str, Any]]: + """Return all video generation jobs across all users.""" + conn = get_conn() + rows = conn.execute( + """ + SELECT + v.id, v.job_id, v.user_id, u.email, v.model_id, v.prompt, + v.status, v.video_url, v.created_at, v.updated_at + FROM generated_videos v + LEFT JOIN users u ON v.user_id = u.id + ORDER BY v.created_at DESC + """ + ).fetchall() + return [ + { + "id": str(row[0]), + "job_id": row[1], + "user_id": str(row[2]), + "user_email": row[3], + "model_id": row[4], + "prompt": row[5], + "status": row[6], + "video_url": row[7], + "created_at": row[8].isoformat() if row[8] else None, + "updated_at": row[9].isoformat() if row[9] else None, + } + for row in rows + ] + + +@router.post("/videos/{job_id}/cancel", status_code=200) +async def admin_cancel_video_job(job_id: str, _: dict = Depends(require_admin)) -> dict[str, str]: + """Mark a video job as 'cancelled'. Does not stop the provider job.""" + conn = get_conn() + lock = get_write_lock() + now = datetime.now(timezone.utc) + async with lock: + conn.execute( + "UPDATE generated_videos SET status = 'cancelled', updated_at = ? WHERE id = ?", [ + now, job_id] + ) + return {"status": "ok", "job_id": job_id} + + +@router.post("/videos/purge", status_code=200) +async def admin_purge_video_jobs(_: dict = Depends(require_admin)) -> dict[str, Any]: + """Delete all completed, failed, or cancelled jobs older than 30 days.""" + conn = get_conn() + lock = get_write_lock() + thirty_days_ago = datetime.now( + timezone.utc) - timedelta(days=30) + + sql_count = "SELECT COUNT(*) FROM generated_videos" + sql_delete = """ + DELETE FROM generated_videos + WHERE status IN ('completed', 'failed', 'cancelled') + AND updated_at < ? + """ + + async with lock: + before_row = conn.execute(sql_count).fetchone() + before = before_row[0] if before_row else 0 + + conn.execute(sql_delete, [thirty_days_ago]) + + after_row = conn.execute(sql_count).fetchone() + after = after_row[0] if after_row else 0 + + return {"deleted": before - after, "remaining": after} + + +@router.post("/videos/timed-out", status_code=200) +async def admin_mark_timed_out(_: dict = Depends(require_admin)) -> dict[str, int]: + """Mark video jobs that have been in 'queued' or 'processing' status for too long as 'failed'.""" + conn = get_conn() + count = mark_timed_out_video_jobs(conn, timeout_minutes=120) + return {"timed_out": count} diff --git a/backend/app/services/models.py b/backend/app/services/models.py index 040982a..82b1fed 100644 --- a/backend/app/services/models.py +++ b/backend/app/services/models.py @@ -207,3 +207,40 @@ def get_cache_status(conn: duckdb.DuckDBPyConnection) -> dict[str, Any]: ).fetchone() last_updated, model_count = (row[0], row[1]) if row else (None, 0) return {"last_updated": last_updated, "model_count": model_count} + + +def mark_timed_out_video_jobs(conn: duckdb.DuckDBPyConnection, timeout_minutes: int = 120) -> int: + """Mark video jobs that have been in 'queued' or 'processing' status for too long as 'failed'. + + Returns the number of jobs marked as timed out. + """ + timeout_threshold = datetime.now( + timezone.utc) - timedelta(minutes=timeout_minutes) + + # Find timed out jobs + timed_out_rows = conn.execute( + """ + SELECT id FROM generated_videos + WHERE status IN ('queued', 'processing') + AND updated_at < ? + """, + [timeout_threshold] + ).fetchall() + + if not timed_out_rows: + return 0 + + job_ids = [row[0] for row in timed_out_rows] + placeholders = ",".join(["?"] * len(job_ids)) + + # Update them to failed + conn.execute( + f""" + UPDATE generated_videos + SET status = 'failed', updated_at = ? + WHERE id IN ({placeholders}) + """, + [datetime.now(timezone.utc)] + job_ids + ) + + return len(job_ids) diff --git a/frontend/app/main.py b/frontend/app/main.py index 47a3a9a..162f42f 100644 --- a/frontend/app/main.py +++ b/frontend/app/main.py @@ -217,10 +217,11 @@ def dashboard(): images = img_resp.json() if img_resp.status_code == 200 else [] gen_resp = _api("GET", "/generate/images", token=token) generated_images = gen_resp.json() if gen_resp.status_code == 200 else [] - + vid_resp = _api("GET", "/generate/videos", token=token) videos = vid_resp.json() if vid_resp.status_code == 200 else [] - pending_videos = [v for v in videos if v.get("status") not in ("completed", "failed")] + pending_videos = [v for v in videos if v.get( + "status") not in ("completed", "failed")] completed_videos = [v for v in videos if v.get("status") == "completed"] return render_template("dashboard.html", user=user, images=images, @@ -418,7 +419,7 @@ def generate_video(): duration = int( duration_raw) if duration_raw.strip().isdigit() else None resolution = request.form.get("resolution", "").strip() or None - + if mode == "image": resp = _api("POST", "/generate/video/from-image", token=token, json={ "model": request.form.get("model", "").strip(), @@ -436,7 +437,7 @@ def generate_video(): "duration_seconds": duration, "resolution": resolution, }) - + if resp.status_code == 200: result = resp.json() # On success, redirect to the detail page to monitor progress @@ -506,6 +507,13 @@ def admin_models(): return render_template("admin/models.html") +@app.get("/admin/videos") +@admin_required +def admin_videos(): + """Show all video generation jobs across all users.""" + return render_template("admin/videos.html") + + # ── Profile ─────────────────────────────────────────────────────────────── @app.route("/users/profile", methods=["GET", "POST"]) diff --git a/frontend/app/templates/admin/videos.html b/frontend/app/templates/admin/videos.html new file mode 100644 index 0000000..2c9f51c --- /dev/null +++ b/frontend/app/templates/admin/videos.html @@ -0,0 +1,163 @@ +{% extends "base.html" %} {% block title %}Admin - Video Jobs{% endblock %} {% +block content %} +
+

Admin: Video Jobs

+ + +
+

Maintenance

+

+ Delete all completed, failed, or cancelled jobs older than 30 days. +

+ +

+
+ + +
+ + + + + + + + + + + + + + + + +
+ User + + Status + + Model + + Prompt + + Created + + Actions +
Loading jobs...
+
+
+ + +{% endblock %} diff --git a/frontend/app/templates/base.html b/frontend/app/templates/base.html index bea6c51..b9723cc 100644 --- a/frontend/app/templates/base.html +++ b/frontend/app/templates/base.html @@ -30,6 +30,7 @@ Profile {% if session.get('user_role') == 'admin' %} Admin + Video Jobs {% endif %} Log out {% else %}