diff --git a/backend/tests/__init__.py b/backend/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/tests/test_admin.py b/backend/tests/test_admin.py new file mode 100644 index 0000000..c273dff --- /dev/null +++ b/backend/tests/test_admin.py @@ -0,0 +1,118 @@ +"""Integration tests for admin endpoints.""" +import os +import pytest +import pytest_asyncio +from httpx import AsyncClient, ASGITransport + +from backend.app.main import app +from backend.app import db as db_module + +os.environ.setdefault("JWT_SECRET", "test-secret-key-for-testing-only") + + +@pytest.fixture(autouse=True) +def fresh_db(): + db_module._conn = None + db_module.init_db(":memory:") + yield + db_module.close_db() + db_module._conn = None + + +@pytest_asyncio.fixture +async def client(fresh_db): + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + yield ac + + +async def _register_login(client, email="user@example.com", password="secret123"): + await client.post("/auth/register", json={"email": email, "password": password}) + resp = await client.post("/auth/login", json={"email": email, "password": password}) + return resp.json() + + +async def _admin_token(client, email="admin@example.com", password="secret123"): + tokens = await _register_login(client, email, password) + me = await client.get("/users/me", headers={"Authorization": f"Bearer {tokens['access_token']}"}) + db_module.get_conn().execute( + "UPDATE users SET role = 'admin' WHERE id = ?", [me.json()["id"]]) + login = await client.post("/auth/login", json={"email": email, "password": password}) + return login.json()["access_token"] + + +# --------------------------------------------------------------------------- +# GET /admin/stats +# --------------------------------------------------------------------------- + +async def test_stats_as_admin(client): + await _register_login(client, "user1@example.com") + await _register_login(client, "user2@example.com") + token = await _admin_token(client) + + resp = await client.get("/admin/stats", headers={"Authorization": f"Bearer {token}"}) + assert resp.status_code == 200 + data = resp.json() + assert data["users"]["total"] == 3 # 2 users + 1 admin + assert "by_role" in data["users"] + assert "refresh_tokens" in data + + +async def test_stats_as_regular_user(client): + tokens = await _register_login(client) + resp = await client.get("/admin/stats", headers={"Authorization": f"Bearer {tokens['access_token']}"}) + assert resp.status_code == 403 + + +async def test_stats_unauthenticated(client): + resp = await client.get("/admin/stats") + assert resp.status_code == 401 + + +# --------------------------------------------------------------------------- +# GET /admin/health/db +# --------------------------------------------------------------------------- + +async def test_db_health_as_admin(client): + token = await _admin_token(client) + resp = await client.get("/admin/health/db", headers={"Authorization": f"Bearer {token}"}) + assert resp.status_code == 200 + assert resp.json()["status"] == "ok" + + +async def test_db_health_as_regular_user(client): + tokens = await _register_login(client) + resp = await client.get("/admin/health/db", headers={"Authorization": f"Bearer {tokens['access_token']}"}) + assert resp.status_code == 403 + + +# --------------------------------------------------------------------------- +# POST /admin/tokens/purge +# --------------------------------------------------------------------------- + +async def test_purge_removes_revoked_tokens(client): + tokens = await _register_login(client, "user@example.com") + refresh_token = tokens["refresh_token"] + + # Logout to revoke the token + await client.post("/auth/logout", json={"refresh_token": refresh_token}) + + token = await _admin_token(client) + resp = await client.post("/admin/tokens/purge", headers={"Authorization": f"Bearer {token}"}) + assert resp.status_code == 200 + data = resp.json() + assert data["deleted"] >= 1 + + +async def test_purge_nothing_to_remove(client): + token = await _admin_token(client) + resp = await client.post("/admin/tokens/purge", headers={"Authorization": f"Bearer {token}"}) + assert resp.status_code == 200 + # Admin login issued one active token — nothing to purge + assert resp.json()["deleted"] == 0 + + +async def test_purge_as_regular_user(client): + tokens = await _register_login(client) + resp = await client.post("/admin/tokens/purge", headers={"Authorization": f"Bearer {tokens['access_token']}"}) + assert resp.status_code == 403 diff --git a/backend/tests/test_auth.py b/backend/tests/test_auth.py new file mode 100644 index 0000000..4e74cb0 --- /dev/null +++ b/backend/tests/test_auth.py @@ -0,0 +1,103 @@ +"""Integration tests for auth endpoints using in-memory DuckDB.""" +from backend.app.main import app +from backend.app import db as db_module +from httpx import AsyncClient, ASGITransport +import os +import pytest +import pytest_asyncio + +os.environ.setdefault("JWT_SECRET", "test-secret-key-for-testing-only") + + +@pytest.fixture(autouse=True) +def fresh_db(): + """Reset the DB singleton to a fresh in-memory DB for each test.""" + db_module._conn = None + db_module.init_db(":memory:") + yield + db_module.close_db() + db_module._conn = None + + +@pytest_asyncio.fixture +async def client(fresh_db): + """HTTP client wired to the app; explicitly depends on fresh_db to guarantee ordering.""" + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + yield ac + + +async def test_register_success(client): + resp = await client.post("/auth/register", json={"email": "user@example.com", "password": "secret123"}) + assert resp.status_code == 201 + data = resp.json() + assert data["email"] == "user@example.com" + assert data["role"] == "user" + assert "id" in data + + +async def test_register_duplicate_email(client): + payload = {"email": "dup@example.com", "password": "secret123"} + await client.post("/auth/register", json=payload) + resp = await client.post("/auth/register", json=payload) + assert resp.status_code == 409 + + +async def test_login_success(client): + await client.post("/auth/register", json={"email": "user@example.com", "password": "secret123"}) + resp = await client.post("/auth/login", json={"email": "user@example.com", "password": "secret123"}) + assert resp.status_code == 200 + data = resp.json() + assert "access_token" in data + assert "refresh_token" in data + assert data["token_type"] == "bearer" + + +async def test_login_wrong_password(client): + await client.post("/auth/register", json={"email": "user@example.com", "password": "secret123"}) + resp = await client.post("/auth/login", json={"email": "user@example.com", "password": "wrong"}) + assert resp.status_code == 401 + + +async def test_login_unknown_user(client): + resp = await client.post("/auth/login", json={"email": "nobody@example.com", "password": "x"}) + assert resp.status_code == 401 + + +async def test_refresh_success(client): + await client.post("/auth/register", json={"email": "user@example.com", "password": "secret123"}) + login = await client.post("/auth/login", json={"email": "user@example.com", "password": "secret123"}) + refresh_token = login.json()["refresh_token"] + + resp = await client.post("/auth/refresh", json={"refresh_token": refresh_token}) + assert resp.status_code == 200 + data = resp.json() + assert "access_token" in data + assert "refresh_token" in data + # New refresh token must differ (rotation) + assert data["refresh_token"] != refresh_token + + +async def test_refresh_revoked_token(client): + await client.post("/auth/register", json={"email": "user@example.com", "password": "secret123"}) + login = await client.post("/auth/login", json={"email": "user@example.com", "password": "secret123"}) + refresh_token = login.json()["refresh_token"] + + # Use once (rotates) + await client.post("/auth/refresh", json={"refresh_token": refresh_token}) + # Try to reuse old token — must fail + resp = await client.post("/auth/refresh", json={"refresh_token": refresh_token}) + assert resp.status_code == 401 + + +async def test_logout_success(client): + await client.post("/auth/register", json={"email": "user@example.com", "password": "secret123"}) + login = await client.post("/auth/login", json={"email": "user@example.com", "password": "secret123"}) + refresh_token = login.json()["refresh_token"] + + resp = await client.post("/auth/logout", json={"refresh_token": refresh_token}) + assert resp.status_code == 204 + + # Refresh after logout must fail + resp2 = await client.post("/auth/refresh", json={"refresh_token": refresh_token}) + assert resp2.status_code == 401 diff --git a/backend/tests/test_db.py b/backend/tests/test_db.py new file mode 100644 index 0000000..8fae8ac --- /dev/null +++ b/backend/tests/test_db.py @@ -0,0 +1,54 @@ +"""Tests for DuckDB initialization and schema.""" +import pytest +import duckdb + +from backend.app import db as db_module + + +@pytest.fixture(autouse=True) +def fresh_db(): + """Use an in-memory DB for each test and reset global state.""" + db_module._conn = None + yield + db_module.close_db() + db_module._conn = None + + +def test_init_creates_users_table(): + conn = db_module.init_db(":memory:") + result = conn.execute( + "SELECT table_name FROM information_schema.tables WHERE table_name = 'users'" + ).fetchone() + assert result is not None + + +def test_init_creates_refresh_tokens_table(): + conn = db_module.init_db(":memory:") + result = conn.execute( + "SELECT table_name FROM information_schema.tables WHERE table_name = 'refresh_tokens'" + ).fetchone() + assert result is not None + + +def test_init_is_idempotent(): + conn1 = db_module.init_db(":memory:") + conn2 = db_module.init_db(":memory:") + assert conn1 is conn2 + + +def test_get_conn_raises_before_init(): + with pytest.raises(RuntimeError, match="not initialised"): + db_module.get_conn() + + +def test_get_conn_returns_connection_after_init(): + db_module.init_db(":memory:") + conn = db_module.get_conn() + assert conn is not None + + +def test_close_db_resets_connection(): + db_module.init_db(":memory:") + db_module.close_db() + with pytest.raises(RuntimeError): + db_module.get_conn() diff --git a/backend/tests/test_users.py b/backend/tests/test_users.py new file mode 100644 index 0000000..5c6e3c5 --- /dev/null +++ b/backend/tests/test_users.py @@ -0,0 +1,206 @@ +"""Integration tests for user management endpoints.""" +import os +import pytest +import pytest_asyncio +from httpx import AsyncClient, ASGITransport + +from backend.app.main import app +from backend.app import db as db_module + +os.environ.setdefault("JWT_SECRET", "test-secret-key-for-testing-only") + + +@pytest.fixture(autouse=True) +def fresh_db(): + db_module._conn = None + db_module.init_db(":memory:") + yield + db_module.close_db() + db_module._conn = None + + +@pytest_asyncio.fixture +async def client(fresh_db): + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + yield ac + + +async def _register_and_login(client, email="user@example.com", password="secret123"): + await client.post("/auth/register", json={"email": email, "password": password}) + resp = await client.post("/auth/login", json={"email": email, "password": password}) + return resp.json()["access_token"] + + +async def _make_admin(user_id: str): + """Directly set a user's role to admin in the DB.""" + conn = db_module.get_conn() + conn.execute("UPDATE users SET role = 'admin' WHERE id = ?", [user_id]) + + +# --------------------------------------------------------------------------- +# GET /users/me +# --------------------------------------------------------------------------- + +async def test_get_me(client): + token = await _register_and_login(client) + resp = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"}) + assert resp.status_code == 200 + data = resp.json() + assert data["email"] == "user@example.com" + assert data["role"] == "user" + assert "id" in data + + +async def test_get_me_unauthenticated(client): + resp = await client.get("/users/me") + assert resp.status_code == 401 + + +# --------------------------------------------------------------------------- +# PUT /users/me +# --------------------------------------------------------------------------- + +async def test_update_me_email(client): + token = await _register_and_login(client) + resp = await client.put( + "/users/me", + json={"email": "new@example.com"}, + headers={"Authorization": f"Bearer {token}"}, + ) + assert resp.status_code == 200 + assert resp.json()["email"] == "new@example.com" + + +async def test_update_me_password(client): + token = await _register_and_login(client) + resp = await client.put( + "/users/me", + json={"password": "newpassword123"}, + headers={"Authorization": f"Bearer {token}"}, + ) + assert resp.status_code == 200 + # Verify new password works for login + login = await client.post( + "/auth/login", json={"email": "user@example.com", "password": "newpassword123"} + ) + assert login.status_code == 200 + + +async def test_update_me_duplicate_email(client): + await _register_and_login(client, "other@example.com") + token = await _register_and_login(client, "user@example.com") + resp = await client.put( + "/users/me", + json={"email": "other@example.com"}, + headers={"Authorization": f"Bearer {token}"}, + ) + assert resp.status_code == 409 + + +# --------------------------------------------------------------------------- +# GET /users (admin only) +# --------------------------------------------------------------------------- + +async def test_list_users_as_admin(client): + token = await _register_and_login(client) + me = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"}) + await _make_admin(me.json()["id"]) + + # Re-login to get a token with admin role + login = await client.post( + "/auth/login", json={"email": "user@example.com", "password": "secret123"} + ) + admin_token = login.json()["access_token"] + resp = await client.get("/users", headers={"Authorization": f"Bearer {admin_token}"}) + assert resp.status_code == 200 + assert isinstance(resp.json(), list) + assert len(resp.json()) == 1 + + +async def test_list_users_as_regular_user(client): + token = await _register_and_login(client) + resp = await client.get("/users", headers={"Authorization": f"Bearer {token}"}) + assert resp.status_code == 403 + + +# --------------------------------------------------------------------------- +# DELETE /users/{id} (admin only) +# --------------------------------------------------------------------------- + +async def test_delete_user_as_admin(client): + # Register target user + await client.post("/auth/register", json={"email": "target@example.com", "password": "secret123"}) + target_resp = await client.post("/auth/login", json={"email": "target@example.com", "password": "secret123"}) + target_token = target_resp.json()["access_token"] + target_me = await client.get("/users/me", headers={"Authorization": f"Bearer {target_token}"}) + target_id = target_me.json()["id"] + + # Register admin + token = await _register_and_login(client) + me = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"}) + await _make_admin(me.json()["id"]) + login = await client.post("/auth/login", json={"email": "user@example.com", "password": "secret123"}) + admin_token = login.json()["access_token"] + + resp = await client.delete(f"/users/{target_id}", headers={"Authorization": f"Bearer {admin_token}"}) + assert resp.status_code == 204 + + +async def test_delete_own_account_forbidden(client): + token = await _register_and_login(client) + me = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"}) + user_id = me.json()["id"] + await _make_admin(user_id) + login = await client.post("/auth/login", json={"email": "user@example.com", "password": "secret123"}) + admin_token = login.json()["access_token"] + + resp = await client.delete(f"/users/{user_id}", headers={"Authorization": f"Bearer {admin_token}"}) + assert resp.status_code == 400 + + +# --------------------------------------------------------------------------- +# PUT /users/{id}/role (admin only) +# --------------------------------------------------------------------------- + +async def test_set_role_as_admin(client): + # Register target + await client.post("/auth/register", json={"email": "target@example.com", "password": "secret123"}) + target_resp = await client.post("/auth/login", json={"email": "target@example.com", "password": "secret123"}) + target_me = await client.get("/users/me", headers={"Authorization": f"Bearer {target_resp.json()['access_token']}"}) + target_id = target_me.json()["id"] + + # Register admin + token = await _register_and_login(client) + me = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"}) + await _make_admin(me.json()["id"]) + login = await client.post("/auth/login", json={"email": "user@example.com", "password": "secret123"}) + admin_token = login.json()["access_token"] + + resp = await client.put( + f"/users/{target_id}/role", + json={"role": "admin"}, + headers={"Authorization": f"Bearer {admin_token}"}, + ) + assert resp.status_code == 200 + assert resp.json()["role"] == "admin" + + +async def test_set_invalid_role(client): + await client.post("/auth/register", json={"email": "target@example.com", "password": "secret123"}) + target_resp = await client.post("/auth/login", json={"email": "target@example.com", "password": "secret123"}) + target_me = await client.get("/users/me", headers={"Authorization": f"Bearer {target_resp.json()['access_token']}"}) + target_id = target_me.json()["id"] + + token = await _register_and_login(client) + me = await client.get("/users/me", headers={"Authorization": f"Bearer {token}"}) + await _make_admin(me.json()["id"]) + login = await client.post("/auth/login", json={"email": "user@example.com", "password": "secret123"}) + admin_token = login.json()["access_token"] + + resp = await client.put( + f"/users/{target_id}/role", + json={"role": "superuser"}, + headers={"Authorization": f"Bearer {admin_token}"}, + ) + assert resp.status_code == 422 diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..3672301 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["backend/tests", "frontend/tests"]