Files
calminer/routes/users.py
zwitschi 54137b88d7
Some checks failed
Run Tests / e2e tests (push) Failing after 50s
Run Tests / lint tests (push) Failing after 1m53s
Run Tests / unit tests (push) Failing after 2m25s
feat: Enhance Python environment setup with system Python option and improve dependency installation
refactor: Clean up imports in currencies and users routes
fix: Update theme settings saving logic and clean up test imports
2025-10-27 18:39:20 +01:00

108 lines
4.0 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from config.database import get_db
from models.user import User
from services.security import create_access_token, get_current_user
from schemas.user import (
PasswordReset,
PasswordResetRequest,
UserCreate,
UserInDB,
UserLogin,
UserUpdate,
)
router = APIRouter(prefix="/users", tags=["users"])
@router.post("/register", response_model=UserInDB, status_code=status.HTTP_201_CREATED)
async def register_user(user: UserCreate, db: Session = Depends(get_db)):
db_user = db.query(User).filter(User.username == user.username).first()
if db_user:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered")
db_user = db.query(User).filter(User.email == user.email).first()
if db_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered")
# Get or create default role
from models.role import Role
default_role = db.query(Role).filter(Role.name == "user").first()
if not default_role:
default_role = Role(name="user")
db.add(default_role)
db.commit()
db.refresh(default_role)
new_user = User(username=user.username, email=user.email,
role_id=default_role.id)
new_user.set_password(user.password)
db.add(new_user)
db.commit()
db.refresh(new_user)
return new_user
@router.post("/login")
async def login_user(user: UserLogin, db: Session = Depends(get_db)):
db_user = db.query(User).filter(User.username == user.username).first()
if not db_user or not db_user.check_password(user.password):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password")
access_token = create_access_token(subject=db_user.username)
return {"access_token": access_token, "token_type": "bearer"}
@router.get("/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user
@router.put("/me", response_model=UserInDB)
async def update_user_me(user_update: UserUpdate, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
if user_update.username and user_update.username != current_user.username:
existing_user = db.query(User).filter(
User.username == user_update.username).first()
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Username already taken")
setattr(current_user, "username", user_update.username)
if user_update.email and user_update.email != current_user.email:
existing_user = db.query(User).filter(
User.email == user_update.email).first()
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered")
setattr(current_user, "email", user_update.email)
if user_update.password:
current_user.set_password(user_update.password)
db.add(current_user)
db.commit()
db.refresh(current_user)
return current_user
@router.post("/forgot-password")
async def forgot_password(request: PasswordResetRequest):
# In a real application, this would send an email with a reset token
return {"message": "Password reset email sent (not really)"}
@router.post("/reset-password")
async def reset_password(request: PasswordReset, db: Session = Depends(get_db)):
# In a real application, the token would be verified
user = db.query(User).filter(User.username ==
request.token).first() # Use token as username for test
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid token or user")
user.set_password(request.new_password)
db.add(user)
db.commit()
return {"message": "Password has been reset successfully"}