"""Business logic for newsletter subscriptions.""" from __future__ import annotations from datetime import datetime, timezone from typing import cast from ..database import save_subscriber, delete_subscriber, update_subscriber from ..utils import is_valid_email from .email_settings import load_effective_smtp_settings from .email_templates import get_template_content def validate_email(email: str) -> bool: """Return True when the provided email passes a basic sanity check.""" return is_valid_email(email) def subscribe(email: str) -> bool: """Persist the subscription and return False when it already exists.""" created_at = datetime.now(timezone.utc).isoformat() return save_subscriber(email, created_at=created_at) def unsubscribe(email: str) -> bool: """Remove the subscription and return True if it existed.""" return delete_subscriber(email) def update_email(old_email: str, new_email: str) -> bool: """Update the email for a subscription. Return True if updated.""" return update_subscriber(old_email, new_email) def send_subscription_confirmation(email: str) -> bool: """Send a confirmation email to the subscriber.""" import logging smtp_config = load_effective_smtp_settings() if not smtp_config.get("notify_newsletter_signups"): logging.info( "Newsletter signup notifications disabled; skipping email") return False host = smtp_config.get("host") port = int(smtp_config.get("port") or 0) username = smtp_config.get("username") password = smtp_config.get("password") or "" if not host: logging.info("SMTP not configured; skipping confirmation email") return False # Get template content (persisted or default) template = get_template_content("newsletter_confirmation") try: import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart sender = smtp_config.get("sender") or "noreply@example.com" # Create message msg = MIMEMultipart('alternative') msg['Subject'] = "Newsletter Subscription Confirmation" msg['From'] = sender msg['To'] = email # Add HTML content html_part = MIMEText(template, 'html') msg.attach(html_part) # Send email with smtplib.SMTP(cast(str, host), port, timeout=15) as server: if smtp_config.get("use_tls"): server.starttls() if username: server.login(username, password) server.send_message(msg) logging.info("Confirmation email sent to %s", email) return True except Exception as exc: logging.exception( "Failed to send confirmation email to %s: %s", email, exc) return False def send_newsletter_to_subscribers(subject: str, content: str, emails: list[str], sender_name: str | None = None) -> int: """Send newsletter to list of email addresses. Returns count of successful sends.""" import logging smtp_config = load_effective_smtp_settings() host = smtp_config.get("host") port = int(smtp_config.get("port") or 0) username = smtp_config.get("username") password = smtp_config.get("password") if not host: logging.error("SMTP not configured, cannot send newsletter") return 0 try: import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart # Create message msg = MIMEMultipart('alternative') msg['Subject'] = subject msg['From'] = smtp_config.get("sender") or "noreply@example.com" # Format content formatted_content = content.replace('\n', '
') html_content = f""" {formatted_content} """ # Add HTML content html_part = MIMEText(html_content, 'html') msg.attach(html_part) # Send to each recipient individually for better deliverability success_count = 0 with smtplib.SMTP(cast(str, host), port) as server: if smtp_config.get("use_tls"): server.starttls() if username and password is not None: server.login(username, password) for email in emails: try: # Create a fresh copy for each recipient recipient_msg = MIMEMultipart('alternative') recipient_msg['Subject'] = subject recipient_msg['From'] = msg['From'] recipient_msg['To'] = email # Add HTML content recipient_msg.attach(MIMEText(html_content, 'html')) server.sendmail(msg['From'], email, recipient_msg.as_string()) success_count += 1 except Exception as exc: logging.exception( "Failed to send newsletter to %s: %s", email, exc) return success_count except Exception as exc: logging.exception("Failed to send newsletter: %s", exc) return 0