Files
calminer/services/session.py

193 lines
6.0 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Literal, Optional, TYPE_CHECKING
from fastapi import Request, Response
from config.settings import SessionSettings
from services.security import JWTSettings
if TYPE_CHECKING: # pragma: no cover - used only for static typing
from models import User
@dataclass(slots=True)
class SessionStrategy:
"""Describe how authentication tokens are transported with requests."""
access_cookie_name: str
refresh_cookie_name: str
cookie_secure: bool
cookie_domain: Optional[str]
cookie_path: str
header_name: str
header_prefix: str
allow_header_fallback: bool = True
@classmethod
def from_settings(cls, settings: SessionSettings) -> "SessionStrategy":
return cls(
access_cookie_name=settings.access_cookie_name,
refresh_cookie_name=settings.refresh_cookie_name,
cookie_secure=settings.cookie_secure,
cookie_domain=settings.cookie_domain,
cookie_path=settings.cookie_path,
header_name=settings.header_name,
header_prefix=settings.header_prefix,
allow_header_fallback=settings.allow_header_fallback,
)
@dataclass(slots=True)
class SessionTokens:
"""Raw access and refresh tokens extracted from the transport layer."""
access_token: Optional[str]
refresh_token: Optional[str]
access_token_source: Literal["cookie", "header", "none"] = "none"
@property
def has_access(self) -> bool:
return bool(self.access_token)
@property
def has_refresh(self) -> bool:
return bool(self.refresh_token)
@property
def is_empty(self) -> bool:
return not self.has_access and not self.has_refresh
@dataclass(slots=True)
class AuthSession:
"""Holds authenticated user context resolved from session tokens."""
tokens: SessionTokens
user: Optional["User"] = None
scopes: tuple[str, ...] = ()
issued_access_token: Optional[str] = None
issued_refresh_token: Optional[str] = None
clear_cookies: bool = False
@property
def is_authenticated(self) -> bool:
return self.user is not None
@classmethod
def anonymous(cls) -> "AuthSession":
return cls(tokens=SessionTokens(access_token=None, refresh_token=None))
def issue_tokens(
self,
*,
access_token: str,
refresh_token: Optional[str] = None,
access_source: Literal["cookie", "header", "none"] = "cookie",
) -> None:
self.issued_access_token = access_token
if refresh_token is not None:
self.issued_refresh_token = refresh_token
self.tokens = SessionTokens(
access_token=access_token,
refresh_token=refresh_token if refresh_token is not None else self.tokens.refresh_token,
access_token_source=access_source,
)
def mark_cleared(self) -> None:
self.clear_cookies = True
self.tokens = SessionTokens(access_token=None, refresh_token=None)
self.user = None
self.scopes = ()
def extract_session_tokens(request: Request, strategy: SessionStrategy) -> SessionTokens:
    """Pull tokens from cookies or headers according to the configured strategy.

    The access cookie takes precedence. The header named by
    ``strategy.header_name`` is consulted only when no non-empty access
    cookie is present and ``strategy.allow_header_fallback`` is true. The
    refresh token is read from its cookie only.

    Args:
        request: Incoming request; only ``cookies`` and ``headers`` are read.
        strategy: Cookie names, header name/prefix and the fallback switch.

    Returns:
        A ``SessionTokens`` whose ``access_token_source`` is ``"cookie"``,
        ``"header"`` or ``"none"``. Empty cookie or header values are treated
        as absent.
    """
    # ``or None`` folds a missing cookie and an empty cookie into None.
    access_token: Optional[str] = (
        request.cookies.get(strategy.access_cookie_name) or None
    )
    refresh_token: Optional[str] = (
        request.cookies.get(strategy.refresh_cookie_name) or None
    )
    access_source: Literal["cookie", "header", "none"] = (
        "cookie" if access_token else "none"
    )

    if not access_token and strategy.allow_header_fallback:
        header_value = request.headers.get(strategy.header_name)
        candidate = header_value.strip() if header_value else ""
        # Normalise the configured prefix so stray whitespace in settings
        # does not prevent it from ever matching.
        prefix = strategy.header_prefix.strip() if strategy.header_prefix else ""
        if candidate and prefix and candidate[: len(prefix)].lower() == prefix.lower():
            remainder = candidate[len(prefix):]
            # Strip the prefix only when it is a whole word: either nothing
            # follows it (scheme without credentials, so no token) or
            # whitespace separates it from the token. A value that merely
            # begins with the same letters is left untouched.
            if not remainder or remainder[0].isspace():
                candidate = remainder.strip()
        # NOTE(review): a value that lacks the configured prefix is still
        # accepted verbatim as the token, as before; confirm that is intended.
        if candidate:
            access_token = candidate
            access_source = "header"

    return SessionTokens(
        access_token=access_token,
        refresh_token=refresh_token,
        access_token_source=access_source,
    )
def build_session_strategy(settings: SessionSettings) -> SessionStrategy:
    """Return the ``SessionStrategy`` derived from *settings*.

    Module-level convenience wrapper around ``SessionStrategy.from_settings``.
    """
    strategy = SessionStrategy.from_settings(settings)
    return strategy
def set_session_cookies(
    response: Response,
    *,
    access_token: str,
    refresh_token: Optional[str],
    strategy: SessionStrategy,
    jwt_settings: JWTSettings,
) -> None:
    """Write the session cookies onto an outgoing response.

    The access cookie is always written; the refresh cookie only when
    *refresh_token* is not ``None``. Both are HttpOnly with SameSite "lax"
    and take their secure flag, domain and path from *strategy*. Each
    cookie's ``max_age`` is the matching TTL from *jwt_settings* in whole
    seconds, or ``None`` when that TTL is zero or negative.
    """
    # Both lifetimes are read up front, even if no refresh cookie is written.
    access_seconds = int(jwt_settings.access_token_ttl.total_seconds())
    refresh_seconds = int(jwt_settings.refresh_token_ttl.total_seconds())

    def _write_cookie(cookie_name, cookie_value, lifetime):
        # Shared cookie attributes live here so both cookies stay in sync.
        response.set_cookie(
            cookie_name,
            cookie_value,
            httponly=True,
            secure=strategy.cookie_secure,
            samesite="lax",
            max_age=lifetime if lifetime > 0 else None,
            domain=strategy.cookie_domain,
            path=strategy.cookie_path,
        )

    _write_cookie(strategy.access_cookie_name, access_token, access_seconds)
    if refresh_token is None:
        return
    _write_cookie(strategy.refresh_cookie_name, refresh_token, refresh_seconds)
def clear_session_cookies(response: Response, strategy: SessionStrategy) -> None:
    """Delete the access and refresh cookies via ``response.delete_cookie``.

    Each deletion passes the domain and path configured on *strategy*.
    """
    for name_attribute in ("access_cookie_name", "refresh_cookie_name"):
        response.delete_cookie(
            getattr(strategy, name_attribute),
            domain=strategy.cookie_domain,
            path=strategy.cookie_path,
        )