2c6fdc03a8
Co-authored-by: Copilot <copilot@github.com>
119 lines
4.2 KiB
Python
119 lines
4.2 KiB
Python
"""Integration tests for admin endpoints."""
|
|
import os
|
|
import pytest
|
|
import pytest_asyncio
|
|
from httpx import AsyncClient, ASGITransport
|
|
|
|
from app.main import app
|
|
from app import db as db_module
|
|
|
|
# Provide a fallback JWT secret for the test run; ``setdefault`` leaves any
# value already present in the environment untouched.
# NOTE(review): this executes after ``app.main`` has been imported above. If
# the app reads JWT_SECRET at import time, this default arrives too late and
# would need to move ahead of that import — confirm against the app's config.
os.environ.setdefault("JWT_SECRET", "test-secret-key-for-testing-only")
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def fresh_db():
    """Re-initialise the database around every test in this module.

    Setup clears the module-level connection handle and calls
    ``init_db(":memory:")``; teardown closes the database and clears the
    handle again so nothing carries over into the next test.
    """
    # --- setup ---
    db_module._conn = None  # forget any handle left behind by earlier code
    db_module.init_db(":memory:")

    yield

    # --- teardown ---
    db_module.close_db()
    db_module._conn = None
|
|
|
|
|
|
@pytest_asyncio.fixture
async def client(fresh_db):
    """Yield an ``AsyncClient`` that talks to ``app`` through ``ASGITransport``.

    Requesting ``fresh_db`` makes the database fixture's setup run before the
    client is created.
    """
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test",
    ) as http_client:
        yield http_client
|
|
|
|
|
|
async def _register_login(client, email="user@example.com", password="secret123"):
    """Register an account, log in with it, and return the login JSON body.

    The registration response is not inspected; only the decoded body of the
    ``/auth/login`` response is returned to the caller.
    """
    credentials = {"email": email, "password": password}
    await client.post("/auth/register", json=credentials)
    login_response = await client.post("/auth/login", json=credentials)
    return login_response.json()
|
|
|
|
|
|
async def _admin_token(client, email="admin@example.com", password="secret123"):
    """Create a user, promote it to admin in the database, return an access token.

    Steps: register and log in, look up the new user's id via ``/users/me``,
    set ``role = 'admin'`` directly in the ``users`` table, then log in a
    second time and return that login's ``access_token``.
    """
    first_login = await _register_login(client, email, password)
    bearer = {"Authorization": f"Bearer {first_login['access_token']}"}
    profile = await client.get("/users/me", headers=bearer)

    # Promote the user behind the API's back; there is no endpoint used for it here.
    conn = db_module.get_conn()
    user_id = profile.json()["id"]
    conn.execute("UPDATE users SET role = 'admin' WHERE id = ?", [user_id])

    # Log in again after the role change and hand back the fresh token
    # (presumably the role is baked into the token — confirm against the app).
    second_login = await client.post(
        "/auth/login",
        json={"email": email, "password": password},
    )
    return second_login.json()["access_token"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /admin/stats
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_stats_as_admin(client):
    """An admin receives 200 from /admin/stats with user and token sections."""
    for regular_email in ("user1@example.com", "user2@example.com"):
        await _register_login(client, regular_email)
    admin_token = await _admin_token(client)

    response = await client.get(
        "/admin/stats",
        headers={"Authorization": f"Bearer {admin_token}"},
    )

    assert response.status_code == 200
    payload = response.json()
    user_stats = payload["users"]
    assert user_stats["total"] == 3  # 2 users + 1 admin
    assert "by_role" in user_stats
    assert "refresh_tokens" in payload
|
|
|
|
|
|
async def test_stats_as_regular_user(client):
    """A non-admin access token is rejected by /admin/stats with 403."""
    session = await _register_login(client)
    auth_header = {"Authorization": f"Bearer {session['access_token']}"}

    response = await client.get("/admin/stats", headers=auth_header)

    assert response.status_code == 403
|
|
|
|
|
|
async def test_stats_unauthenticated(client):
    """A request to /admin/stats without an Authorization header gets 401."""
    response = await client.get("/admin/stats")

    assert response.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /admin/health/db
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_db_health_as_admin(client):
    """An admin receives 200 and ``status == "ok"`` from /admin/health/db."""
    admin_token = await _admin_token(client)

    response = await client.get(
        "/admin/health/db",
        headers={"Authorization": f"Bearer {admin_token}"},
    )

    assert response.status_code == 200
    assert response.json()["status"] == "ok"
|
|
|
|
|
|
async def test_db_health_as_regular_user(client):
    """A non-admin access token is rejected by /admin/health/db with 403."""
    session = await _register_login(client)
    auth_header = {"Authorization": f"Bearer {session['access_token']}"}

    response = await client.get("/admin/health/db", headers=auth_header)

    assert response.status_code == 403
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /admin/tokens/purge
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_purge_removes_revoked_tokens(client):
    """After a user logs out, an admin purge reports at least one deletion."""
    session = await _register_login(client, "user@example.com")

    # Logging out with the refresh token is what revokes it.
    await client.post(
        "/auth/logout",
        json={"refresh_token": session["refresh_token"]},
    )

    admin_token = await _admin_token(client)
    response = await client.post(
        "/admin/tokens/purge",
        headers={"Authorization": f"Bearer {admin_token}"},
    )

    assert response.status_code == 200
    assert response.json()["deleted"] >= 1
|
|
|
|
|
|
async def test_purge_nothing_to_remove(client):
    """With no logout beforehand, an admin purge reports zero deletions."""
    admin_token = await _admin_token(client)

    response = await client.post(
        "/admin/tokens/purge",
        headers={"Authorization": f"Bearer {admin_token}"},
    )

    assert response.status_code == 200
    # The admin helper only logged in and never logged out, so the tokens it
    # was issued are still active — nothing to purge.
    assert response.json()["deleted"] == 0
|
|
|
|
|
|
async def test_purge_as_regular_user(client):
    """A non-admin access token is rejected by /admin/tokens/purge with 403."""
    session = await _register_login(client)
    auth_header = {"Authorization": f"Bearer {session['access_token']}"}

    response = await client.post("/admin/tokens/purge", headers=auth_header)

    assert response.status_code == 403
|