Files
calminer/tests/test_auth_repositories.py

139 lines
4.0 KiB
Python

from __future__ import annotations
from collections.abc import Iterator
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from config.database import Base
from models import Role, User
from services.repositories import (
RoleRepository,
UserRepository,
ensure_admin_user,
ensure_default_roles,
)
from services.unit_of_work import UnitOfWork
from tests.utils.security import random_password
@pytest.fixture()
def engine() -> Iterator:
    """Yield an in-memory SQLite engine with the schema already created.

    Every table registered on ``Base.metadata`` is created before the test
    body runs and dropped again afterwards. The engine's connection pool is
    then disposed so that no SQLite connection outlives the test.
    """
    # Local name differs from the fixture name to avoid shadowing it.
    db_engine = create_engine("sqlite:///:memory:", future=True)
    Base.metadata.create_all(bind=db_engine)
    try:
        yield db_engine
    finally:
        try:
            Base.metadata.drop_all(bind=db_engine)
        finally:
            # Release pooled connections even when drop_all() raises.
            db_engine.dispose()
@pytest.fixture()
def session(engine) -> Iterator[Session]:
    """Yield a Session bound to the test engine and close it afterwards."""
    make_session = sessionmaker(
        bind=engine,
        expire_on_commit=False,
        future=True,
    )
    # The Session context manager closes the session on exit, which covers
    # both normal teardown and a failing test body.
    with make_session() as db:
        yield db
def test_role_repository_create_and_lookup(session: Session) -> None:
    """Store a role, then read it back by id, by name, and via listing."""
    roles = RoleRepository(session)
    custom = Role(
        name="custom",
        display_name="Custom",
        description="Custom role",
    )
    roles.create(custom)

    by_id = roles.get(custom.id)
    assert by_id.name == "custom"

    # Looking up by name must hand back the very same object instance.
    by_name = roles.get_by_name("custom")
    assert by_name is by_id

    first_listed = roles.list()[0]
    assert first_listed.name == "custom"
def test_user_repository_assign_and_revoke_role(session: Session) -> None:
    """Grant a role to a user, check it is listed, then revoke it again."""
    roles = RoleRepository(session)
    users = UserRepository(session)

    analyst_role = roles.create(
        Role(
            name="analyst",
            display_name="Analyst",
            description="Analyzes data",
        )
    )
    account = User(
        email="user@example.com",
        username="user",
        password_hash=User.hash_password(random_password()),
    )
    users.create(account)

    # Grant: the returned assignment must point at the analyst role.
    grant = users.assign_role(
        user_id=account.id,
        role_id=analyst_role.id,
        granted_by=None,
    )
    assert grant.role_id == analyst_role.id

    after_grant = users.get(account.id, with_roles=True)
    assert after_grant.roles[0].name == "analyst"

    # Revoke: the user must end up with no roles at all.
    users.revoke_role(user_id=account.id, role_id=analyst_role.id)
    after_revoke = users.get(account.id, with_roles=True)
    assert after_revoke.roles == []
def test_default_role_and_admin_helpers(session: Session) -> None:
    """Exercise ensure_default_roles and ensure_admin_user, called twice."""
    role_repo = RoleRepository(session)
    user_repo = UserRepository(session)

    default_roles = ensure_default_roles(role_repo)
    default_names = {entry.name for entry in default_roles}
    assert default_names == {"admin", "project_manager", "analyst", "viewer"}

    # Same credentials are reused for both ensure_admin_user calls below.
    admin_kwargs = {
        "email": "admin@example.com",
        "username": "admin",
        "password": random_password(),
    }

    ensure_admin_user(user_repo, role_repo, **admin_kwargs)
    first_lookup = user_repo.get_by_email("admin@example.com", with_roles=True)
    assert first_lookup is not None
    assert first_lookup.is_superuser
    assert {"admin"} <= {entry.name for entry in first_lookup.roles}

    # A second call must not leave the admin with duplicated role names.
    ensure_admin_user(user_repo, role_repo, **admin_kwargs)
    second_lookup = user_repo.get_by_email("admin@example.com", with_roles=True)
    assert second_lookup is not None
    role_names = [entry.name for entry in second_lookup.roles]
    assert len(role_names) == len(set(role_names))
def test_unit_of_work_exposes_auth_repositories(engine) -> None:
    """Check the auth repositories and helpers reachable through UnitOfWork."""
    make_session = sessionmaker(
        bind=engine,
        expire_on_commit=False,
        future=True,
    )
    admin_email = "uow-admin@example.com"

    with UnitOfWork(session_factory=make_session) as uow:
        assert uow.users is not None
        assert uow.roles is not None

        seeded = uow.ensure_default_roles()
        assert "admin" in [entry.name for entry in seeded]

        uow.ensure_admin_user(
            email=admin_email,
            username="uow-admin",
            password=random_password(),
        )

        # Read the admin back through the same unit of work.
        found = uow.users.get_by_email(admin_email, with_roles=True)
        assert found is not None
        assert found.is_superuser
        assert "admin" in [entry.name for entry in found.roles]