diff --git a/models/__init__.py b/models/__init__.py index 55e89ac..305ffb7 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -14,6 +14,7 @@ from .metadata import ( from .project import MiningOperationType, Project from .scenario import Scenario, ScenarioStatus from .simulation_parameter import DistributionType, SimulationParameter +from .user import Role, User, UserRole, password_context __all__ = [ "FinancialCategory", @@ -32,4 +33,8 @@ __all__ = [ "STOCHASTIC_VARIABLE_METADATA", "ResourceDescriptor", "StochasticVariableDescriptor", + "User", + "Role", + "UserRole", + "password_context", ] diff --git a/models/user.py b/models/user.py new file mode 100644 index 0000000..67db19e --- /dev/null +++ b/models/user.py @@ -0,0 +1,168 @@ +from __future__ import annotations + +from datetime import datetime +from typing import List, Optional + +from passlib.context import CryptContext +from sqlalchemy import ( + Boolean, + DateTime, + ForeignKey, + Integer, + String, + Text, + UniqueConstraint, +) +from sqlalchemy.orm import Mapped, mapped_column, relationship +from sqlalchemy.sql import func + +from config.database import Base + +# Configure password hashing strategy. Argon2 provides strong resistance against +# GPU-based cracking attempts, aligning with the security plan. +password_context = CryptContext(schemes=["argon2"], deprecated="auto") + + +class User(Base): + """Authenticated platform user with optional elevated privileges.""" + + __tablename__ = "users" + __table_args__ = ( + UniqueConstraint("email", name="uq_users_email"), + UniqueConstraint("username", name="uq_users_username"), + ) + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + email: Mapped[str] = mapped_column(String(255), nullable=False) + username: Mapped[str] = mapped_column(String(128), nullable=False) + password_hash: Mapped[str] = mapped_column(String(255), nullable=False) + is_active: Mapped[bool] = mapped_column( + Boolean, nullable=False, default=True) + is_superuser: Mapped[bool] = mapped_column( + Boolean, nullable=False, default=False) + last_login_at: Mapped[datetime | None] = mapped_column( + DateTime(timezone=True), nullable=True + ) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now() + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now() + ) + + role_assignments: Mapped[List["UserRole"]] = relationship( + "UserRole", + back_populates="user", + cascade="all, delete-orphan", + foreign_keys="UserRole.user_id", + ) + roles: Mapped[List["Role"]] = relationship( + "Role", + secondary="user_roles", + primaryjoin="User.id == UserRole.user_id", + secondaryjoin="Role.id == UserRole.role_id", + viewonly=True, + back_populates="users", + ) + + def set_password(self, raw_password: str) -> None: + """Hash and store a password for the user.""" + + self.password_hash = self.hash_password(raw_password) + + @staticmethod + def hash_password(raw_password: str) -> str: + """Return the Argon2 hash for a clear-text password.""" + + return password_context.hash(raw_password) + + def verify_password(self, candidate_password: str) -> bool: + """Validate a password against the stored hash.""" + + if not self.password_hash: + return False + return password_context.verify(candidate_password, self.password_hash) + + def __repr__(self) -> str: # pragma: no cover - helpful for debugging + return f"User(id={self.id!r}, email={self.email!r})" + + +class Role(Base): + """Role encapsulating a set of permissions.""" + + __tablename__ = "roles" + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + name: Mapped[str] = mapped_column(String(64), nullable=False, unique=True) + display_name: Mapped[str] = mapped_column(String(128), nullable=False) + description: Mapped[str | None] = mapped_column(Text, nullable=True) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now() + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now() + ) + + assignments: Mapped[List["UserRole"]] = relationship( + "UserRole", + back_populates="role", + cascade="all, delete-orphan", + foreign_keys="UserRole.role_id", + ) + users: Mapped[List["User"]] = relationship( + "User", + secondary="user_roles", + primaryjoin="Role.id == UserRole.role_id", + secondaryjoin="User.id == UserRole.user_id", + viewonly=True, + back_populates="roles", + ) + + def __repr__(self) -> str: # pragma: no cover - helpful for debugging + return f"Role(id={self.id!r}, name={self.name!r})" + + +class UserRole(Base): + """Association between users and roles with assignment metadata.""" + + __tablename__ = "user_roles" + __table_args__ = ( + UniqueConstraint("user_id", "role_id", name="uq_user_roles_user_role"), + ) + + user_id: Mapped[int] = mapped_column( + Integer, + ForeignKey("users.id", ondelete="CASCADE"), + primary_key=True, + ) + role_id: Mapped[int] = mapped_column( + Integer, + ForeignKey("roles.id", ondelete="CASCADE"), + primary_key=True, + ) + granted_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now() + ) + granted_by: Mapped[Optional[int]] = mapped_column( + Integer, + ForeignKey("users.id", ondelete="SET NULL"), + nullable=True, + ) + + user: Mapped["User"] = relationship( + "User", + foreign_keys=[user_id], + back_populates="role_assignments", + ) + role: Mapped["Role"] = relationship( + "Role", + foreign_keys=[role_id], + back_populates="assignments", + ) + granted_by_user: Mapped[Optional["User"]] = relationship( + "User", + foreign_keys=[granted_by], + ) + + def __repr__(self) -> str: # pragma: no cover - debugging helper + return f"UserRole(user_id={self.user_id!r}, role_id={self.role_id!r})" diff --git a/requirements.txt b/requirements.txt index 1ae5423..7227da4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,5 +9,6 @@ jinja2 pandas numpy passlib +argon2-cffi python-jose python-multipart \ No newline at end of file diff --git a/tests/test_user_model.py b/tests/test_user_model.py new file mode 100644 index 0000000..b03552c --- /dev/null +++ b/tests/test_user_model.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +from collections.abc import Iterator + +import pytest +from sqlalchemy import create_engine +from sqlalchemy.orm import Session, sessionmaker + +from config.database import Base +from models import Role, User, UserRole + + +@pytest.fixture() +def engine() -> Iterator: + engine = create_engine("sqlite:///:memory:", future=True) + Base.metadata.create_all(bind=engine) + try: + yield engine + finally: + Base.metadata.drop_all(bind=engine) + + +@pytest.fixture() +def session(engine) -> Iterator[Session]: + TestingSession = sessionmaker( + bind=engine, expire_on_commit=False, future=True) + session = TestingSession() + try: + yield session + finally: + session.close() + + +def test_user_password_helpers() -> None: + user = User( + email="user@example.com", + username="example", + password_hash=User.hash_password("initial"), + ) + + user.set_password("new-secret") + + assert user.password_hash != "new-secret" + assert user.verify_password("new-secret") + assert not user.verify_password("wrong") + + +def test_user_role_assignment(session: Session) -> None: + grantor = User( + email="admin@example.com", + username="admin", + password_hash=User.hash_password("admin-secret"), + is_superuser=True, + ) + analyst_role = Role( + name="analyst", + display_name="Analyst", + description="Can review project dashboards", + ) + analyst = User( + email="analyst@example.com", + username="analyst", + password_hash=User.hash_password("analyst-secret"), + ) + + assignment = UserRole(user=analyst, role=analyst_role, + granted_by_user=grantor) + + session.add_all([grantor, analyst_role, analyst, assignment]) + session.commit() + + session.refresh(analyst) + session.refresh(analyst_role) + + # Relationship wrapper exposes the role without needing to traverse assignments manually + assert len(analyst.role_assignments) == 1 + assert analyst.role_assignments[0].granted_by_user is grantor + assert len(analyst.roles) == 1 + assert analyst.roles[0].name == "analyst" + + # Ensure reverse relationship exposes the user + assert len(analyst_role.assignments) == 1 + assert analyst_role.users[0].username == "analyst"