add integration tests for admin, auth, user management, and database initialization
This commit is contained in:
@@ -0,0 +1,118 @@
|
||||
"""Integration tests for admin endpoints."""
|
||||
import os
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
from httpx import AsyncClient, ASGITransport
|
||||
|
||||
from backend.app.main import app
|
||||
from backend.app import db as db_module
|
||||
|
||||
os.environ.setdefault("JWT_SECRET", "test-secret-key-for-testing-only")
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def fresh_db():
|
||||
db_module._conn = None
|
||||
db_module.init_db(":memory:")
|
||||
yield
|
||||
db_module.close_db()
|
||||
db_module._conn = None
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def client(fresh_db):
|
||||
transport = ASGITransport(app=app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||
yield ac
|
||||
|
||||
|
||||
async def _register_login(client, email="user@example.com", password="secret123"):
|
||||
await client.post("/auth/register", json={"email": email, "password": password})
|
||||
resp = await client.post("/auth/login", json={"email": email, "password": password})
|
||||
return resp.json()
|
||||
|
||||
|
||||
async def _admin_token(client, email="admin@example.com", password="secret123"):
|
||||
tokens = await _register_login(client, email, password)
|
||||
me = await client.get("/users/me", headers={"Authorization": f"Bearer {tokens['access_token']}"})
|
||||
db_module.get_conn().execute(
|
||||
"UPDATE users SET role = 'admin' WHERE id = ?", [me.json()["id"]])
|
||||
login = await client.post("/auth/login", json={"email": email, "password": password})
|
||||
return login.json()["access_token"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GET /admin/stats
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def test_stats_as_admin(client):
|
||||
await _register_login(client, "user1@example.com")
|
||||
await _register_login(client, "user2@example.com")
|
||||
token = await _admin_token(client)
|
||||
|
||||
resp = await client.get("/admin/stats", headers={"Authorization": f"Bearer {token}"})
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["users"]["total"] == 3 # 2 users + 1 admin
|
||||
assert "by_role" in data["users"]
|
||||
assert "refresh_tokens" in data
|
||||
|
||||
|
||||
async def test_stats_as_regular_user(client):
|
||||
tokens = await _register_login(client)
|
||||
resp = await client.get("/admin/stats", headers={"Authorization": f"Bearer {tokens['access_token']}"})
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
async def test_stats_unauthenticated(client):
|
||||
resp = await client.get("/admin/stats")
|
||||
assert resp.status_code == 401
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GET /admin/health/db
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def test_db_health_as_admin(client):
|
||||
token = await _admin_token(client)
|
||||
resp = await client.get("/admin/health/db", headers={"Authorization": f"Bearer {token}"})
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "ok"
|
||||
|
||||
|
||||
async def test_db_health_as_regular_user(client):
|
||||
tokens = await _register_login(client)
|
||||
resp = await client.get("/admin/health/db", headers={"Authorization": f"Bearer {tokens['access_token']}"})
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# POST /admin/tokens/purge
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def test_purge_removes_revoked_tokens(client):
|
||||
tokens = await _register_login(client, "user@example.com")
|
||||
refresh_token = tokens["refresh_token"]
|
||||
|
||||
# Logout to revoke the token
|
||||
await client.post("/auth/logout", json={"refresh_token": refresh_token})
|
||||
|
||||
token = await _admin_token(client)
|
||||
resp = await client.post("/admin/tokens/purge", headers={"Authorization": f"Bearer {token}"})
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["deleted"] >= 1
|
||||
|
||||
|
||||
async def test_purge_nothing_to_remove(client):
|
||||
token = await _admin_token(client)
|
||||
resp = await client.post("/admin/tokens/purge", headers={"Authorization": f"Bearer {token}"})
|
||||
assert resp.status_code == 200
|
||||
# Admin login issued one active token — nothing to purge
|
||||
assert resp.json()["deleted"] == 0
|
||||
|
||||
|
||||
async def test_purge_as_regular_user(client):
|
||||
tokens = await _register_login(client)
|
||||
resp = await client.post("/admin/tokens/purge", headers={"Authorization": f"Bearer {tokens['access_token']}"})
|
||||
assert resp.status_code == 403
|
||||
Reference in New Issue
Block a user