Files
ai.allucanget.biz/backend/tests/test_users.py
T

207 lines
7.8 KiB
Python

"""Integration tests for user management endpoints."""
import os
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from app.main import app
from app import db as db_module
os.environ.setdefault("JWT_SECRET", "test-secret-key-for-testing-only")
@pytest.fixture(autouse=True)
def fresh_db():
db_module._conn = None
db_module.init_db(":memory:")
yield
db_module.close_db()
db_module._conn = None
@pytest_asyncio.fixture
async def client(fresh_db):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
yield ac
async def _register_and_login(client, email="user@example.com", password="secret123"):
await client.post("/auth/register", json={"email": email, "password": password})
resp = await client.post("/auth/login", json={"email": email, "password": password})
return resp.json()["access_token"]
async def _make_admin(user_id: str):
"""Directly set a user's role to admin in the DB."""
conn = db_module.get_conn()
conn.execute("UPDATE users SET role = 'admin' WHERE id = ?", [user_id])
# ---------------------------------------------------------------------------
# GET /users/me
# ---------------------------------------------------------------------------
async def test_get_me(client):
token = await _register_and_login(client)
resp = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"})
assert resp.status_code == 200
data = resp.json()
assert data["email"] == "user@example.com"
assert data["role"] == "user"
assert "id" in data
async def test_get_me_unauthenticated(client):
resp = await client.get("/users/me")
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# PUT /users/me
# ---------------------------------------------------------------------------
async def test_update_me_email(client):
token = await _register_and_login(client)
resp = await client.put(
"/users/me",
json={"email": "new@example.com"},
headers={"Authorization": f"Bearer {token}"},
)
assert resp.status_code == 200
assert resp.json()["email"] == "new@example.com"
async def test_update_me_password(client):
token = await _register_and_login(client)
resp = await client.put(
"/users/me",
json={"password": "newpassword123"},
headers={"Authorization": f"Bearer {token}"},
)
assert resp.status_code == 200
# Verify new password works for login
login = await client.post(
"/auth/login", json={"email": "user@example.com", "password": "newpassword123"}
)
assert login.status_code == 200
async def test_update_me_duplicate_email(client):
await _register_and_login(client, "other@example.com")
token = await _register_and_login(client, "user@example.com")
resp = await client.put(
"/users/me",
json={"email": "other@example.com"},
headers={"Authorization": f"Bearer {token}"},
)
assert resp.status_code == 409
# ---------------------------------------------------------------------------
# GET /users (admin only)
# ---------------------------------------------------------------------------
async def test_list_users_as_admin(client):
token = await _register_and_login(client)
me = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"})
await _make_admin(me.json()["id"])
# Re-login to get a token with admin role
login = await client.post(
"/auth/login", json={"email": "user@example.com", "password": "secret123"}
)
admin_token = login.json()["access_token"]
resp = await client.get("/users", headers={"Authorization": f"Bearer {admin_token}"})
assert resp.status_code == 200
assert isinstance(resp.json(), list)
assert len(resp.json()) == 1
async def test_list_users_as_regular_user(client):
token = await _register_and_login(client)
resp = await client.get("/users", headers={"Authorization": f"Bearer {token}"})
assert resp.status_code == 403
# ---------------------------------------------------------------------------
# DELETE /users/{id} (admin only)
# ---------------------------------------------------------------------------
async def test_delete_user_as_admin(client):
# Register target user
await client.post("/auth/register", json={"email": "target@example.com", "password": "secret123"})
target_resp = await client.post("/auth/login", json={"email": "target@example.com", "password": "secret123"})
target_token = target_resp.json()["access_token"]
target_me = await client.get("/users/me", headers={"Authorization": f"Bearer {target_token}"})
target_id = target_me.json()["id"]
# Register admin
token = await _register_and_login(client)
me = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"})
await _make_admin(me.json()["id"])
login = await client.post("/auth/login", json={"email": "user@example.com", "password": "secret123"})
admin_token = login.json()["access_token"]
resp = await client.delete(f"/users/{target_id}", headers={"Authorization": f"Bearer {admin_token}"})
assert resp.status_code == 204
async def test_delete_own_account_forbidden(client):
token = await _register_and_login(client)
me = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"})
user_id = me.json()["id"]
await _make_admin(user_id)
login = await client.post("/auth/login", json={"email": "user@example.com", "password": "secret123"})
admin_token = login.json()["access_token"]
resp = await client.delete(f"/users/{user_id}", headers={"Authorization": f"Bearer {admin_token}"})
assert resp.status_code == 400
# ---------------------------------------------------------------------------
# PUT /users/{id}/role (admin only)
# ---------------------------------------------------------------------------
async def test_set_role_as_admin(client):
# Register target
await client.post("/auth/register", json={"email": "target@example.com", "password": "secret123"})
target_resp = await client.post("/auth/login", json={"email": "target@example.com", "password": "secret123"})
target_me = await client.get("/users/me", headers={"Authorization": f"Bearer {target_resp.json()['access_token']}"})
target_id = target_me.json()["id"]
# Register admin
token = await _register_and_login(client)
me = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"})
await _make_admin(me.json()["id"])
login = await client.post("/auth/login", json={"email": "user@example.com", "password": "secret123"})
admin_token = login.json()["access_token"]
resp = await client.put(
f"/users/{target_id}/role",
json={"role": "admin"},
headers={"Authorization": f"Bearer {admin_token}"},
)
assert resp.status_code == 200
assert resp.json()["role"] == "admin"
async def test_set_invalid_role(client):
await client.post("/auth/register", json={"email": "target@example.com", "password": "secret123"})
target_resp = await client.post("/auth/login", json={"email": "target@example.com", "password": "secret123"})
target_me = await client.get("/users/me", headers={"Authorization": f"Bearer {target_resp.json()['access_token']}"})
target_id = target_me.json()["id"]
token = await _register_and_login(client)
me = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"})
await _make_admin(me.json()["id"])
login = await client.post("/auth/login", json={"email": "user@example.com", "password": "secret123"})
admin_token = login.json()["access_token"]
resp = await client.put(
f"/users/{target_id}/role",
json={"role": "superuser"},
headers={"Authorization": f"Bearer {admin_token}"},
)
assert resp.status_code == 422