Files
zwitschi 4cefd4e3ab
Some checks failed
CI / test (3.11) (push) Failing after 5m36s
CI / build-image (push) Has been skipped
v1
2025-10-22 16:48:55 +02:00

692 lines
23 KiB
Python

"""Database helpers supporting SQLite and optional Postgres."""
from __future__ import annotations
import logging
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterator, Tuple
from . import settings
try: # psycopg2 is optional
import psycopg2
except Exception: # pragma: no cover
psycopg2 = None # type: ignore
if TYPE_CHECKING: # pragma: no cover
from .services.contact import ContactSubmission
DB_PATH = Path(settings.SQLITE_DB_PATH)
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
DEFAULT_DB_PATH = DB_PATH
_USE_POSTGRES_OVERRIDE: bool | None = None
# Keep legacy-style flag available for external access.
USE_POSTGRES = False
def set_db_path(new_path: Path | str) -> None:
"""Update the SQLite database path (used primarily in tests)."""
global DB_PATH
DB_PATH = Path(new_path)
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
def set_postgres_override(value: bool | None) -> None:
"""Allow callers to force-enable or disable Postgres usage."""
global _USE_POSTGRES_OVERRIDE
_USE_POSTGRES_OVERRIDE = value
def is_postgres_enabled() -> bool:
"""Return True when Postgres should be used for database operations."""
if _USE_POSTGRES_OVERRIDE is not None:
use_pg = _USE_POSTGRES_OVERRIDE
elif psycopg2 is None or not settings.POSTGRES_URL:
use_pg = False
else:
use_pg = DB_PATH == DEFAULT_DB_PATH
globals()["USE_POSTGRES"] = use_pg
return use_pg
@contextmanager
def db_cursor(*, read_only: bool = False) -> Iterator[Tuple[Any, Any]]:
"""Yield a database cursor for either SQLite or Postgres."""
use_pg = is_postgres_enabled()
if use_pg:
if psycopg2 is None:
raise RuntimeError(
"Postgres requested but psycopg2 is unavailable")
conn = psycopg2.connect(settings.POSTGRES_URL)
else:
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(DB_PATH))
try:
cur = conn.cursor()
try:
yield conn, cur
if use_pg:
if read_only:
conn.rollback()
else:
conn.commit()
elif not read_only:
conn.commit()
except Exception:
try:
conn.rollback()
except Exception:
pass
raise
finally:
try:
cur.close()
except Exception:
pass
finally:
conn.close()
def init_db() -> None:
"""Create the required tables if they do not exist."""
if is_postgres_enabled():
with db_cursor() as (_, cur):
cur.execute(
"""
CREATE TABLE IF NOT EXISTS contact (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT NOT NULL,
company TEXT,
message TEXT NOT NULL,
timeline TEXT,
created_at TEXT NOT NULL
)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS subscribers (
email TEXT PRIMARY KEY,
subscribed_at TEXT NOT NULL
)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS app_settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS newsletters (
id SERIAL PRIMARY KEY,
subject TEXT NOT NULL,
content TEXT NOT NULL,
sender_name TEXT,
send_date TEXT,
status TEXT NOT NULL DEFAULT 'draft',
created_at TEXT NOT NULL,
sent_at TEXT
)
"""
)
else:
with db_cursor() as (_, cur):
cur.execute(
"""
CREATE TABLE IF NOT EXISTS contact (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT NOT NULL,
company TEXT,
message TEXT NOT NULL,
timeline TEXT,
created_at TEXT NOT NULL
)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS subscribers (
email TEXT PRIMARY KEY,
subscribed_at TEXT NOT NULL
)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS app_settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS newsletters (
id INTEGER PRIMARY KEY AUTOINCREMENT,
subject TEXT NOT NULL,
content TEXT NOT NULL,
sender_name TEXT,
send_date TEXT,
status TEXT NOT NULL DEFAULT 'draft',
created_at TEXT NOT NULL,
sent_at TEXT
)
"""
)
def save_contact(submission: "ContactSubmission") -> int:
"""Persist a contact submission and return its identifier."""
record_id = 0
use_pg = is_postgres_enabled()
with db_cursor() as (_, cur):
if use_pg:
cur.execute(
"INSERT INTO contact (name, email, company, message, timeline, created_at) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id",
(
submission.name,
submission.email,
submission.company,
submission.message,
submission.timeline,
submission.created_at,
),
)
row = cur.fetchone()
if row:
record_id = int(row[0])
else:
cur.execute(
"INSERT INTO contact (name, email, company, message, timeline, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(
submission.name,
submission.email,
submission.company,
submission.message,
submission.timeline,
submission.created_at,
),
)
record_id = int(cur.lastrowid or 0)
return record_id
def save_subscriber(email: str, *, created_at: str) -> bool:
"""Persist a newsletter subscriber. Returns False on duplicate entries."""
use_pg = is_postgres_enabled()
try:
with db_cursor() as (_, cur):
if use_pg:
cur.execute(
"INSERT INTO subscribers (email, subscribed_at) VALUES (%s, %s)",
(email, created_at),
)
else:
cur.execute(
"INSERT INTO subscribers (email, subscribed_at) VALUES (?, ?)",
(email, created_at),
)
return True
except sqlite3.IntegrityError:
return False
except Exception as exc:
if use_pg and psycopg2 is not None and isinstance(exc, psycopg2.IntegrityError):
return False
raise
def delete_subscriber(email: str) -> bool:
"""Remove a newsletter subscriber. Returns True if deleted, False if not found."""
use_pg = is_postgres_enabled()
try:
with db_cursor() as (_, cur):
if use_pg:
cur.execute(
"DELETE FROM subscribers WHERE email = %s", (email,))
else:
cur.execute(
"DELETE FROM subscribers WHERE email = ?", (email,))
return cur.rowcount > 0
except Exception as exc:
logging.exception("Failed to delete subscriber: %s", exc)
raise
def update_subscriber(old_email: str, new_email: str) -> bool:
"""Update a subscriber's email. Returns True if updated, False if old_email not found or new_email exists."""
use_pg = is_postgres_enabled()
try:
with db_cursor() as (_, cur):
# Check if old_email exists and new_email doesn't
if use_pg:
cur.execute(
"SELECT 1 FROM subscribers WHERE email = %s", (old_email,))
if not cur.fetchone():
return False
cur.execute(
"SELECT 1 FROM subscribers WHERE email = %s", (new_email,))
if cur.fetchone():
return False
cur.execute(
"UPDATE subscribers SET email = %s WHERE email = %s", (new_email, old_email))
else:
cur.execute(
"SELECT 1 FROM subscribers WHERE email = ?", (old_email,))
if not cur.fetchone():
return False
cur.execute(
"SELECT 1 FROM subscribers WHERE email = ?", (new_email,))
if cur.fetchone():
return False
cur.execute(
"UPDATE subscribers SET email = ? WHERE email = ?", (new_email, old_email))
return cur.rowcount > 0
except Exception as exc:
logging.exception("Failed to update subscriber: %s", exc)
raise
def get_contacts(
page: int = 1,
per_page: int = 50,
sort_by: str = "created_at",
sort_order: str = "desc",
email_filter: str | None = None,
date_from: str | None = None,
date_to: str | None = None,
) -> Tuple[list[dict], int]:
"""Retrieve contact submissions with pagination, filtering, and sorting."""
use_pg = is_postgres_enabled()
offset = (page - 1) * per_page
# Build WHERE clause
where_conditions = []
params = []
if email_filter:
where_conditions.append("email LIKE ?")
params.append(f"%{email_filter}%")
if date_from:
where_conditions.append("created_at >= ?")
params.append(date_from)
if date_to:
where_conditions.append("created_at <= ?")
params.append(date_to)
where_clause = "WHERE " + \
" AND ".join(where_conditions) if where_conditions else ""
# Build ORDER BY clause
valid_sort_fields = {"id", "name", "email", "created_at"}
if sort_by not in valid_sort_fields:
sort_by = "created_at"
sort_order = "DESC" if sort_order.lower() == "desc" else "ASC"
order_clause = f"ORDER BY {sort_by} {sort_order}"
# Get total count
count_query = f"SELECT COUNT(*) FROM contact {where_clause}"
with db_cursor(read_only=True) as (_, cur):
if use_pg:
# Convert ? to %s for PostgreSQL
count_query = count_query.replace("?", "%s")
cur.execute(count_query, params)
total = cur.fetchone()[0]
# Get paginated results
select_query = f"""
SELECT id, name, email, company, message, timeline, created_at
FROM contact {where_clause} {order_clause}
LIMIT ? OFFSET ?
"""
params.extend([per_page, offset])
contacts = []
with db_cursor(read_only=True) as (_, cur):
if use_pg:
# Convert ? to %s for PostgreSQL and handle LIMIT/OFFSET
select_query = select_query.replace("?", "%s")
select_query = select_query.replace(
"LIMIT %s OFFSET %s", "LIMIT %s OFFSET %s")
cur.execute(select_query, params)
if use_pg:
rows = cur.fetchall()
else:
rows = cur.fetchall()
for row in rows:
contacts.append({
"id": row[0],
"name": row[1],
"email": row[2],
"company": row[3],
"message": row[4],
"timeline": row[5],
"created_at": row[6],
})
return contacts, total
def get_subscribers(
page: int = 1,
per_page: int = 50,
sort_by: str = "subscribed_at",
sort_order: str = "desc",
email_filter: str | None = None,
date_from: str | None = None,
date_to: str | None = None,
) -> Tuple[list[dict], int]:
"""Retrieve newsletter subscribers with pagination, filtering, and sorting."""
use_pg = is_postgres_enabled()
offset = (page - 1) * per_page
# Build WHERE clause
where_conditions = []
params = []
if email_filter:
where_conditions.append("email LIKE ?")
params.append(f"%{email_filter}%")
if date_from:
where_conditions.append("subscribed_at >= ?")
params.append(date_from)
if date_to:
where_conditions.append("subscribed_at <= ?")
params.append(date_to)
where_clause = "WHERE " + \
" AND ".join(where_conditions) if where_conditions else ""
# Build ORDER BY clause
valid_sort_fields = {"email", "subscribed_at"}
if sort_by not in valid_sort_fields:
sort_by = "subscribed_at"
sort_order = "DESC" if sort_order.lower() == "desc" else "ASC"
order_clause = f"ORDER BY {sort_by} {sort_order}"
# Get total count
count_query = f"SELECT COUNT(*) FROM subscribers {where_clause}"
with db_cursor(read_only=True) as (_, cur):
if use_pg:
# Convert ? to %s for PostgreSQL
count_query = count_query.replace("?", "%s")
cur.execute(count_query, params)
total = cur.fetchone()[0]
# Get paginated results
select_query = f"""
SELECT email, subscribed_at
FROM subscribers {where_clause} {order_clause}
LIMIT ? OFFSET ?
"""
params.extend([per_page, offset])
subscribers = []
with db_cursor(read_only=True) as (_, cur):
if use_pg:
# Convert ? to %s for PostgreSQL and handle LIMIT/OFFSET
select_query = select_query.replace("?", "%s")
select_query = select_query.replace(
"LIMIT %s OFFSET %s", "LIMIT %s OFFSET %s")
cur.execute(select_query, params)
rows = cur.fetchall()
for row in rows:
subscribers.append({
"email": row[0],
"subscribed_at": row[1],
})
return subscribers, total
def delete_contact(contact_id: int) -> bool:
"""Delete a contact submission by ID. Returns True if deleted."""
use_pg = is_postgres_enabled()
try:
with db_cursor() as (_, cur):
if use_pg:
cur.execute("DELETE FROM contact WHERE id = %s", (contact_id,))
else:
cur.execute("DELETE FROM contact WHERE id = ?", (contact_id,))
return cur.rowcount > 0
except Exception as exc:
logging.exception("Failed to delete contact: %s", exc)
raise
def get_app_settings() -> dict[str, str]:
"""Retrieve all application settings as a dictionary."""
settings_dict = {}
with db_cursor(read_only=True) as (_, cur):
cur.execute("SELECT key, value FROM app_settings ORDER BY key")
rows = cur.fetchall()
for row in rows:
settings_dict[row[0]] = row[1]
return settings_dict
def update_app_setting(key: str, value: str) -> bool:
"""Update or insert an application setting. Returns True on success."""
from datetime import datetime, timezone
updated_at = datetime.now(timezone.utc).isoformat()
use_pg = is_postgres_enabled()
try:
with db_cursor() as (_, cur):
if use_pg:
cur.execute(
"""
INSERT INTO app_settings (key, value, updated_at)
VALUES (%s, %s, %s)
ON CONFLICT (key) DO UPDATE SET
value = EXCLUDED.value,
updated_at = EXCLUDED.updated_at
""",
(key, value, updated_at),
)
else:
cur.execute(
"""
INSERT OR REPLACE INTO app_settings (key, value, updated_at)
VALUES (?, ?, ?)
""",
(key, value, updated_at),
)
return True
except Exception as exc:
logging.exception("Failed to update app setting: %s", exc)
raise
def delete_app_setting(key: str) -> bool:
"""Delete an application setting. Returns True if deleted."""
use_pg = is_postgres_enabled()
try:
with db_cursor() as (_, cur):
if use_pg:
cur.execute("DELETE FROM app_settings WHERE key = %s", (key,))
else:
cur.execute("DELETE FROM app_settings WHERE key = ?", (key,))
return cur.rowcount > 0
except Exception as exc:
logging.exception("Failed to delete app setting: %s", exc)
raise
def save_newsletter(subject: str, content: str, sender_name: str | None = None, send_date: str | None = None, status: str = "draft") -> int:
"""Save a newsletter and return its ID."""
from datetime import datetime, timezone
created_at = datetime.now(timezone.utc).isoformat()
use_pg = is_postgres_enabled()
try:
with db_cursor() as (_, cur):
if use_pg:
cur.execute(
"""
INSERT INTO newsletters (subject, content, sender_name, send_date, status, created_at)
VALUES (%s, %s, %s, %s, %s, %s) RETURNING id
""",
(subject, content, sender_name, send_date, status, created_at),
)
newsletter_id = cur.fetchone()[0]
else:
cur.execute(
"""
INSERT INTO newsletters (subject, content, sender_name, send_date, status, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(subject, content, sender_name, send_date, status, created_at),
)
newsletter_id = cur.lastrowid
return newsletter_id
except Exception as exc:
logging.exception("Failed to save newsletter: %s", exc)
raise
def get_newsletters(page: int = 1, per_page: int = 20, status_filter: str | None = None) -> tuple[list[dict], int]:
"""Get newsletters with pagination and optional status filtering."""
use_pg = is_postgres_enabled()
newsletters = []
total = 0
offset = (page - 1) * per_page
try:
with db_cursor(read_only=True) as (_, cur):
# Get total count
count_query = "SELECT COUNT(*) FROM newsletters"
count_params = []
if status_filter:
count_query += " WHERE status = %s" if use_pg else " WHERE status = ?"
count_params.append(status_filter)
cur.execute(count_query, count_params)
total = cur.fetchone()[0]
# Get newsletters
select_query = """
SELECT id, subject, sender_name, send_date, status, created_at, sent_at
FROM newsletters
"""
params = []
if status_filter:
select_query += " WHERE status = %s" if use_pg else " WHERE status = ?"
params.append(status_filter)
select_query += " ORDER BY created_at DESC"
select_query += " LIMIT %s OFFSET %s" if use_pg else " LIMIT ? OFFSET ?"
params.extend([per_page, offset])
cur.execute(select_query, params)
rows = cur.fetchall()
for row in rows:
newsletters.append({
"id": row[0],
"subject": row[1],
"sender_name": row[2],
"send_date": row[3],
"status": row[4],
"created_at": row[5],
"sent_at": row[6],
})
except Exception as exc:
logging.exception("Failed to get newsletters: %s", exc)
raise
return newsletters, total
def update_newsletter_status(newsletter_id: int, status: str, sent_at: str | None = None) -> bool:
"""Update newsletter status and optionally sent_at timestamp."""
use_pg = is_postgres_enabled()
try:
with db_cursor() as (_, cur):
if sent_at:
if use_pg:
cur.execute(
"UPDATE newsletters SET status = %s, sent_at = %s WHERE id = %s",
(status, sent_at, newsletter_id),
)
else:
cur.execute(
"UPDATE newsletters SET status = ?, sent_at = ? WHERE id = ?",
(status, sent_at, newsletter_id),
)
else:
if use_pg:
cur.execute(
"UPDATE newsletters SET status = %s WHERE id = %s",
(status, newsletter_id),
)
else:
cur.execute(
"UPDATE newsletters SET status = ? WHERE id = ?",
(status, newsletter_id),
)
return cur.rowcount > 0
except Exception as exc:
logging.exception("Failed to update newsletter status: %s", exc)
raise
def get_newsletter_by_id(newsletter_id: int) -> dict | None:
"""Get a specific newsletter by ID."""
use_pg = is_postgres_enabled()
try:
with db_cursor(read_only=True) as (_, cur):
if use_pg:
cur.execute(
"SELECT id, subject, content, sender_name, send_date, status, created_at, sent_at FROM newsletters WHERE id = %s",
(newsletter_id,),
)
else:
cur.execute(
"SELECT id, subject, content, sender_name, send_date, status, created_at, sent_at FROM newsletters WHERE id = ?",
(newsletter_id,),
)
row = cur.fetchone()
if row:
return {
"id": row[0],
"subject": row[1],
"content": row[2],
"sender_name": row[3],
"send_date": row[4],
"status": row[5],
"created_at": row[6],
"sent_at": row[7],
}
except Exception as exc:
logging.exception("Failed to get newsletter by ID: %s", exc)
raise
return None