completing user administration

This commit is contained in:
georg.sinn-schirwitz
2025-08-30 18:33:08 +02:00
parent 7379d3040d
commit fe2a579fc4
5 changed files with 276 additions and 136 deletions

View File

@@ -534,7 +534,8 @@ def remove_job(url):
def get_or_create_user(username: str) -> int:
"""Return user_id for username, creating if missing."""
created_at = datetime.now(UTC).isoformat()
# 2025-08-30T16:04:29.660245+00:00 is wrong. should be 2025-08-30T16:04:29
created_at = datetime.now(UTC).isoformat().split('.')[0]
with _ensure_session() as session:
row = session.execute(
text("SELECT user_id FROM users WHERE username = :u"), {
@@ -654,22 +655,50 @@ def get_user(username: str) -> Optional[Dict[str, Any]]:
"""Return single user dict or None."""
with _ensure_session() as session:
row = session.execute(text(
"SELECT user_id, username, is_admin, is_active, password_hash, last_login, created_at FROM users WHERE username = :u"
"SELECT user_id, username, created_at, is_admin, is_active, last_login, (password_hash IS NOT NULL) AS has_pw FROM users WHERE username = :u"
), {"u": username}).fetchone()
if not row:
return None
return {
"user_id": int(row[0]),
"username": row[1],
"is_admin": bool(row[2]),
"is_active": bool(row[3]),
"password_hash": row[4],
"created_at": row[2].isoformat() if isinstance(row[2], datetime) else (row[2] or None),
"is_admin": bool(row[3]),
"is_active": bool(row[4]),
"last_login": row[5].isoformat() if row[5] else None,
"created_at": row[6].isoformat() if isinstance(row[6], datetime) else (row[6] or None),
"has_password": bool(row[6]),
}
def get_user_by_id(user_id: int) -> Optional[Dict[str, Any]]:
"""Return single user dict or None."""
with _ensure_session() as session:
row = session.execute(text(
"SELECT user_id, username, created_at, is_admin, is_active, last_login, (password_hash IS NOT NULL) AS has_pw FROM users WHERE user_id = :u"
), {"u": user_id}).fetchone()
if not row:
return None
return {
"user_id": int(row[0]),
"username": row[1],
"created_at": row[2].isoformat() if isinstance(row[2], datetime) else (row[2] or None),
"is_admin": bool(row[3]),
"is_active": bool(row[4]),
"last_login": row[5].isoformat() if row[5] else None,
"has_password": bool(row[6]),
}
def delete_user_by_id(user_id: int) -> bool:
with _ensure_session() as session:
result = session.execute(
text("DELETE FROM users WHERE user_id = :u"), {"u": user_id})
session.commit()
return result.rowcount > 0
# ---------------- Regions/Keywords helpers ---------------------------------
def upsert_region(name: str) -> int:
"""Get or create a region by name; return region_id."""
name = (name or "").strip()