from __future__ import annotations from dataclasses import dataclass from typing import Literal, Optional, TYPE_CHECKING from fastapi import Request, Response from config.settings import SessionSettings from services.security import JWTSettings if TYPE_CHECKING: # pragma: no cover - used only for static typing from models import User @dataclass(slots=True) class SessionStrategy: """Describe how authentication tokens are transported with requests.""" access_cookie_name: str refresh_cookie_name: str cookie_secure: bool cookie_domain: Optional[str] cookie_path: str header_name: str header_prefix: str allow_header_fallback: bool = True @classmethod def from_settings(cls, settings: SessionSettings) -> "SessionStrategy": return cls( access_cookie_name=settings.access_cookie_name, refresh_cookie_name=settings.refresh_cookie_name, cookie_secure=settings.cookie_secure, cookie_domain=settings.cookie_domain, cookie_path=settings.cookie_path, header_name=settings.header_name, header_prefix=settings.header_prefix, allow_header_fallback=settings.allow_header_fallback, ) @dataclass(slots=True) class SessionTokens: """Raw access and refresh tokens extracted from the transport layer.""" access_token: Optional[str] refresh_token: Optional[str] access_token_source: Literal["cookie", "header", "none"] = "none" @property def has_access(self) -> bool: return bool(self.access_token) @property def has_refresh(self) -> bool: return bool(self.refresh_token) @property def is_empty(self) -> bool: return not self.has_access and not self.has_refresh @dataclass(slots=True) class AuthSession: """Holds authenticated user context resolved from session tokens.""" tokens: SessionTokens user: Optional["User"] = None scopes: tuple[str, ...] = () issued_access_token: Optional[str] = None issued_refresh_token: Optional[str] = None clear_cookies: bool = False @property def is_authenticated(self) -> bool: return self.user is not None @classmethod def anonymous(cls) -> "AuthSession": return cls(tokens=SessionTokens(access_token=None, refresh_token=None)) def issue_tokens( self, *, access_token: str, refresh_token: Optional[str] = None, access_source: Literal["cookie", "header", "none"] = "cookie", ) -> None: self.issued_access_token = access_token if refresh_token is not None: self.issued_refresh_token = refresh_token self.tokens = SessionTokens( access_token=access_token, refresh_token=refresh_token if refresh_token is not None else self.tokens.refresh_token, access_token_source=access_source, ) def mark_cleared(self) -> None: self.clear_cookies = True self.tokens = SessionTokens(access_token=None, refresh_token=None) self.user = None self.scopes = () def extract_session_tokens(request: Request, strategy: SessionStrategy) -> SessionTokens: """Pull tokens from cookies or headers according to configured strategy.""" access_token: Optional[str] = None refresh_token: Optional[str] = None access_source: Literal["cookie", "header", "none"] = "none" if strategy.access_cookie_name in request.cookies: access_token = request.cookies.get(strategy.access_cookie_name) or None if access_token: access_source = "cookie" if strategy.refresh_cookie_name in request.cookies: refresh_token = request.cookies.get( strategy.refresh_cookie_name) or None if not access_token and strategy.allow_header_fallback: header_value = request.headers.get(strategy.header_name) if header_value: candidate = header_value.strip() prefix = f"{strategy.header_prefix} " if strategy.header_prefix else "" if prefix and candidate.lower().startswith(prefix.lower()): candidate = candidate[len(prefix):].strip() if candidate: access_token = candidate access_source = "header" return SessionTokens( access_token=access_token, refresh_token=refresh_token, access_token_source=access_source, ) def build_session_strategy(settings: SessionSettings) -> SessionStrategy: """Create a session strategy object from settings configuration.""" return SessionStrategy.from_settings(settings) def set_session_cookies( response: Response, *, access_token: str, refresh_token: Optional[str], strategy: SessionStrategy, jwt_settings: JWTSettings, ) -> None: """Persist session cookies on an outgoing response.""" access_ttl = int(jwt_settings.access_token_ttl.total_seconds()) refresh_ttl = int(jwt_settings.refresh_token_ttl.total_seconds()) response.set_cookie( strategy.access_cookie_name, access_token, httponly=True, secure=strategy.cookie_secure, samesite="lax", max_age=max(access_ttl, 0) or None, domain=strategy.cookie_domain, path=strategy.cookie_path, ) if refresh_token is not None: response.set_cookie( strategy.refresh_cookie_name, refresh_token, httponly=True, secure=strategy.cookie_secure, samesite="lax", max_age=max(refresh_ttl, 0) or None, domain=strategy.cookie_domain, path=strategy.cookie_path, ) def clear_session_cookies(response: Response, strategy: SessionStrategy) -> None: """Remove session cookies from the client.""" response.delete_cookie( strategy.access_cookie_name, domain=strategy.cookie_domain, path=strategy.cookie_path, ) response.delete_cookie( strategy.refresh_cookie_name, domain=strategy.cookie_domain, path=strategy.cookie_path, )