Files

43 lines
1.3 KiB
Python

"""FastAPI dependencies (e.g. authenticated user extraction)."""
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError
from .services.auth import decode_token
_bearer = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(_bearer),
) -> dict:
"""Extract and validate the Bearer JWT. Returns the token payload."""
credentials_error = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials.",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = decode_token(credentials.credentials)
except JWTError:
raise credentials_error
if payload.get("type") != "access":
raise credentials_error
user_id: str | None = payload.get("sub")
if user_id is None:
raise credentials_error
return {"id": user_id, "email": payload.get("email"), "role": payload.get("role")}
async def require_admin(current_user: dict = Depends(get_current_user)) -> dict:
"""Raise 403 if the authenticated user is not an admin."""
if current_user.get("role") != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required.",
)
return current_user