diff --git a/backend/app/routers/auth.py b/backend/app/routers/auth.py index 6707897..66b47f4 100644 --- a/backend/app/routers/auth.py +++ b/backend/app/routers/auth.py @@ -4,6 +4,9 @@ import uuid from fastapi import APIRouter, HTTPException, status from jose import JWTError +from fastapi import Depends + +from ..dependencies import get_current_user from ..models.auth import LoginRequest, RefreshRequest, RegisterRequest, TokenResponse from ..services.auth import ( authenticate_user, @@ -95,3 +98,11 @@ async def logout(body: RefreshRequest) -> None: jti = payload.get("jti", "") if jti: await revoke_refresh_token(jti) + + +@router.get("/validate") +async def validate( + current_user: dict = Depends(get_current_user), +) -> dict: + """Return the token payload if the access token is valid.""" + return current_user diff --git a/frontend/app/helpers.py b/frontend/app/helpers.py index 4ec0585..14e63d0 100644 --- a/frontend/app/helpers.py +++ b/frontend/app/helpers.py @@ -76,6 +76,9 @@ def login_required(view): def wrapped(*args, **kwargs): if "access_token" not in session: return redirect(url_for("auth.login")) + # Validate, with auto-refresh on expiry + if not _validate_and_refresh(): + return redirect(url_for("auth.login")) return view(*args, **kwargs) return wrapped @@ -85,8 +88,61 @@ def admin_required(view): def wrapped(*args, **kwargs): if "access_token" not in session: return redirect(url_for("auth.login")) + if not _validate_and_refresh(): + return redirect(url_for("auth.login")) if session.get("user_role") != "admin": flash("Admin access required.", "error") return redirect(url_for("dashboard.index")) return view(*args, **kwargs) return wrapped + + +# ── Token validation & refresh ──────────────────────────────────────────── + +def _validate_access_token(token: str) -> bool: + """Return True if the access token is still valid.""" + try: + resp = _api("GET", "/auth/validate", token=token) + return resp.status_code == 200 + except httpx.RequestError: + return False + + +def _try_refresh() -> bool: + """Attempt to refresh an expired access token using the stored refresh token. + + On success, updates session tokens in place. Returns True if a valid + access token exists after the attempt. + """ + refresh_token = session.get("refresh_token") + if not refresh_token: + return False + try: + resp = _api("POST", "/auth/refresh", + json={"refresh_token": refresh_token}) + except httpx.RequestError: + return False + if resp.status_code != 200: + return False + data = resp.json() + session["access_token"] = data["access_token"] + session["refresh_token"] = data["refresh_token"] + return True + + +def _validate_and_refresh() -> bool: + """Check access token validity; attempt refresh if expired. + + Returns True if a valid session exists after the check. + """ + token = session.get("access_token") + if not token: + return False + if _validate_access_token(token): + return True + # Access token expired — try to refresh + if _try_refresh(): + return True + # Both tokens are dead — clear session + session.clear() + return False