Files
calminer/services/security.py
zwitschi ef4fb7dcf0
Some checks failed
Run Tests / e2e tests (push) Failing after 1m20s
Run Tests / unit tests (push) Has been cancelled
Run Tests / lint tests (push) Has been cancelled
Refactor architecture documentation and enhance security features
- Updated architecture constraints documentation to include detailed sections on technical, organizational, regulatory, environmental, and performance constraints.
- Created separate markdown files for each type of constraint for better organization and clarity.
- Revised the architecture scope section to provide a clearer overview of the system's key areas.
- Enhanced the solution strategy documentation with detailed explanations of the client-server architecture, technology choices, trade-offs, and future considerations.
- Added comprehensive descriptions of backend and frontend components, middleware, and utilities in the architecture documentation.
- Migrated UI, templates, and styling notes to a dedicated section for better structure.
- Updated requirements.txt to include missing dependencies.
- Refactored user authentication logic in the users.py and security.py files to improve code organization and maintainability, including the integration of OAuth2 password bearer token handling.
2025-10-27 12:46:51 +01:00

60 lines
1.9 KiB
Python

from datetime import datetime, timedelta
from typing import Any, Union
from fastapi import HTTPException, status, Depends
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from config.database import get_db
ACCESS_TOKEN_EXPIRE_MINUTES = 30
SECRET_KEY = "your-secret-key" # Change this in production
ALGORITHM = "HS256"
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="users/login")
def create_access_token(
subject: Union[str, Any], expires_delta: Union[timedelta, None] = None
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
from models.user import User
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = db.query(User).filter(User.username == username).first()
if user is None:
raise credentials_exception
return user