"""Integration tests for user management endpoints.""" import os import pytest import pytest_asyncio from httpx import AsyncClient, ASGITransport from app.main import app from app import db as db_module os.environ.setdefault("JWT_SECRET", "test-secret-key-for-testing-only") @pytest.fixture(autouse=True) def fresh_db(): db_module._conn = None db_module.init_db(":memory:") yield db_module.close_db() db_module._conn = None @pytest_asyncio.fixture async def client(fresh_db): transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://test") as ac: yield ac async def _register_and_login(client, email="user@example.com", password="secret123"): await client.post("/auth/register", json={"email": email, "password": password}) resp = await client.post("/auth/login", json={"email": email, "password": password}) return resp.json()["access_token"] async def _make_admin(user_id: str): """Directly set a user's role to admin in the DB.""" conn = db_module.get_conn() conn.execute("UPDATE users SET role = 'admin' WHERE id = ?", [user_id]) # --------------------------------------------------------------------------- # GET /users/me # --------------------------------------------------------------------------- async def test_get_me(client): token = await _register_and_login(client) resp = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"}) assert resp.status_code == 200 data = resp.json() assert data["email"] == "user@example.com" assert data["role"] == "user" assert "id" in data async def test_get_me_unauthenticated(client): resp = await client.get("/users/me") assert resp.status_code == 401 # --------------------------------------------------------------------------- # PUT /users/me # --------------------------------------------------------------------------- async def test_update_me_email(client): token = await _register_and_login(client) resp = await client.put( "/users/me", json={"email": "new@example.com"}, headers={"Authorization": f"Bearer {token}"}, ) assert resp.status_code == 200 assert resp.json()["email"] == "new@example.com" async def test_update_me_password(client): token = await _register_and_login(client) resp = await client.put( "/users/me", json={"password": "newpassword123"}, headers={"Authorization": f"Bearer {token}"}, ) assert resp.status_code == 200 # Verify new password works for login login = await client.post( "/auth/login", json={"email": "user@example.com", "password": "newpassword123"} ) assert login.status_code == 200 async def test_update_me_duplicate_email(client): await _register_and_login(client, "other@example.com") token = await _register_and_login(client, "user@example.com") resp = await client.put( "/users/me", json={"email": "other@example.com"}, headers={"Authorization": f"Bearer {token}"}, ) assert resp.status_code == 409 # --------------------------------------------------------------------------- # GET /users (admin only) # --------------------------------------------------------------------------- async def test_list_users_as_admin(client): token = await _register_and_login(client) me = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"}) await _make_admin(me.json()["id"]) # Re-login to get a token with admin role login = await client.post( "/auth/login", json={"email": "user@example.com", "password": "secret123"} ) admin_token = login.json()["access_token"] resp = await client.get("/users", headers={"Authorization": f"Bearer {admin_token}"}) assert resp.status_code == 200 assert isinstance(resp.json(), list) assert len(resp.json()) >= 1 emails = [u["email"] for u in resp.json()] assert "user@example.com" in emails async def test_list_users_as_regular_user(client): token = await _register_and_login(client) resp = await client.get("/users", headers={"Authorization": f"Bearer {token}"}) assert resp.status_code == 403 # --------------------------------------------------------------------------- # DELETE /users/{id} (admin only) # --------------------------------------------------------------------------- async def test_delete_user_as_admin(client): # Register target user await client.post("/auth/register", json={"email": "target@example.com", "password": "secret123"}) target_resp = await client.post("/auth/login", json={"email": "target@example.com", "password": "secret123"}) target_token = target_resp.json()["access_token"] target_me = await client.get("/users/me", headers={"Authorization": f"Bearer {target_token}"}) target_id = target_me.json()["id"] # Register admin token = await _register_and_login(client) me = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"}) await _make_admin(me.json()["id"]) login = await client.post("/auth/login", json={"email": "user@example.com", "password": "secret123"}) admin_token = login.json()["access_token"] resp = await client.delete(f"/users/{target_id}", headers={"Authorization": f"Bearer {admin_token}"}) assert resp.status_code == 204 async def test_delete_own_account_forbidden(client): token = await _register_and_login(client) me = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"}) user_id = me.json()["id"] await _make_admin(user_id) login = await client.post("/auth/login", json={"email": "user@example.com", "password": "secret123"}) admin_token = login.json()["access_token"] resp = await client.delete(f"/users/{user_id}", headers={"Authorization": f"Bearer {admin_token}"}) assert resp.status_code == 400 # --------------------------------------------------------------------------- # PUT /users/{id}/role (admin only) # --------------------------------------------------------------------------- async def test_set_role_as_admin(client): # Register target await client.post("/auth/register", json={"email": "target@example.com", "password": "secret123"}) target_resp = await client.post("/auth/login", json={"email": "target@example.com", "password": "secret123"}) target_me = await client.get("/users/me", headers={"Authorization": f"Bearer {target_resp.json()['access_token']}"}) target_id = target_me.json()["id"] # Register admin token = await _register_and_login(client) me = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"}) await _make_admin(me.json()["id"]) login = await client.post("/auth/login", json={"email": "user@example.com", "password": "secret123"}) admin_token = login.json()["access_token"] resp = await client.put( f"/users/{target_id}/role", json={"role": "admin"}, headers={"Authorization": f"Bearer {admin_token}"}, ) assert resp.status_code == 200 assert resp.json()["role"] == "admin" async def test_set_invalid_role(client): await client.post("/auth/register", json={"email": "target@example.com", "password": "secret123"}) target_resp = await client.post("/auth/login", json={"email": "target@example.com", "password": "secret123"}) target_me = await client.get("/users/me", headers={"Authorization": f"Bearer {target_resp.json()['access_token']}"}) target_id = target_me.json()["id"] token = await _register_and_login(client) me = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"}) await _make_admin(me.json()["id"]) login = await client.post("/auth/login", json={"email": "user@example.com", "password": "secret123"}) admin_token = login.json()["access_token"] resp = await client.put( f"/users/{target_id}/role", json={"role": "superuser"}, headers={"Authorization": f"Bearer {admin_token}"}, ) assert resp.status_code == 422