feat: implement user and role models with password hashing, and add tests for user functionality
This commit is contained in:
@@ -14,6 +14,7 @@ from .metadata import (
|
||||
from .project import MiningOperationType, Project
|
||||
from .scenario import Scenario, ScenarioStatus
|
||||
from .simulation_parameter import DistributionType, SimulationParameter
|
||||
from .user import Role, User, UserRole, password_context
|
||||
|
||||
__all__ = [
|
||||
"FinancialCategory",
|
||||
@@ -32,4 +33,8 @@ __all__ = [
|
||||
"STOCHASTIC_VARIABLE_METADATA",
|
||||
"ResourceDescriptor",
|
||||
"StochasticVariableDescriptor",
|
||||
"User",
|
||||
"Role",
|
||||
"UserRole",
|
||||
"password_context",
|
||||
]
|
||||
|
||||
168
models/user.py
Normal file
168
models/user.py
Normal file
@@ -0,0 +1,168 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
from typing import List, Optional
|
||||
|
||||
from passlib.context import CryptContext
|
||||
from sqlalchemy import (
|
||||
Boolean,
|
||||
DateTime,
|
||||
ForeignKey,
|
||||
Integer,
|
||||
String,
|
||||
Text,
|
||||
UniqueConstraint,
|
||||
)
|
||||
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||
from sqlalchemy.sql import func
|
||||
|
||||
from config.database import Base
|
||||
|
||||
# Configure password hashing strategy. Argon2 provides strong resistance against
|
||||
# GPU-based cracking attempts, aligning with the security plan.
|
||||
password_context = CryptContext(schemes=["argon2"], deprecated="auto")
|
||||
|
||||
|
||||
class User(Base):
|
||||
"""Authenticated platform user with optional elevated privileges."""
|
||||
|
||||
__tablename__ = "users"
|
||||
__table_args__ = (
|
||||
UniqueConstraint("email", name="uq_users_email"),
|
||||
UniqueConstraint("username", name="uq_users_username"),
|
||||
)
|
||||
|
||||
id: Mapped[int] = mapped_column(Integer, primary_key=True)
|
||||
email: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
username: Mapped[str] = mapped_column(String(128), nullable=False)
|
||||
password_hash: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
is_active: Mapped[bool] = mapped_column(
|
||||
Boolean, nullable=False, default=True)
|
||||
is_superuser: Mapped[bool] = mapped_column(
|
||||
Boolean, nullable=False, default=False)
|
||||
last_login_at: Mapped[datetime | None] = mapped_column(
|
||||
DateTime(timezone=True), nullable=True
|
||||
)
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True), nullable=False, server_default=func.now()
|
||||
)
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now()
|
||||
)
|
||||
|
||||
role_assignments: Mapped[List["UserRole"]] = relationship(
|
||||
"UserRole",
|
||||
back_populates="user",
|
||||
cascade="all, delete-orphan",
|
||||
foreign_keys="UserRole.user_id",
|
||||
)
|
||||
roles: Mapped[List["Role"]] = relationship(
|
||||
"Role",
|
||||
secondary="user_roles",
|
||||
primaryjoin="User.id == UserRole.user_id",
|
||||
secondaryjoin="Role.id == UserRole.role_id",
|
||||
viewonly=True,
|
||||
back_populates="users",
|
||||
)
|
||||
|
||||
def set_password(self, raw_password: str) -> None:
|
||||
"""Hash and store a password for the user."""
|
||||
|
||||
self.password_hash = self.hash_password(raw_password)
|
||||
|
||||
@staticmethod
|
||||
def hash_password(raw_password: str) -> str:
|
||||
"""Return the Argon2 hash for a clear-text password."""
|
||||
|
||||
return password_context.hash(raw_password)
|
||||
|
||||
def verify_password(self, candidate_password: str) -> bool:
|
||||
"""Validate a password against the stored hash."""
|
||||
|
||||
if not self.password_hash:
|
||||
return False
|
||||
return password_context.verify(candidate_password, self.password_hash)
|
||||
|
||||
def __repr__(self) -> str: # pragma: no cover - helpful for debugging
|
||||
return f"User(id={self.id!r}, email={self.email!r})"
|
||||
|
||||
|
||||
class Role(Base):
|
||||
"""Role encapsulating a set of permissions."""
|
||||
|
||||
__tablename__ = "roles"
|
||||
|
||||
id: Mapped[int] = mapped_column(Integer, primary_key=True)
|
||||
name: Mapped[str] = mapped_column(String(64), nullable=False, unique=True)
|
||||
display_name: Mapped[str] = mapped_column(String(128), nullable=False)
|
||||
description: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True), nullable=False, server_default=func.now()
|
||||
)
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now()
|
||||
)
|
||||
|
||||
assignments: Mapped[List["UserRole"]] = relationship(
|
||||
"UserRole",
|
||||
back_populates="role",
|
||||
cascade="all, delete-orphan",
|
||||
foreign_keys="UserRole.role_id",
|
||||
)
|
||||
users: Mapped[List["User"]] = relationship(
|
||||
"User",
|
||||
secondary="user_roles",
|
||||
primaryjoin="Role.id == UserRole.role_id",
|
||||
secondaryjoin="User.id == UserRole.user_id",
|
||||
viewonly=True,
|
||||
back_populates="roles",
|
||||
)
|
||||
|
||||
def __repr__(self) -> str: # pragma: no cover - helpful for debugging
|
||||
return f"Role(id={self.id!r}, name={self.name!r})"
|
||||
|
||||
|
||||
class UserRole(Base):
|
||||
"""Association between users and roles with assignment metadata."""
|
||||
|
||||
__tablename__ = "user_roles"
|
||||
__table_args__ = (
|
||||
UniqueConstraint("user_id", "role_id", name="uq_user_roles_user_role"),
|
||||
)
|
||||
|
||||
user_id: Mapped[int] = mapped_column(
|
||||
Integer,
|
||||
ForeignKey("users.id", ondelete="CASCADE"),
|
||||
primary_key=True,
|
||||
)
|
||||
role_id: Mapped[int] = mapped_column(
|
||||
Integer,
|
||||
ForeignKey("roles.id", ondelete="CASCADE"),
|
||||
primary_key=True,
|
||||
)
|
||||
granted_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True), nullable=False, server_default=func.now()
|
||||
)
|
||||
granted_by: Mapped[Optional[int]] = mapped_column(
|
||||
Integer,
|
||||
ForeignKey("users.id", ondelete="SET NULL"),
|
||||
nullable=True,
|
||||
)
|
||||
|
||||
user: Mapped["User"] = relationship(
|
||||
"User",
|
||||
foreign_keys=[user_id],
|
||||
back_populates="role_assignments",
|
||||
)
|
||||
role: Mapped["Role"] = relationship(
|
||||
"Role",
|
||||
foreign_keys=[role_id],
|
||||
back_populates="assignments",
|
||||
)
|
||||
granted_by_user: Mapped[Optional["User"]] = relationship(
|
||||
"User",
|
||||
foreign_keys=[granted_by],
|
||||
)
|
||||
|
||||
def __repr__(self) -> str: # pragma: no cover - debugging helper
|
||||
return f"UserRole(user_id={self.user_id!r}, role_id={self.role_id!r})"
|
||||
@@ -9,5 +9,6 @@ jinja2
|
||||
pandas
|
||||
numpy
|
||||
passlib
|
||||
argon2-cffi
|
||||
python-jose
|
||||
python-multipart
|
||||
83
tests/test_user_model.py
Normal file
83
tests/test_user_model.py
Normal file
@@ -0,0 +1,83 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Iterator
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
|
||||
from config.database import Base
|
||||
from models import Role, User, UserRole
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def engine() -> Iterator:
|
||||
engine = create_engine("sqlite:///:memory:", future=True)
|
||||
Base.metadata.create_all(bind=engine)
|
||||
try:
|
||||
yield engine
|
||||
finally:
|
||||
Base.metadata.drop_all(bind=engine)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def session(engine) -> Iterator[Session]:
|
||||
TestingSession = sessionmaker(
|
||||
bind=engine, expire_on_commit=False, future=True)
|
||||
session = TestingSession()
|
||||
try:
|
||||
yield session
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
def test_user_password_helpers() -> None:
|
||||
user = User(
|
||||
email="user@example.com",
|
||||
username="example",
|
||||
password_hash=User.hash_password("initial"),
|
||||
)
|
||||
|
||||
user.set_password("new-secret")
|
||||
|
||||
assert user.password_hash != "new-secret"
|
||||
assert user.verify_password("new-secret")
|
||||
assert not user.verify_password("wrong")
|
||||
|
||||
|
||||
def test_user_role_assignment(session: Session) -> None:
|
||||
grantor = User(
|
||||
email="admin@example.com",
|
||||
username="admin",
|
||||
password_hash=User.hash_password("admin-secret"),
|
||||
is_superuser=True,
|
||||
)
|
||||
analyst_role = Role(
|
||||
name="analyst",
|
||||
display_name="Analyst",
|
||||
description="Can review project dashboards",
|
||||
)
|
||||
analyst = User(
|
||||
email="analyst@example.com",
|
||||
username="analyst",
|
||||
password_hash=User.hash_password("analyst-secret"),
|
||||
)
|
||||
|
||||
assignment = UserRole(user=analyst, role=analyst_role,
|
||||
granted_by_user=grantor)
|
||||
|
||||
session.add_all([grantor, analyst_role, analyst, assignment])
|
||||
session.commit()
|
||||
|
||||
session.refresh(analyst)
|
||||
session.refresh(analyst_role)
|
||||
|
||||
# Relationship wrapper exposes the role without needing to traverse assignments manually
|
||||
assert len(analyst.role_assignments) == 1
|
||||
assert analyst.role_assignments[0].granted_by_user is grantor
|
||||
assert len(analyst.roles) == 1
|
||||
assert analyst.roles[0].name == "analyst"
|
||||
|
||||
# Ensure reverse relationship exposes the user
|
||||
assert len(analyst_role.assignments) == 1
|
||||
assert analyst_role.users[0].username == "analyst"
|
||||
Reference in New Issue
Block a user