692 lines
23 KiB
Python
692 lines
23 KiB
Python
"""Database helpers supporting SQLite and optional Postgres."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import sqlite3
|
|
from contextlib import contextmanager
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING, Any, Iterator, Tuple
|
|
|
|
from . import settings
|
|
|
|
try: # psycopg2 is optional
|
|
import psycopg2
|
|
except Exception: # pragma: no cover
|
|
psycopg2 = None # type: ignore
|
|
|
|
if TYPE_CHECKING: # pragma: no cover
|
|
from .services.contact import ContactSubmission
|
|
|
|
DB_PATH = Path(settings.SQLITE_DB_PATH)
|
|
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
DEFAULT_DB_PATH = DB_PATH
|
|
_USE_POSTGRES_OVERRIDE: bool | None = None
|
|
|
|
# Keep legacy-style flag available for external access.
|
|
USE_POSTGRES = False
|
|
|
|
|
|
def set_db_path(new_path: Path | str) -> None:
|
|
"""Update the SQLite database path (used primarily in tests)."""
|
|
global DB_PATH
|
|
DB_PATH = Path(new_path)
|
|
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def set_postgres_override(value: bool | None) -> None:
|
|
"""Allow callers to force-enable or disable Postgres usage."""
|
|
global _USE_POSTGRES_OVERRIDE
|
|
_USE_POSTGRES_OVERRIDE = value
|
|
|
|
|
|
def is_postgres_enabled() -> bool:
|
|
"""Return True when Postgres should be used for database operations."""
|
|
if _USE_POSTGRES_OVERRIDE is not None:
|
|
use_pg = _USE_POSTGRES_OVERRIDE
|
|
elif psycopg2 is None or not settings.POSTGRES_URL:
|
|
use_pg = False
|
|
else:
|
|
use_pg = DB_PATH == DEFAULT_DB_PATH
|
|
|
|
globals()["USE_POSTGRES"] = use_pg
|
|
return use_pg
|
|
|
|
|
|
@contextmanager
|
|
def db_cursor(*, read_only: bool = False) -> Iterator[Tuple[Any, Any]]:
|
|
"""Yield a database cursor for either SQLite or Postgres."""
|
|
use_pg = is_postgres_enabled()
|
|
|
|
if use_pg:
|
|
if psycopg2 is None:
|
|
raise RuntimeError(
|
|
"Postgres requested but psycopg2 is unavailable")
|
|
conn = psycopg2.connect(settings.POSTGRES_URL)
|
|
else:
|
|
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
conn = sqlite3.connect(str(DB_PATH))
|
|
|
|
try:
|
|
cur = conn.cursor()
|
|
try:
|
|
yield conn, cur
|
|
if use_pg:
|
|
if read_only:
|
|
conn.rollback()
|
|
else:
|
|
conn.commit()
|
|
elif not read_only:
|
|
conn.commit()
|
|
except Exception:
|
|
try:
|
|
conn.rollback()
|
|
except Exception:
|
|
pass
|
|
raise
|
|
finally:
|
|
try:
|
|
cur.close()
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def init_db() -> None:
|
|
"""Create the required tables if they do not exist."""
|
|
if is_postgres_enabled():
|
|
with db_cursor() as (_, cur):
|
|
cur.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS contact (
|
|
id SERIAL PRIMARY KEY,
|
|
name TEXT NOT NULL,
|
|
email TEXT NOT NULL,
|
|
company TEXT,
|
|
message TEXT NOT NULL,
|
|
timeline TEXT,
|
|
created_at TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
cur.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS subscribers (
|
|
email TEXT PRIMARY KEY,
|
|
subscribed_at TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
cur.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS app_settings (
|
|
key TEXT PRIMARY KEY,
|
|
value TEXT NOT NULL,
|
|
updated_at TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
cur.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS newsletters (
|
|
id SERIAL PRIMARY KEY,
|
|
subject TEXT NOT NULL,
|
|
content TEXT NOT NULL,
|
|
sender_name TEXT,
|
|
send_date TEXT,
|
|
status TEXT NOT NULL DEFAULT 'draft',
|
|
created_at TEXT NOT NULL,
|
|
sent_at TEXT
|
|
)
|
|
"""
|
|
)
|
|
else:
|
|
with db_cursor() as (_, cur):
|
|
cur.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS contact (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name TEXT NOT NULL,
|
|
email TEXT NOT NULL,
|
|
company TEXT,
|
|
message TEXT NOT NULL,
|
|
timeline TEXT,
|
|
created_at TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
cur.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS subscribers (
|
|
email TEXT PRIMARY KEY,
|
|
subscribed_at TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
cur.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS app_settings (
|
|
key TEXT PRIMARY KEY,
|
|
value TEXT NOT NULL,
|
|
updated_at TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
cur.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS newsletters (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
subject TEXT NOT NULL,
|
|
content TEXT NOT NULL,
|
|
sender_name TEXT,
|
|
send_date TEXT,
|
|
status TEXT NOT NULL DEFAULT 'draft',
|
|
created_at TEXT NOT NULL,
|
|
sent_at TEXT
|
|
)
|
|
"""
|
|
)
|
|
|
|
|
|
def save_contact(submission: "ContactSubmission") -> int:
|
|
"""Persist a contact submission and return its identifier."""
|
|
record_id = 0
|
|
use_pg = is_postgres_enabled()
|
|
with db_cursor() as (_, cur):
|
|
if use_pg:
|
|
cur.execute(
|
|
"INSERT INTO contact (name, email, company, message, timeline, created_at) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id",
|
|
(
|
|
submission.name,
|
|
submission.email,
|
|
submission.company,
|
|
submission.message,
|
|
submission.timeline,
|
|
submission.created_at,
|
|
),
|
|
)
|
|
row = cur.fetchone()
|
|
if row:
|
|
record_id = int(row[0])
|
|
else:
|
|
cur.execute(
|
|
"INSERT INTO contact (name, email, company, message, timeline, created_at) VALUES (?, ?, ?, ?, ?, ?)",
|
|
(
|
|
submission.name,
|
|
submission.email,
|
|
submission.company,
|
|
submission.message,
|
|
submission.timeline,
|
|
submission.created_at,
|
|
),
|
|
)
|
|
record_id = int(cur.lastrowid or 0)
|
|
return record_id
|
|
|
|
|
|
def save_subscriber(email: str, *, created_at: str) -> bool:
|
|
"""Persist a newsletter subscriber. Returns False on duplicate entries."""
|
|
use_pg = is_postgres_enabled()
|
|
try:
|
|
with db_cursor() as (_, cur):
|
|
if use_pg:
|
|
cur.execute(
|
|
"INSERT INTO subscribers (email, subscribed_at) VALUES (%s, %s)",
|
|
(email, created_at),
|
|
)
|
|
else:
|
|
cur.execute(
|
|
"INSERT INTO subscribers (email, subscribed_at) VALUES (?, ?)",
|
|
(email, created_at),
|
|
)
|
|
return True
|
|
except sqlite3.IntegrityError:
|
|
return False
|
|
except Exception as exc:
|
|
if use_pg and psycopg2 is not None and isinstance(exc, psycopg2.IntegrityError):
|
|
return False
|
|
raise
|
|
|
|
|
|
def delete_subscriber(email: str) -> bool:
|
|
"""Remove a newsletter subscriber. Returns True if deleted, False if not found."""
|
|
use_pg = is_postgres_enabled()
|
|
try:
|
|
with db_cursor() as (_, cur):
|
|
if use_pg:
|
|
cur.execute(
|
|
"DELETE FROM subscribers WHERE email = %s", (email,))
|
|
else:
|
|
cur.execute(
|
|
"DELETE FROM subscribers WHERE email = ?", (email,))
|
|
return cur.rowcount > 0
|
|
except Exception as exc:
|
|
logging.exception("Failed to delete subscriber: %s", exc)
|
|
raise
|
|
|
|
|
|
def update_subscriber(old_email: str, new_email: str) -> bool:
|
|
"""Update a subscriber's email. Returns True if updated, False if old_email not found or new_email exists."""
|
|
use_pg = is_postgres_enabled()
|
|
try:
|
|
with db_cursor() as (_, cur):
|
|
# Check if old_email exists and new_email doesn't
|
|
if use_pg:
|
|
cur.execute(
|
|
"SELECT 1 FROM subscribers WHERE email = %s", (old_email,))
|
|
if not cur.fetchone():
|
|
return False
|
|
cur.execute(
|
|
"SELECT 1 FROM subscribers WHERE email = %s", (new_email,))
|
|
if cur.fetchone():
|
|
return False
|
|
cur.execute(
|
|
"UPDATE subscribers SET email = %s WHERE email = %s", (new_email, old_email))
|
|
else:
|
|
cur.execute(
|
|
"SELECT 1 FROM subscribers WHERE email = ?", (old_email,))
|
|
if not cur.fetchone():
|
|
return False
|
|
cur.execute(
|
|
"SELECT 1 FROM subscribers WHERE email = ?", (new_email,))
|
|
if cur.fetchone():
|
|
return False
|
|
cur.execute(
|
|
"UPDATE subscribers SET email = ? WHERE email = ?", (new_email, old_email))
|
|
return cur.rowcount > 0
|
|
except Exception as exc:
|
|
logging.exception("Failed to update subscriber: %s", exc)
|
|
raise
|
|
|
|
|
|
def get_contacts(
|
|
page: int = 1,
|
|
per_page: int = 50,
|
|
sort_by: str = "created_at",
|
|
sort_order: str = "desc",
|
|
email_filter: str | None = None,
|
|
date_from: str | None = None,
|
|
date_to: str | None = None,
|
|
) -> Tuple[list[dict], int]:
|
|
"""Retrieve contact submissions with pagination, filtering, and sorting."""
|
|
use_pg = is_postgres_enabled()
|
|
offset = (page - 1) * per_page
|
|
|
|
# Build WHERE clause
|
|
where_conditions = []
|
|
params = []
|
|
|
|
if email_filter:
|
|
where_conditions.append("email LIKE ?")
|
|
params.append(f"%{email_filter}%")
|
|
|
|
if date_from:
|
|
where_conditions.append("created_at >= ?")
|
|
params.append(date_from)
|
|
|
|
if date_to:
|
|
where_conditions.append("created_at <= ?")
|
|
params.append(date_to)
|
|
|
|
where_clause = "WHERE " + \
|
|
" AND ".join(where_conditions) if where_conditions else ""
|
|
|
|
# Build ORDER BY clause
|
|
valid_sort_fields = {"id", "name", "email", "created_at"}
|
|
if sort_by not in valid_sort_fields:
|
|
sort_by = "created_at"
|
|
|
|
sort_order = "DESC" if sort_order.lower() == "desc" else "ASC"
|
|
order_clause = f"ORDER BY {sort_by} {sort_order}"
|
|
|
|
# Get total count
|
|
count_query = f"SELECT COUNT(*) FROM contact {where_clause}"
|
|
with db_cursor(read_only=True) as (_, cur):
|
|
if use_pg:
|
|
# Convert ? to %s for PostgreSQL
|
|
count_query = count_query.replace("?", "%s")
|
|
cur.execute(count_query, params)
|
|
total = cur.fetchone()[0]
|
|
|
|
# Get paginated results
|
|
select_query = f"""
|
|
SELECT id, name, email, company, message, timeline, created_at
|
|
FROM contact {where_clause} {order_clause}
|
|
LIMIT ? OFFSET ?
|
|
"""
|
|
params.extend([per_page, offset])
|
|
|
|
contacts = []
|
|
with db_cursor(read_only=True) as (_, cur):
|
|
if use_pg:
|
|
# Convert ? to %s for PostgreSQL and handle LIMIT/OFFSET
|
|
select_query = select_query.replace("?", "%s")
|
|
select_query = select_query.replace(
|
|
"LIMIT %s OFFSET %s", "LIMIT %s OFFSET %s")
|
|
cur.execute(select_query, params)
|
|
|
|
if use_pg:
|
|
rows = cur.fetchall()
|
|
else:
|
|
rows = cur.fetchall()
|
|
|
|
for row in rows:
|
|
contacts.append({
|
|
"id": row[0],
|
|
"name": row[1],
|
|
"email": row[2],
|
|
"company": row[3],
|
|
"message": row[4],
|
|
"timeline": row[5],
|
|
"created_at": row[6],
|
|
})
|
|
|
|
return contacts, total
|
|
|
|
|
|
def get_subscribers(
|
|
page: int = 1,
|
|
per_page: int = 50,
|
|
sort_by: str = "subscribed_at",
|
|
sort_order: str = "desc",
|
|
email_filter: str | None = None,
|
|
date_from: str | None = None,
|
|
date_to: str | None = None,
|
|
) -> Tuple[list[dict], int]:
|
|
"""Retrieve newsletter subscribers with pagination, filtering, and sorting."""
|
|
use_pg = is_postgres_enabled()
|
|
offset = (page - 1) * per_page
|
|
|
|
# Build WHERE clause
|
|
where_conditions = []
|
|
params = []
|
|
|
|
if email_filter:
|
|
where_conditions.append("email LIKE ?")
|
|
params.append(f"%{email_filter}%")
|
|
|
|
if date_from:
|
|
where_conditions.append("subscribed_at >= ?")
|
|
params.append(date_from)
|
|
|
|
if date_to:
|
|
where_conditions.append("subscribed_at <= ?")
|
|
params.append(date_to)
|
|
|
|
where_clause = "WHERE " + \
|
|
" AND ".join(where_conditions) if where_conditions else ""
|
|
|
|
# Build ORDER BY clause
|
|
valid_sort_fields = {"email", "subscribed_at"}
|
|
if sort_by not in valid_sort_fields:
|
|
sort_by = "subscribed_at"
|
|
|
|
sort_order = "DESC" if sort_order.lower() == "desc" else "ASC"
|
|
order_clause = f"ORDER BY {sort_by} {sort_order}"
|
|
|
|
# Get total count
|
|
count_query = f"SELECT COUNT(*) FROM subscribers {where_clause}"
|
|
with db_cursor(read_only=True) as (_, cur):
|
|
if use_pg:
|
|
# Convert ? to %s for PostgreSQL
|
|
count_query = count_query.replace("?", "%s")
|
|
cur.execute(count_query, params)
|
|
total = cur.fetchone()[0]
|
|
|
|
# Get paginated results
|
|
select_query = f"""
|
|
SELECT email, subscribed_at
|
|
FROM subscribers {where_clause} {order_clause}
|
|
LIMIT ? OFFSET ?
|
|
"""
|
|
params.extend([per_page, offset])
|
|
|
|
subscribers = []
|
|
with db_cursor(read_only=True) as (_, cur):
|
|
if use_pg:
|
|
# Convert ? to %s for PostgreSQL and handle LIMIT/OFFSET
|
|
select_query = select_query.replace("?", "%s")
|
|
select_query = select_query.replace(
|
|
"LIMIT %s OFFSET %s", "LIMIT %s OFFSET %s")
|
|
cur.execute(select_query, params)
|
|
|
|
rows = cur.fetchall()
|
|
for row in rows:
|
|
subscribers.append({
|
|
"email": row[0],
|
|
"subscribed_at": row[1],
|
|
})
|
|
|
|
return subscribers, total
|
|
|
|
|
|
def delete_contact(contact_id: int) -> bool:
|
|
"""Delete a contact submission by ID. Returns True if deleted."""
|
|
use_pg = is_postgres_enabled()
|
|
try:
|
|
with db_cursor() as (_, cur):
|
|
if use_pg:
|
|
cur.execute("DELETE FROM contact WHERE id = %s", (contact_id,))
|
|
else:
|
|
cur.execute("DELETE FROM contact WHERE id = ?", (contact_id,))
|
|
return cur.rowcount > 0
|
|
except Exception as exc:
|
|
logging.exception("Failed to delete contact: %s", exc)
|
|
raise
|
|
|
|
|
|
def get_app_settings() -> dict[str, str]:
|
|
"""Retrieve all application settings as a dictionary."""
|
|
settings_dict = {}
|
|
with db_cursor(read_only=True) as (_, cur):
|
|
cur.execute("SELECT key, value FROM app_settings ORDER BY key")
|
|
rows = cur.fetchall()
|
|
for row in rows:
|
|
settings_dict[row[0]] = row[1]
|
|
return settings_dict
|
|
|
|
|
|
def update_app_setting(key: str, value: str) -> bool:
|
|
"""Update or insert an application setting. Returns True on success."""
|
|
from datetime import datetime, timezone
|
|
updated_at = datetime.now(timezone.utc).isoformat()
|
|
|
|
use_pg = is_postgres_enabled()
|
|
try:
|
|
with db_cursor() as (_, cur):
|
|
if use_pg:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO app_settings (key, value, updated_at)
|
|
VALUES (%s, %s, %s)
|
|
ON CONFLICT (key) DO UPDATE SET
|
|
value = EXCLUDED.value,
|
|
updated_at = EXCLUDED.updated_at
|
|
""",
|
|
(key, value, updated_at),
|
|
)
|
|
else:
|
|
cur.execute(
|
|
"""
|
|
INSERT OR REPLACE INTO app_settings (key, value, updated_at)
|
|
VALUES (?, ?, ?)
|
|
""",
|
|
(key, value, updated_at),
|
|
)
|
|
return True
|
|
except Exception as exc:
|
|
logging.exception("Failed to update app setting: %s", exc)
|
|
raise
|
|
|
|
|
|
def delete_app_setting(key: str) -> bool:
|
|
"""Delete an application setting. Returns True if deleted."""
|
|
use_pg = is_postgres_enabled()
|
|
try:
|
|
with db_cursor() as (_, cur):
|
|
if use_pg:
|
|
cur.execute("DELETE FROM app_settings WHERE key = %s", (key,))
|
|
else:
|
|
cur.execute("DELETE FROM app_settings WHERE key = ?", (key,))
|
|
return cur.rowcount > 0
|
|
except Exception as exc:
|
|
logging.exception("Failed to delete app setting: %s", exc)
|
|
raise
|
|
|
|
|
|
def save_newsletter(subject: str, content: str, sender_name: str | None = None, send_date: str | None = None, status: str = "draft") -> int:
|
|
"""Save a newsletter and return its ID."""
|
|
from datetime import datetime, timezone
|
|
created_at = datetime.now(timezone.utc).isoformat()
|
|
|
|
use_pg = is_postgres_enabled()
|
|
try:
|
|
with db_cursor() as (_, cur):
|
|
if use_pg:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO newsletters (subject, content, sender_name, send_date, status, created_at)
|
|
VALUES (%s, %s, %s, %s, %s, %s) RETURNING id
|
|
""",
|
|
(subject, content, sender_name, send_date, status, created_at),
|
|
)
|
|
newsletter_id = cur.fetchone()[0]
|
|
else:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO newsletters (subject, content, sender_name, send_date, status, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(subject, content, sender_name, send_date, status, created_at),
|
|
)
|
|
newsletter_id = cur.lastrowid
|
|
return newsletter_id
|
|
except Exception as exc:
|
|
logging.exception("Failed to save newsletter: %s", exc)
|
|
raise
|
|
|
|
|
|
def get_newsletters(page: int = 1, per_page: int = 20, status_filter: str | None = None) -> tuple[list[dict], int]:
|
|
"""Get newsletters with pagination and optional status filtering."""
|
|
use_pg = is_postgres_enabled()
|
|
newsletters = []
|
|
total = 0
|
|
|
|
offset = (page - 1) * per_page
|
|
|
|
try:
|
|
with db_cursor(read_only=True) as (_, cur):
|
|
# Get total count
|
|
count_query = "SELECT COUNT(*) FROM newsletters"
|
|
count_params = []
|
|
|
|
if status_filter:
|
|
count_query += " WHERE status = %s" if use_pg else " WHERE status = ?"
|
|
count_params.append(status_filter)
|
|
|
|
cur.execute(count_query, count_params)
|
|
total = cur.fetchone()[0]
|
|
|
|
# Get newsletters
|
|
select_query = """
|
|
SELECT id, subject, sender_name, send_date, status, created_at, sent_at
|
|
FROM newsletters
|
|
"""
|
|
params = []
|
|
|
|
if status_filter:
|
|
select_query += " WHERE status = %s" if use_pg else " WHERE status = ?"
|
|
params.append(status_filter)
|
|
|
|
select_query += " ORDER BY created_at DESC"
|
|
select_query += " LIMIT %s OFFSET %s" if use_pg else " LIMIT ? OFFSET ?"
|
|
params.extend([per_page, offset])
|
|
|
|
cur.execute(select_query, params)
|
|
|
|
rows = cur.fetchall()
|
|
for row in rows:
|
|
newsletters.append({
|
|
"id": row[0],
|
|
"subject": row[1],
|
|
"sender_name": row[2],
|
|
"send_date": row[3],
|
|
"status": row[4],
|
|
"created_at": row[5],
|
|
"sent_at": row[6],
|
|
})
|
|
|
|
except Exception as exc:
|
|
logging.exception("Failed to get newsletters: %s", exc)
|
|
raise
|
|
|
|
return newsletters, total
|
|
|
|
|
|
def update_newsletter_status(newsletter_id: int, status: str, sent_at: str | None = None) -> bool:
|
|
"""Update newsletter status and optionally sent_at timestamp."""
|
|
use_pg = is_postgres_enabled()
|
|
try:
|
|
with db_cursor() as (_, cur):
|
|
if sent_at:
|
|
if use_pg:
|
|
cur.execute(
|
|
"UPDATE newsletters SET status = %s, sent_at = %s WHERE id = %s",
|
|
(status, sent_at, newsletter_id),
|
|
)
|
|
else:
|
|
cur.execute(
|
|
"UPDATE newsletters SET status = ?, sent_at = ? WHERE id = ?",
|
|
(status, sent_at, newsletter_id),
|
|
)
|
|
else:
|
|
if use_pg:
|
|
cur.execute(
|
|
"UPDATE newsletters SET status = %s WHERE id = %s",
|
|
(status, newsletter_id),
|
|
)
|
|
else:
|
|
cur.execute(
|
|
"UPDATE newsletters SET status = ? WHERE id = ?",
|
|
(status, newsletter_id),
|
|
)
|
|
return cur.rowcount > 0
|
|
except Exception as exc:
|
|
logging.exception("Failed to update newsletter status: %s", exc)
|
|
raise
|
|
|
|
|
|
def get_newsletter_by_id(newsletter_id: int) -> dict | None:
|
|
"""Get a specific newsletter by ID."""
|
|
use_pg = is_postgres_enabled()
|
|
try:
|
|
with db_cursor(read_only=True) as (_, cur):
|
|
if use_pg:
|
|
cur.execute(
|
|
"SELECT id, subject, content, sender_name, send_date, status, created_at, sent_at FROM newsletters WHERE id = %s",
|
|
(newsletter_id,),
|
|
)
|
|
else:
|
|
cur.execute(
|
|
"SELECT id, subject, content, sender_name, send_date, status, created_at, sent_at FROM newsletters WHERE id = ?",
|
|
(newsletter_id,),
|
|
)
|
|
|
|
row = cur.fetchone()
|
|
if row:
|
|
return {
|
|
"id": row[0],
|
|
"subject": row[1],
|
|
"content": row[2],
|
|
"sender_name": row[3],
|
|
"send_date": row[4],
|
|
"status": row[5],
|
|
"created_at": row[6],
|
|
"sent_at": row[7],
|
|
}
|
|
except Exception as exc:
|
|
logging.exception("Failed to get newsletter by ID: %s", exc)
|
|
raise
|
|
|
|
return None
|