feat: add token validation and auto-refresh functionality in frontend helpers

This commit is contained in:
2026-05-30 19:59:41 +02:00
parent aa0f79fe75
commit 45fac8223a
2 changed files with 67 additions and 0 deletions
+11
View File
@@ -4,6 +4,9 @@ import uuid
from fastapi import APIRouter, HTTPException, status
from jose import JWTError
from fastapi import Depends
from ..dependencies import get_current_user
from ..models.auth import LoginRequest, RefreshRequest, RegisterRequest, TokenResponse
from ..services.auth import (
authenticate_user,
@@ -95,3 +98,11 @@ async def logout(body: RefreshRequest) -> None:
jti = payload.get("jti", "")
if jti:
await revoke_refresh_token(jti)
@router.get("/validate")
async def validate(
current_user: dict = Depends(get_current_user),
) -> dict:
"""Return the token payload if the access token is valid."""
return current_user
+56
View File
@@ -76,6 +76,9 @@ def login_required(view):
def wrapped(*args, **kwargs):
if "access_token" not in session:
return redirect(url_for("auth.login"))
# Validate, with auto-refresh on expiry
if not _validate_and_refresh():
return redirect(url_for("auth.login"))
return view(*args, **kwargs)
return wrapped
@@ -85,8 +88,61 @@ def admin_required(view):
def wrapped(*args, **kwargs):
if "access_token" not in session:
return redirect(url_for("auth.login"))
if not _validate_and_refresh():
return redirect(url_for("auth.login"))
if session.get("user_role") != "admin":
flash("Admin access required.", "error")
return redirect(url_for("dashboard.index"))
return view(*args, **kwargs)
return wrapped
# ── Token validation & refresh ────────────────────────────────────────────
def _validate_access_token(token: str) -> bool:
"""Return True if the access token is still valid."""
try:
resp = _api("GET", "/auth/validate", token=token)
return resp.status_code == 200
except httpx.RequestError:
return False
def _try_refresh() -> bool:
"""Attempt to refresh an expired access token using the stored refresh token.
On success, updates session tokens in place. Returns True if a valid
access token exists after the attempt.
"""
refresh_token = session.get("refresh_token")
if not refresh_token:
return False
try:
resp = _api("POST", "/auth/refresh",
json={"refresh_token": refresh_token})
except httpx.RequestError:
return False
if resp.status_code != 200:
return False
data = resp.json()
session["access_token"] = data["access_token"]
session["refresh_token"] = data["refresh_token"]
return True
def _validate_and_refresh() -> bool:
"""Check access token validity; attempt refresh if expired.
Returns True if a valid session exists after the check.
"""
token = session.get("access_token")
if not token:
return False
if _validate_access_token(token):
return True
# Access token expired — try to refresh
if _try_refresh():
return True
# Both tokens are dead — clear session
session.clear()
return False