Files
ai.allucanget.biz/backend/tests/test_admin.py
T
zwitschi 712c556032 feat: enhance model caching and output modalities handling
- Updated `refresh_models_cache` to include output modalities in the models cache.
- Added `get_model_output_modalities` function to retrieve output modalities for a specific model.
- Modified tests to cover new functionality for output modalities.
- Updated OpenRouter video generation functions to support audio generation and improved error handling.
- Enhanced dashboard to display generated images and videos.
- Refactored frontend templates to accommodate new data structures for generated content.
- Adjusted tests to validate changes in model handling and dashboard rendering.

Co-authored-by: Copilot <copilot@github.com>
2026-04-29 15:20:48 +02:00

120 lines
4.3 KiB
Python

"""Integration tests for admin endpoints."""
import os
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from app.main import app
from app import db as db_module
# Provide a fallback JWT_SECRET for the test run; setdefault() leaves a value
# that is already present in the environment untouched.
# NOTE(review): this executes after `from app.main import app` above, so it
# only takes effect if the app reads JWT_SECRET lazily rather than at import
# time — confirm.
os.environ.setdefault("JWT_SECRET", "test-secret-key-for-testing-only")
@pytest.fixture(autouse=True)
def fresh_db():
    """Give every test its own ``":memory:"`` database.

    Setup clears the connection handle stored on ``db_module`` and calls
    ``init_db(":memory:")``; teardown calls ``close_db()`` and clears the
    handle again. Being ``autouse``, it applies to every test in this module.
    """
    # Drop whatever handle may still be stored before initialising anew.
    db_module._conn = None
    db_module.init_db(":memory:")
    yield
    # Teardown: close the database and reset the module-level handle.
    db_module.close_db()
    db_module._conn = None
@pytest_asyncio.fixture
async def client(fresh_db):
    """Yield an ``AsyncClient`` wired to the app through ``ASGITransport``.

    Requesting ``fresh_db`` makes sure the database fixture has run before
    the client is created.
    """
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test",
    ) as http_client:
        yield http_client
async def _register_login(client, email="user@example.com", password="secret123"):
    """Register a user, log them in, and return the login response body.

    Args:
        client: HTTP client used to call the app.
        email: Address sent to both the register and login endpoints.
        password: Password sent to both the register and login endpoints.

    Returns:
        The decoded JSON body of ``POST /auth/login``. Callers in this file
        read ``access_token`` and ``refresh_token`` from it.

    Raises:
        httpx.HTTPStatusError: If the login request does not succeed.
    """
    credentials = {"email": email, "password": password}
    # The registration response is not inspected; any registration problem
    # that matters shows up in the login check below.
    await client.post("/auth/register", json=credentials)
    login = await client.post("/auth/login", json=credentials)
    # Fail right here with the HTTP status in the error instead of handing an
    # error payload to callers, who would otherwise hit a bare KeyError.
    login.raise_for_status()
    return login.json()
async def _admin_token(client, email="admin@example.com", password="secret123"):
    """Create a user, promote them to admin, and return a new access token.

    Args:
        client: HTTP client used to call the app.
        email: Address of the account to create and promote.
        password: Password of the account to create and promote.

    Returns:
        The ``access_token`` string from a login performed after the role
        change.

    Raises:
        httpx.HTTPStatusError: If ``GET /users/me`` or the second login does
            not succeed.
    """
    tokens = await _register_login(client, email, password)
    auth_header = {"Authorization": f"Bearer {tokens['access_token']}"}

    me = await client.get("/users/me", headers=auth_header)
    # Surface an HTTP failure directly rather than as a KeyError on "id".
    me.raise_for_status()
    user_id = me.json()["id"]

    # Promote the account straight in the database.
    # NOTE(review): no explicit commit is issued — this assumes the app reads
    # through the same connection returned by get_conn(); confirm.
    db_module.get_conn().execute(
        "UPDATE users SET role = 'admin' WHERE id = ?", [user_id])

    # Log in again after the role change; presumably the token issued before
    # the UPDATE would not carry the admin role — confirm.
    login = await client.post("/auth/login", json={"email": email, "password": password})
    login.raise_for_status()
    return login.json()["access_token"]
# ---------------------------------------------------------------------------
# GET /admin/stats
# ---------------------------------------------------------------------------
async def test_stats_as_admin(client):
    """An admin receives 200 from /admin/stats with user and token sections."""
    for email in ("user1@example.com", "user2@example.com"):
        await _register_login(client, email)
    admin_token = await _admin_token(client)

    response = await client.get(
        "/admin/stats",
        headers={"Authorization": f"Bearer {admin_token}"},
    )

    assert response.status_code == 200
    stats = response.json()
    # 2 users + 1 admin + 1 seeded admin (ai@allucanget.biz)
    assert stats["users"]["total"] == 4
    assert "by_role" in stats["users"]
    assert "refresh_tokens" in stats
async def test_stats_as_regular_user(client):
    """A non-admin account is refused with 403 on /admin/stats."""
    user_tokens = await _register_login(client)
    auth_header = {"Authorization": f"Bearer {user_tokens['access_token']}"}

    response = await client.get("/admin/stats", headers=auth_header)

    assert response.status_code == 403
async def test_stats_unauthenticated(client):
    """A request to /admin/stats without an Authorization header gets 401."""
    response = await client.get("/admin/stats")

    assert response.status_code == 401
# ---------------------------------------------------------------------------
# GET /admin/health/db
# ---------------------------------------------------------------------------
async def test_db_health_as_admin(client):
    """An admin receives 200 and ``status == "ok"`` from /admin/health/db."""
    admin_token = await _admin_token(client)
    auth_header = {"Authorization": f"Bearer {admin_token}"}

    response = await client.get("/admin/health/db", headers=auth_header)

    assert response.status_code == 200
    assert response.json()["status"] == "ok"
async def test_db_health_as_regular_user(client):
    """A non-admin account is refused with 403 on /admin/health/db."""
    user_tokens = await _register_login(client)
    auth_header = {"Authorization": f"Bearer {user_tokens['access_token']}"}

    response = await client.get("/admin/health/db", headers=auth_header)

    assert response.status_code == 403
# ---------------------------------------------------------------------------
# POST /admin/tokens/purge
# ---------------------------------------------------------------------------
async def test_purge_removes_revoked_tokens(client):
    """Purging after a logout reports at least one deleted token."""
    user_tokens = await _register_login(client, "user@example.com")

    # Logging out revokes the user's refresh token, giving the purge
    # something to remove.
    await client.post(
        "/auth/logout",
        json={"refresh_token": user_tokens["refresh_token"]},
    )

    admin_token = await _admin_token(client)
    response = await client.post(
        "/admin/tokens/purge",
        headers={"Authorization": f"Bearer {admin_token}"},
    )

    assert response.status_code == 200
    result = response.json()
    assert result["deleted"] >= 1
async def test_purge_nothing_to_remove(client):
    """Purging when no token has been revoked reports zero deletions."""
    admin_token = await _admin_token(client)
    auth_header = {"Authorization": f"Bearer {admin_token}"}

    response = await client.post("/admin/tokens/purge", headers=auth_header)

    assert response.status_code == 200
    # Only the admin's own logins have happened and nothing was logged out,
    # so the purge has nothing to delete.
    assert response.json()["deleted"] == 0
async def test_purge_as_regular_user(client):
    """A non-admin account is refused with 403 on /admin/tokens/purge."""
    user_tokens = await _register_login(client)
    auth_header = {"Authorization": f"Bearer {user_tokens['access_token']}"}

    response = await client.post("/admin/tokens/purge", headers=auth_header)

    assert response.status_code == 403