"""Database helpers supporting SQLite and optional Postgres.""" from __future__ import annotations import logging import sqlite3 from contextlib import contextmanager from pathlib import Path from typing import TYPE_CHECKING, Any, Iterator, Tuple from . import settings try: # psycopg2 is optional import psycopg2 except Exception: # pragma: no cover psycopg2 = None # type: ignore if TYPE_CHECKING: # pragma: no cover from .services.contact import ContactSubmission DB_PATH = Path(settings.SQLITE_DB_PATH) DB_PATH.parent.mkdir(parents=True, exist_ok=True) DEFAULT_DB_PATH = DB_PATH _USE_POSTGRES_OVERRIDE: bool | None = None # Keep legacy-style flag available for external access. USE_POSTGRES = False def set_db_path(new_path: Path | str) -> None: """Update the SQLite database path (used primarily in tests).""" global DB_PATH DB_PATH = Path(new_path) DB_PATH.parent.mkdir(parents=True, exist_ok=True) def set_postgres_override(value: bool | None) -> None: """Allow callers to force-enable or disable Postgres usage.""" global _USE_POSTGRES_OVERRIDE _USE_POSTGRES_OVERRIDE = value def is_postgres_enabled() -> bool: """Return True when Postgres should be used for database operations.""" if _USE_POSTGRES_OVERRIDE is not None: use_pg = _USE_POSTGRES_OVERRIDE elif psycopg2 is None or not settings.POSTGRES_URL: use_pg = False else: use_pg = DB_PATH == DEFAULT_DB_PATH globals()["USE_POSTGRES"] = use_pg return use_pg @contextmanager def db_cursor(*, read_only: bool = False) -> Iterator[Tuple[Any, Any]]: """Yield a database cursor for either SQLite or Postgres.""" use_pg = is_postgres_enabled() if use_pg: if psycopg2 is None: raise RuntimeError( "Postgres requested but psycopg2 is unavailable") conn = psycopg2.connect(settings.POSTGRES_URL) else: DB_PATH.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(DB_PATH)) try: cur = conn.cursor() try: yield conn, cur if use_pg: if read_only: conn.rollback() else: conn.commit() elif not read_only: conn.commit() except Exception: try: conn.rollback() except Exception: pass raise finally: try: cur.close() except Exception: pass finally: conn.close() def init_db() -> None: """Create the required tables if they do not exist.""" if is_postgres_enabled(): with db_cursor() as (_, cur): cur.execute( """ CREATE TABLE IF NOT EXISTS contact ( id SERIAL PRIMARY KEY, name TEXT NOT NULL, email TEXT NOT NULL, company TEXT, message TEXT NOT NULL, timeline TEXT, created_at TEXT NOT NULL ) """ ) cur.execute( """ CREATE TABLE IF NOT EXISTS subscribers ( email TEXT PRIMARY KEY, subscribed_at TEXT NOT NULL ) """ ) cur.execute( """ CREATE TABLE IF NOT EXISTS app_settings ( key TEXT PRIMARY KEY, value TEXT NOT NULL, updated_at TEXT NOT NULL ) """ ) cur.execute( """ CREATE TABLE IF NOT EXISTS newsletters ( id SERIAL PRIMARY KEY, subject TEXT NOT NULL, content TEXT NOT NULL, sender_name TEXT, send_date TEXT, status TEXT NOT NULL DEFAULT 'draft', created_at TEXT NOT NULL, sent_at TEXT ) """ ) else: with db_cursor() as (_, cur): cur.execute( """ CREATE TABLE IF NOT EXISTS contact ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, email TEXT NOT NULL, company TEXT, message TEXT NOT NULL, timeline TEXT, created_at TEXT NOT NULL ) """ ) cur.execute( """ CREATE TABLE IF NOT EXISTS subscribers ( email TEXT PRIMARY KEY, subscribed_at TEXT NOT NULL ) """ ) cur.execute( """ CREATE TABLE IF NOT EXISTS app_settings ( key TEXT PRIMARY KEY, value TEXT NOT NULL, updated_at TEXT NOT NULL ) """ ) cur.execute( """ CREATE TABLE IF NOT EXISTS newsletters ( id INTEGER PRIMARY KEY AUTOINCREMENT, subject TEXT NOT NULL, content TEXT NOT NULL, sender_name TEXT, send_date TEXT, status TEXT NOT NULL DEFAULT 'draft', created_at TEXT NOT NULL, sent_at TEXT ) """ ) def save_contact(submission: "ContactSubmission") -> int: """Persist a contact submission and return its identifier.""" record_id = 0 use_pg = is_postgres_enabled() with db_cursor() as (_, cur): if use_pg: cur.execute( "INSERT INTO contact (name, email, company, message, timeline, created_at) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id", ( submission.name, submission.email, submission.company, submission.message, submission.timeline, submission.created_at, ), ) row = cur.fetchone() if row: record_id = int(row[0]) else: cur.execute( "INSERT INTO contact (name, email, company, message, timeline, created_at) VALUES (?, ?, ?, ?, ?, ?)", ( submission.name, submission.email, submission.company, submission.message, submission.timeline, submission.created_at, ), ) record_id = int(cur.lastrowid or 0) return record_id def save_subscriber(email: str, *, created_at: str) -> bool: """Persist a newsletter subscriber. Returns False on duplicate entries.""" use_pg = is_postgres_enabled() try: with db_cursor() as (_, cur): if use_pg: cur.execute( "INSERT INTO subscribers (email, subscribed_at) VALUES (%s, %s)", (email, created_at), ) else: cur.execute( "INSERT INTO subscribers (email, subscribed_at) VALUES (?, ?)", (email, created_at), ) return True except sqlite3.IntegrityError: return False except Exception as exc: if use_pg and psycopg2 is not None and isinstance(exc, psycopg2.IntegrityError): return False raise def delete_subscriber(email: str) -> bool: """Remove a newsletter subscriber. Returns True if deleted, False if not found.""" use_pg = is_postgres_enabled() try: with db_cursor() as (_, cur): if use_pg: cur.execute( "DELETE FROM subscribers WHERE email = %s", (email,)) else: cur.execute( "DELETE FROM subscribers WHERE email = ?", (email,)) return cur.rowcount > 0 except Exception as exc: logging.exception("Failed to delete subscriber: %s", exc) raise def update_subscriber(old_email: str, new_email: str) -> bool: """Update a subscriber's email. Returns True if updated, False if old_email not found or new_email exists.""" use_pg = is_postgres_enabled() try: with db_cursor() as (_, cur): # Check if old_email exists and new_email doesn't if use_pg: cur.execute( "SELECT 1 FROM subscribers WHERE email = %s", (old_email,)) if not cur.fetchone(): return False cur.execute( "SELECT 1 FROM subscribers WHERE email = %s", (new_email,)) if cur.fetchone(): return False cur.execute( "UPDATE subscribers SET email = %s WHERE email = %s", (new_email, old_email)) else: cur.execute( "SELECT 1 FROM subscribers WHERE email = ?", (old_email,)) if not cur.fetchone(): return False cur.execute( "SELECT 1 FROM subscribers WHERE email = ?", (new_email,)) if cur.fetchone(): return False cur.execute( "UPDATE subscribers SET email = ? WHERE email = ?", (new_email, old_email)) return cur.rowcount > 0 except Exception as exc: logging.exception("Failed to update subscriber: %s", exc) raise def get_contacts( page: int = 1, per_page: int = 50, sort_by: str = "created_at", sort_order: str = "desc", email_filter: str | None = None, date_from: str | None = None, date_to: str | None = None, ) -> Tuple[list[dict], int]: """Retrieve contact submissions with pagination, filtering, and sorting.""" use_pg = is_postgres_enabled() offset = (page - 1) * per_page # Build WHERE clause where_conditions = [] params = [] if email_filter: where_conditions.append("email LIKE ?") params.append(f"%{email_filter}%") if date_from: where_conditions.append("created_at >= ?") params.append(date_from) if date_to: where_conditions.append("created_at <= ?") params.append(date_to) where_clause = "WHERE " + \ " AND ".join(where_conditions) if where_conditions else "" # Build ORDER BY clause valid_sort_fields = {"id", "name", "email", "created_at"} if sort_by not in valid_sort_fields: sort_by = "created_at" sort_order = "DESC" if sort_order.lower() == "desc" else "ASC" order_clause = f"ORDER BY {sort_by} {sort_order}" # Get total count count_query = f"SELECT COUNT(*) FROM contact {where_clause}" with db_cursor(read_only=True) as (_, cur): if use_pg: # Convert ? to %s for PostgreSQL count_query = count_query.replace("?", "%s") cur.execute(count_query, params) total = cur.fetchone()[0] # Get paginated results select_query = f""" SELECT id, name, email, company, message, timeline, created_at FROM contact {where_clause} {order_clause} LIMIT ? OFFSET ? """ params.extend([per_page, offset]) contacts = [] with db_cursor(read_only=True) as (_, cur): if use_pg: # Convert ? to %s for PostgreSQL and handle LIMIT/OFFSET select_query = select_query.replace("?", "%s") select_query = select_query.replace( "LIMIT %s OFFSET %s", "LIMIT %s OFFSET %s") cur.execute(select_query, params) if use_pg: rows = cur.fetchall() else: rows = cur.fetchall() for row in rows: contacts.append({ "id": row[0], "name": row[1], "email": row[2], "company": row[3], "message": row[4], "timeline": row[5], "created_at": row[6], }) return contacts, total def get_subscribers( page: int = 1, per_page: int = 50, sort_by: str = "subscribed_at", sort_order: str = "desc", email_filter: str | None = None, date_from: str | None = None, date_to: str | None = None, ) -> Tuple[list[dict], int]: """Retrieve newsletter subscribers with pagination, filtering, and sorting.""" use_pg = is_postgres_enabled() offset = (page - 1) * per_page # Build WHERE clause where_conditions = [] params = [] if email_filter: where_conditions.append("email LIKE ?") params.append(f"%{email_filter}%") if date_from: where_conditions.append("subscribed_at >= ?") params.append(date_from) if date_to: where_conditions.append("subscribed_at <= ?") params.append(date_to) where_clause = "WHERE " + \ " AND ".join(where_conditions) if where_conditions else "" # Build ORDER BY clause valid_sort_fields = {"email", "subscribed_at"} if sort_by not in valid_sort_fields: sort_by = "subscribed_at" sort_order = "DESC" if sort_order.lower() == "desc" else "ASC" order_clause = f"ORDER BY {sort_by} {sort_order}" # Get total count count_query = f"SELECT COUNT(*) FROM subscribers {where_clause}" with db_cursor(read_only=True) as (_, cur): if use_pg: # Convert ? to %s for PostgreSQL count_query = count_query.replace("?", "%s") cur.execute(count_query, params) total = cur.fetchone()[0] # Get paginated results select_query = f""" SELECT email, subscribed_at FROM subscribers {where_clause} {order_clause} LIMIT ? OFFSET ? """ params.extend([per_page, offset]) subscribers = [] with db_cursor(read_only=True) as (_, cur): if use_pg: # Convert ? to %s for PostgreSQL and handle LIMIT/OFFSET select_query = select_query.replace("?", "%s") select_query = select_query.replace( "LIMIT %s OFFSET %s", "LIMIT %s OFFSET %s") cur.execute(select_query, params) rows = cur.fetchall() for row in rows: subscribers.append({ "email": row[0], "subscribed_at": row[1], }) return subscribers, total def delete_contact(contact_id: int) -> bool: """Delete a contact submission by ID. Returns True if deleted.""" use_pg = is_postgres_enabled() try: with db_cursor() as (_, cur): if use_pg: cur.execute("DELETE FROM contact WHERE id = %s", (contact_id,)) else: cur.execute("DELETE FROM contact WHERE id = ?", (contact_id,)) return cur.rowcount > 0 except Exception as exc: logging.exception("Failed to delete contact: %s", exc) raise def get_app_settings() -> dict[str, str]: """Retrieve all application settings as a dictionary.""" settings_dict = {} with db_cursor(read_only=True) as (_, cur): cur.execute("SELECT key, value FROM app_settings ORDER BY key") rows = cur.fetchall() for row in rows: settings_dict[row[0]] = row[1] return settings_dict def update_app_setting(key: str, value: str) -> bool: """Update or insert an application setting. Returns True on success.""" from datetime import datetime, timezone updated_at = datetime.now(timezone.utc).isoformat() use_pg = is_postgres_enabled() try: with db_cursor() as (_, cur): if use_pg: cur.execute( """ INSERT INTO app_settings (key, value, updated_at) VALUES (%s, %s, %s) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at """, (key, value, updated_at), ) else: cur.execute( """ INSERT OR REPLACE INTO app_settings (key, value, updated_at) VALUES (?, ?, ?) """, (key, value, updated_at), ) return True except Exception as exc: logging.exception("Failed to update app setting: %s", exc) raise def delete_app_setting(key: str) -> bool: """Delete an application setting. Returns True if deleted.""" use_pg = is_postgres_enabled() try: with db_cursor() as (_, cur): if use_pg: cur.execute("DELETE FROM app_settings WHERE key = %s", (key,)) else: cur.execute("DELETE FROM app_settings WHERE key = ?", (key,)) return cur.rowcount > 0 except Exception as exc: logging.exception("Failed to delete app setting: %s", exc) raise def save_newsletter(subject: str, content: str, sender_name: str | None = None, send_date: str | None = None, status: str = "draft") -> int: """Save a newsletter and return its ID.""" from datetime import datetime, timezone created_at = datetime.now(timezone.utc).isoformat() use_pg = is_postgres_enabled() try: with db_cursor() as (_, cur): if use_pg: cur.execute( """ INSERT INTO newsletters (subject, content, sender_name, send_date, status, created_at) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id """, (subject, content, sender_name, send_date, status, created_at), ) newsletter_id = cur.fetchone()[0] else: cur.execute( """ INSERT INTO newsletters (subject, content, sender_name, send_date, status, created_at) VALUES (?, ?, ?, ?, ?, ?) """, (subject, content, sender_name, send_date, status, created_at), ) newsletter_id = cur.lastrowid return newsletter_id except Exception as exc: logging.exception("Failed to save newsletter: %s", exc) raise def get_newsletters(page: int = 1, per_page: int = 20, status_filter: str | None = None) -> tuple[list[dict], int]: """Get newsletters with pagination and optional status filtering.""" use_pg = is_postgres_enabled() newsletters = [] total = 0 offset = (page - 1) * per_page try: with db_cursor(read_only=True) as (_, cur): # Get total count count_query = "SELECT COUNT(*) FROM newsletters" count_params = [] if status_filter: count_query += " WHERE status = %s" if use_pg else " WHERE status = ?" count_params.append(status_filter) cur.execute(count_query, count_params) total = cur.fetchone()[0] # Get newsletters select_query = """ SELECT id, subject, sender_name, send_date, status, created_at, sent_at FROM newsletters """ params = [] if status_filter: select_query += " WHERE status = %s" if use_pg else " WHERE status = ?" params.append(status_filter) select_query += " ORDER BY created_at DESC" select_query += " LIMIT %s OFFSET %s" if use_pg else " LIMIT ? OFFSET ?" params.extend([per_page, offset]) cur.execute(select_query, params) rows = cur.fetchall() for row in rows: newsletters.append({ "id": row[0], "subject": row[1], "sender_name": row[2], "send_date": row[3], "status": row[4], "created_at": row[5], "sent_at": row[6], }) except Exception as exc: logging.exception("Failed to get newsletters: %s", exc) raise return newsletters, total def update_newsletter_status(newsletter_id: int, status: str, sent_at: str | None = None) -> bool: """Update newsletter status and optionally sent_at timestamp.""" use_pg = is_postgres_enabled() try: with db_cursor() as (_, cur): if sent_at: if use_pg: cur.execute( "UPDATE newsletters SET status = %s, sent_at = %s WHERE id = %s", (status, sent_at, newsletter_id), ) else: cur.execute( "UPDATE newsletters SET status = ?, sent_at = ? WHERE id = ?", (status, sent_at, newsletter_id), ) else: if use_pg: cur.execute( "UPDATE newsletters SET status = %s WHERE id = %s", (status, newsletter_id), ) else: cur.execute( "UPDATE newsletters SET status = ? WHERE id = ?", (status, newsletter_id), ) return cur.rowcount > 0 except Exception as exc: logging.exception("Failed to update newsletter status: %s", exc) raise def get_newsletter_by_id(newsletter_id: int) -> dict | None: """Get a specific newsletter by ID.""" use_pg = is_postgres_enabled() try: with db_cursor(read_only=True) as (_, cur): if use_pg: cur.execute( "SELECT id, subject, content, sender_name, send_date, status, created_at, sent_at FROM newsletters WHERE id = %s", (newsletter_id,), ) else: cur.execute( "SELECT id, subject, content, sender_name, send_date, status, created_at, sent_at FROM newsletters WHERE id = ?", (newsletter_id,), ) row = cur.fetchone() if row: return { "id": row[0], "subject": row[1], "content": row[2], "sender_name": row[3], "send_date": row[4], "status": row[5], "created_at": row[6], "sent_at": row[7], } except Exception as exc: logging.exception("Failed to get newsletter by ID: %s", exc) raise return None