feat: enhance database queries with error handling and improve SQL statement readability

Co-authored-by: Copilot <copilot@github.com>
This commit is contained in:
2026-04-29 16:28:22 +02:00
parent df85676fa2
commit 8e36f48527
4 changed files with 70 additions and 67 deletions
+20 -6
View File
@@ -20,10 +20,18 @@ async def get_stats(_: dict = Depends(require_admin)) -> dict:
sql_token_count = "SELECT COUNT(*) FROM refresh_tokens"
sql_tokens_active = "SELECT COUNT(*) FROM refresh_tokens WHERE revoked = false AND expires_at > ?"
now = datetime.now(timezone.utc)
total_users = conn.execute(sql_user_count).fetchone()[0]
total_users_row = conn.execute(sql_user_count).fetchone()
total_users = total_users_row[0] if total_users_row else 0
users_by_role = conn.execute(sql_user_counts).fetchall()
total_tokens = conn.execute(sql_token_count).fetchone()[0]
active_tokens = conn.execute(sql_tokens_active, [now]).fetchone()[0]
total_tokens_row = conn.execute(sql_token_count).fetchone()
total_tokens = total_tokens_row[0] if total_tokens_row else 0
active_tokens_row = conn.execute(sql_tokens_active, [now]).fetchone()
active_tokens = active_tokens_row[0] if active_tokens_row else 0
return {
"users": {
"total": total_users,
@@ -41,7 +49,8 @@ async def get_stats(_: dict = Depends(require_admin)) -> dict:
async def db_health(_: dict = Depends(require_admin)) -> dict:
"""Verify DuckDB is reachable."""
conn = get_conn()
result = conn.execute("SELECT 1").fetchone()[0]
result_row = conn.execute("SELECT 1").fetchone()
result = result_row[0] if result_row else 0
return {"status": "ok" if result == 1 else "error"}
@@ -54,9 +63,14 @@ async def purge_tokens(_: dict = Depends(require_admin)) -> dict:
sql_count = "SELECT COUNT(*) FROM refresh_tokens"
sql_delete = "DELETE FROM refresh_tokens WHERE revoked = true OR expires_at <= ?"
async with lock:
before = conn.execute(sql_count).fetchone()[0]
before_row = conn.execute(sql_count).fetchone()
before = before_row[0] if before_row else 0
conn.execute(sql_delete, [now])
after = conn.execute(sql_count).fetchone()[0]
after_row = conn.execute(sql_count).fetchone()
after = after_row[0] if after_row else 0
return {"deleted": before - after, "remaining": after}