"""Flask frontend application."""

import functools

import httpx
from flask import (
    Flask,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    session,
    url_for,
)

from .config import Config

app = Flask(__name__)
app.config.from_object(Config)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _backend(path: str) -> str:
    """Return the backend URL for *path* by prefixing ``BACKEND_URL``."""
    return f"{app.config['BACKEND_URL']}{path}"


def _api(method: str, path: str, *, token: str | None = None, **kwargs):
    """Send a request to the backend and return the ``httpx`` response.

    Args:
        method: HTTP verb passed to ``httpx.request``.
        path: Backend path; appended to the configured ``BACKEND_URL``.
        token: When truthy, sent as an ``Authorization: Bearer`` header.
        **kwargs: Forwarded to ``httpx.request``. ``headers`` is merged with
            the Authorization header; ``timeout`` overrides the default of 30.
    """
    # Copy so that a caller-supplied headers mapping is never mutated.
    supplied_headers = kwargs.pop("headers", None)
    headers = dict(supplied_headers) if supplied_headers else {}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    # Popping avoids a duplicate-keyword TypeError if a caller passes timeout.
    timeout = kwargs.pop("timeout", 30)
    return httpx.request(
        method, _backend(path), headers=headers, timeout=timeout, **kwargs
    )


def _error_detail(resp, default: str):
    """Return the ``detail`` field of a backend error reply, or *default*.

    Falls back to *default* when the body is not valid JSON or is not a JSON
    object, so a non-JSON error page does not crash the view.
    """
    try:
        body = resp.json()
    except ValueError:  # JSON decode errors are ValueError subclasses
        return default
    if not isinstance(body, dict):
        return default
    return body.get("detail", default)


def login_required(view):
    """Redirect to the login page unless the session holds an access token."""

    @functools.wraps(view)
    def wrapped(*args, **kwargs):
        if "access_token" not in session:
            return redirect(url_for("login"))
        return view(*args, **kwargs)

    return wrapped


def admin_required(view):
    """Require a logged-in session whose stored ``user_role`` is ``admin``.

    Sessions without an access token are redirected to the login page; other
    non-admin sessions get an error flash and are sent to the dashboard.
    """

    @functools.wraps(view)
    def wrapped(*args, **kwargs):
        if "access_token" not in session:
            return redirect(url_for("login"))
        if session.get("user_role") != "admin":
            flash("Admin access required.", "error")
            return redirect(url_for("dashboard"))
        return view(*args, **kwargs)

    return wrapped


# ---------------------------------------------------------------------------
# Auth routes
# ---------------------------------------------------------------------------

@app.get("/")
def index():
    """Send logged-in users to the dashboard and everyone else to login."""
    if "access_token" in session:
        return redirect(url_for("dashboard"))
    return redirect(url_for("login"))


@app.route("/login", methods=["GET", "POST"])
def login():
    """Render the login form; on POST, authenticate against the backend."""
    if request.method == "POST":
        email = request.form["email"]
        password = request.form["password"]
        resp = _api(
            "POST", "/auth/login", json={"email": email, "password": password}
        )
        if resp.status_code == 200:
            data = resp.json()
            session["access_token"] = data["access_token"]
            session["refresh_token"] = data["refresh_token"]
            # Drop identity left over from an earlier login so that a failed
            # profile lookup below cannot leave a stale role in the session.
            session.pop("user_email", None)
            session.pop("user_role", None)
            me = _api("GET", "/users/me", token=data["access_token"])
            if me.status_code == 200:
                u = me.json()
                session["user_email"] = u.get("email", "")
                session["user_role"] = u.get("role", "user")
            return redirect(url_for("dashboard"))
        flash("Invalid email or password.", "error")
    return render_template("login.html")


@app.route("/register", methods=["GET", "POST"])
def register():
    """Render the registration form; on POST, create the account."""
    if request.method == "POST":
        email = request.form["email"]
        password = request.form["password"]
        resp = _api(
            "POST", "/auth/register", json={"email": email, "password": password}
        )
        if resp.status_code == 201:
            flash("Account created. Please log in.", "success")
            return redirect(url_for("login"))
        flash(_error_detail(resp, "Registration failed."), "error")
    return render_template("register.html")


