"""Business logic for newsletter subscriptions."""
from __future__ import annotations
from datetime import datetime, timezone
from ..database import save_subscriber, delete_subscriber, update_subscriber
from ..utils import is_valid_email
def validate_email(email: str) -> bool:
"""Return True when the provided email passes a basic sanity check."""
return is_valid_email(email)
def subscribe(email: str) -> bool:
"""Persist the subscription and return False when it already exists."""
created_at = datetime.now(timezone.utc).isoformat()
return save_subscriber(email, created_at=created_at)
def unsubscribe(email: str) -> bool:
"""Remove the subscription and return True if it existed."""
return delete_subscriber(email)
def update_email(old_email: str, new_email: str) -> bool:
"""Update the email for a subscription. Return True if updated."""
return update_subscriber(old_email, new_email)
def send_subscription_confirmation(email: str) -> bool:
"""Send a confirmation email to the subscriber."""
import logging
from .. import settings
if not settings.SMTP_SETTINGS["host"]:
logging.info("SMTP not configured; skipping confirmation email")
return False
# Get template from settings or default
template = getattr(settings, 'NEWSLETTER_CONFIRMATION_TEMPLATE', """
Welcome to our Newsletter!
Thank you for subscribing to our newsletter. You're now part of our community and will receive updates on our latest news and offers.
If you wish to unsubscribe at any time, you can do so by visiting our subscription management page.
Best regards,
The Team
""").strip()
try:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
sender = settings.SMTP_SETTINGS["sender"] or "noreply@example.com"
# Create message
msg = MIMEMultipart('alternative')
msg['Subject'] = "Newsletter Subscription Confirmation"
msg['From'] = sender
msg['To'] = email
# Add HTML content
html_part = MIMEText(template, 'html')
msg.attach(html_part)
# Send email
with smtplib.SMTP(settings.SMTP_SETTINGS["host"], settings.SMTP_SETTINGS["port"], timeout=15) as server:
if settings.SMTP_SETTINGS["use_tls"]:
server.starttls()
if settings.SMTP_SETTINGS["username"]:
server.login(
settings.SMTP_SETTINGS["username"], settings.SMTP_SETTINGS["password"] or "")
server.send_message(msg)
logging.info("Confirmation email sent to %s", email)
return True
except Exception as exc:
logging.exception("Failed to send confirmation email to %s: %s", email, exc)
return False
def send_newsletter_to_subscribers(subject: str, content: str, emails: list[str], sender_name: str | None = None) -> int:
"""Send newsletter to list of email addresses. Returns count of successful sends."""
import logging
from .. import settings
if not settings.SMTP_SETTINGS["host"]:
logging.error("SMTP not configured, cannot send newsletter")
return 0
try:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# Create message
msg = MIMEMultipart('alternative')
msg['Subject'] = subject
msg['From'] = settings.SMTP_SETTINGS["sender"] or "noreply@example.com"
# Format content
formatted_content = content.replace('\n', '
')
html_content = f"""
{formatted_content}
"""
# Add HTML content
html_part = MIMEText(html_content, 'html')
msg.attach(html_part)
# Send to each recipient individually for better deliverability
success_count = 0
with smtplib.SMTP(settings.SMTP_SETTINGS["host"], settings.SMTP_SETTINGS["port"]) as server:
if settings.SMTP_SETTINGS["use_tls"]:
server.starttls()
if settings.SMTP_SETTINGS["username"] and settings.SMTP_SETTINGS["password"]:
server.login(
settings.SMTP_SETTINGS["username"], settings.SMTP_SETTINGS["password"])
for email in emails:
try:
# Create a fresh copy for each recipient
recipient_msg = MIMEMultipart('alternative')
recipient_msg['Subject'] = subject
recipient_msg['From'] = msg['From']
recipient_msg['To'] = email
# Add HTML content
recipient_msg.attach(MIMEText(html_content, 'html'))
server.sendmail(msg['From'], email,
recipient_msg.as_string())
success_count += 1
except Exception as exc:
logging.exception(
"Failed to send newsletter to %s: %s", email, exc)
return success_count
except Exception as exc:
logging.exception("Failed to send newsletter: %s", exc)
return 0