- Updated architecture constraints documentation to include detailed sections on technical, organizational, regulatory, environmental, and performance constraints. - Created separate markdown files for each type of constraint for better organization and clarity. - Revised the architecture scope section to provide a clearer overview of the system's key areas. - Enhanced the solution strategy documentation with detailed explanations of the client-server architecture, technology choices, trade-offs, and future considerations. - Added comprehensive descriptions of backend and frontend components, middleware, and utilities in the architecture documentation. - Migrated UI, templates, and styling notes to a dedicated section for better structure. - Updated requirements.txt to include missing dependencies. - Refactored user authentication logic in the users.py and security.py files to improve code organization and maintainability, including the integration of OAuth2 password bearer token handling.
60 lines
1.9 KiB
Python
60 lines
1.9 KiB
Python
from datetime import datetime, timedelta
|
|
from typing import Any, Union
|
|
|
|
from fastapi import HTTPException, status, Depends
|
|
from fastapi.security import OAuth2PasswordBearer
|
|
from jose import jwt, JWTError
|
|
from passlib.context import CryptContext
|
|
from sqlalchemy.orm import Session
|
|
|
|
from config.database import get_db
|
|
|
|
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = 30
|
|
SECRET_KEY = "your-secret-key" # Change this in production
|
|
ALGORITHM = "HS256"
|
|
|
|
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
|
|
|
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="users/login")
|
|
|
|
|
|
def create_access_token(
|
|
subject: Union[str, Any], expires_delta: Union[timedelta, None] = None
|
|
) -> str:
|
|
if expires_delta:
|
|
expire = datetime.utcnow() + expires_delta
|
|
else:
|
|
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
to_encode = {"exp": expire, "sub": str(subject)}
|
|
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
|
return encoded_jwt
|
|
|
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
|
return pwd_context.verify(plain_password, hashed_password)
|
|
|
|
|
|
def get_password_hash(password: str) -> str:
|
|
return pwd_context.hash(password)
|
|
|
|
|
|
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
|
|
from models.user import User
|
|
credentials_exception = HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Could not validate credentials",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
try:
|
|
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
|
username = payload.get("sub")
|
|
if username is None:
|
|
raise credentials_exception
|
|
except JWTError:
|
|
raise credentials_exception
|
|
user = db.query(User).filter(User.username == username).first()
|
|
if user is None:
|
|
raise credentials_exception
|
|
return user
|