@app.get("/logout")
def logout():
    """Ask the backend to log out the refresh token, then clear the session."""
    refresh_token = session.get("refresh_token")
    if refresh_token:
        try:
            _api("POST", "/auth/logout", json={"refresh_token": refresh_token})
        except httpx.HTTPError:
            # Best effort: an unreachable backend must not stop the user
            # from being logged out locally.
            app.logger.warning(
                "Backend logout request failed; clearing local session anyway.",
                exc_info=True,
            )
    session.clear()
    return redirect(url_for("login"))


# ---------------------------------------------------------------------------
# Authenticated routes
# ---------------------------------------------------------------------------

@app.get("/dashboard")
@login_required
def dashboard():
    """Render the dashboard with the current user's backend profile."""
    token = session["access_token"]
    resp = _api("GET", "/users/me", token=token)
    user = resp.json() if resp.status_code == 200 else {}
    return render_template("dashboard.html", user=user)


# ── Generate ──────────────────────────────────────────────────────────────

@app.get("/generate")
@login_required
def generate():
    """Redirect the bare ``/generate`` URL to the text generation page."""
    return redirect(url_for("generate_text"))


@app.route("/generate/text", methods=["GET", "POST"])
@login_required
def generate_text():
    """Render the text generation form; on POST, forward it to the backend."""
    result = error = None
    if request.method == "POST":
        resp = _api(
            "POST",
            "/generate/text",
            token=session["access_token"],
            json={
                "model": request.form.get("model", "").strip(),
                "prompt": request.form.get("prompt", "").strip(),
            },
        )
        if resp.status_code == 200:
            result = resp.json()
        else:
            error = _error_detail(resp, "Generation failed.")
    return render_template("generate_text.html", result=result, error=error)


@app.route("/generate/image", methods=["GET", "POST"])
@login_required
def generate_image():
    """Render the image generation form; on POST, forward it to the backend."""
    result = error = None
    if request.method == "POST":
        count_raw = request.form.get("n", "").strip()
        try:
            # A missing or blank field means one image.
            count = int(count_raw) if count_raw else 1
        except ValueError:
            # Report bad input to the user instead of failing with HTTP 500.
            error = "Number of images must be a whole number."
        else:
            resp = _api(
                "POST",
                "/generate/image",
                token=session["access_token"],
                json={
                    "model": request.form.get("model", "").strip(),
                    "prompt": request.form.get("prompt", "").strip(),
                    "n": count,
                    "size": request.form.get("size", "1024x1024"),
                    "aspect_ratio": (
                        request.form.get("aspect_ratio", "").strip() or None
                    ),
                    "image_size": (
                        request.form.get("image_size", "").strip() or None
                    ),
                },
            )
            if resp.status_code == 200:
                result = resp.json()
            else:
                error = _error_detail(resp, "Generation failed.")
    return render_template("generate_image.html", result=result, error=error)


@app.route("/generate/video", methods=["GET", "POST"])
@login_required
def generate_video():
    """Render the video generation form; on POST, forward it to the backend.

    The ``mode`` form field selects the endpoint: ``image`` posts to
    ``/generate/video/from-image`` (adding ``image_url``); anything else
    posts to ``/generate/video``.
    """
    result = error = None
    if request.method == "POST":
        mode = request.form.get("mode", "text")
        token = session["access_token"]
        duration_raw = request.form.get("duration_seconds", "").strip()
        # isdecimal(), not isdigit(): isdigit() also accepts characters such
        # as superscript digits, which int() rejects with ValueError.
        duration = int(duration_raw) if duration_raw.isdecimal() else None
        resolution = request.form.get("resolution", "").strip() or None

        endpoint = "/generate/video"
        payload = {"model": request.form.get("model", "").strip()}
        if mode == "image":
            endpoint = "/generate/video/from-image"
            payload["image_url"] = request.form.get("image_url", "").strip()
        payload.update(
            {
                "prompt": request.form.get("prompt", "").strip(),
                "aspect_ratio": request.form.get("aspect_ratio", "16:9"),
                "duration_seconds": duration,
                "resolution": resolution,
            }
        )

        resp = _api("POST", endpoint, token=token, json=payload)
        if resp.status_code == 200:
            result = resp.json()
        else:
            error = _error_detail(resp, "Generation failed.")
    return render_template("generate_video.html", result=result, error=error)
