2c6fdc03a8
Co-authored-by: Copilot <copilot@github.com>
43 lines
1.3 KiB
Python
43 lines
1.3 KiB
Python
"""FastAPI dependencies (e.g. authenticated user extraction)."""
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
from jose import JWTError
|
|
|
|
from .services.auth import decode_token
|
|
|
|
_bearer = HTTPBearer()
|
|
|
|
|
|
async def get_current_user(
|
|
credentials: HTTPAuthorizationCredentials = Depends(_bearer),
|
|
) -> dict:
|
|
"""Extract and validate the Bearer JWT. Returns the token payload."""
|
|
credentials_error = HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Could not validate credentials.",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
try:
|
|
payload = decode_token(credentials.credentials)
|
|
except JWTError:
|
|
raise credentials_error
|
|
|
|
if payload.get("type") != "access":
|
|
raise credentials_error
|
|
|
|
user_id: str | None = payload.get("sub")
|
|
if user_id is None:
|
|
raise credentials_error
|
|
|
|
return {"id": user_id, "email": payload.get("email"), "role": payload.get("role")}
|
|
|
|
|
|
async def require_admin(current_user: dict = Depends(get_current_user)) -> dict:
|
|
"""Raise 403 if the authenticated user is not an admin."""
|
|
if current_user.get("role") != "admin":
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Admin access required.",
|
|
)
|
|
return current_user
|