def _backend_detail(resp, default: str):
    """Return the ``detail`` field of a backend JSON reply, or *default*.

    Falls back to *default* when the body is not valid JSON or is not a JSON
    object.
    """
    try:
        body = resp.json()
    except ValueError:  # JSON decode errors are ValueError subclasses
        return default
    if isinstance(body, dict):
        return body.get("detail", default)
    return default


@app.get("/generate/video/status")
@login_required
def generate_video_status():
    """Proxy video status polling to the backend.

    Returns the backend's JSON body and status code, 400 when the
    ``polling_url`` query parameter is missing, or 502 when the backend reply
    is not valid JSON.
    """
    polling_url = request.args.get("polling_url", "")
    if not polling_url:
        return jsonify({"error": "polling_url required"}), 400
    resp = _api(
        "GET",
        "/generate/video/status",
        token=session["access_token"],
        params={"polling_url": polling_url},
    )
    try:
        body = resp.json()
    except ValueError:
        # Keep the polling client on a JSON contract even when the backend
        # returns something else (e.g. an HTML error page).
        return jsonify({"error": "Invalid response from backend"}), 502
    return jsonify(body), resp.status_code


# ── Admin ─────────────────────────────────────────────────────────────────

@app.get("/admin")
@admin_required
def admin():
    """Render the admin page with backend statistics and the user list."""
    token = session["access_token"]
    stats_resp = _api("GET", "/admin/stats", token=token)
    users_resp = _api("GET", "/users", token=token)
    stats = stats_resp.json() if stats_resp.status_code == 200 else {}
    users = users_resp.json() if users_resp.status_code == 200 else []
    return render_template("admin.html", stats=stats, users=users)


# The <user_id> URL variable supplies the view's ``user_id`` argument.
@app.post("/admin/users/<user_id>/role")
@admin_required
def admin_set_role(user_id: str):
    """Set a user's role via the backend, then return to the admin page.

    Args:
        user_id: Identifier taken from the URL and forwarded to the backend.
    """
    role = request.form.get("role", "user")
    resp = _api(
        "PUT",
        f"/users/{user_id}/role",
        token=session["access_token"],
        json={"role": role},
    )
    # Only report success when the backend actually accepted the change.
    if 200 <= resp.status_code < 300:
        flash(f"Role updated to '{role}'.", "success")
    else:
        flash(_backend_detail(resp, "Role update failed."), "error")
    return redirect(url_for("admin"))


@app.post("/admin/users/<user_id>/delete")
@admin_required
def admin_delete_user(user_id: str):
    """Delete a user via the backend, then return to the admin page.

    Args:
        user_id: Identifier taken from the URL and forwarded to the backend.
    """
    resp = _api("DELETE", f"/users/{user_id}", token=session["access_token"])
    # Only report success when the backend actually performed the deletion.
    if 200 <= resp.status_code < 300:
        flash("User deleted.", "success")
    else:
        flash(_backend_detail(resp, "User deletion failed."), "error")
    return redirect(url_for("admin"))


# ── Profile ───────────────────────────────────────────────────────────────

@app.route("/users/profile", methods=["GET", "POST"])
@login_required
def profile():
    """Show the current user's profile; on POST, update email and/or password.

    Blank fields are left unchanged. After a POST the browser is redirected
    back to this page.
    """
    token = session["access_token"]
    if request.method == "POST":
        payload: dict = {}
        new_email = request.form.get("email", "").strip()
        new_password = request.form.get("password", "")
        if new_email:
            payload["email"] = new_email
        # Whitespace-only input still means "no change", but a real password
        # is sent unmodified: login and register do not strip it either, so
        # stripping here would store a password the user cannot log in with.
        if new_password.strip():
            payload["password"] = new_password
        if payload:
            resp = _api("PUT", "/users/me", token=token, json=payload)
            if resp.status_code == 200:
                updated = resp.json()
                session["user_email"] = updated.get(
                    "email", session.get("user_email", "")
                )
                flash("Profile updated.", "success")
            else:
                flash(_backend_detail(resp, "Update failed."), "error")
        return redirect(url_for("profile"))
    resp = _api("GET", "/users/me", token=token)
    user = resp.json() if resp.status_code == 200 else {}
    return render_template("profile.html", user=user)