"""Admin routes for application management.""" from __future__ import annotations from flask import Blueprint, render_template, jsonify, request import logging from .. import auth, settings from ..database import delete_app_setting, get_app_settings, get_subscribers, update_app_setting bp = Blueprint("admin", __name__, url_prefix="/admin") @bp.route("/") @auth.login_required def dashboard(): """Display admin dashboard overview.""" return render_template("admin_dashboard.html") @bp.route("/newsletter") @auth.login_required def newsletter_subscribers(): """Display newsletter subscriber management page.""" return render_template("admin_newsletter.html") @bp.route("/newsletter/create") @auth.login_required def newsletter_create(): """Display newsletter creation and sending page.""" return render_template("admin_newsletter_create.html") @bp.route("/settings") @auth.login_required def settings_page(): """Display current application settings.""" # Gather settings to display app_settings = { "Database": { "DATABASE_URL": settings.DATABASE_URL or "sqlite:///./data/forms.db", "POSTGRES_URL": settings.POSTGRES_URL or "Not configured", "SQLite Path": str(settings.SQLITE_DB_PATH), }, "SMTP": { "Host": settings.SMTP_SETTINGS["host"] or "Not configured", "Port": settings.SMTP_SETTINGS["port"], "Username": settings.SMTP_SETTINGS["username"] or "Not configured", "Sender": settings.SMTP_SETTINGS["sender"] or "Not configured", "Recipients": ", ".join(settings.SMTP_SETTINGS["recipients"]) if settings.SMTP_SETTINGS["recipients"] else "Not configured", "Use TLS": settings.SMTP_SETTINGS["use_tls"], }, "Rate Limiting": { "Max Requests": settings.RATE_LIMIT_MAX, "Window (seconds)": settings.RATE_LIMIT_WINDOW, "Redis URL": settings.REDIS_URL or "Not configured", }, "Security": { "Strict Origin Check": settings.STRICT_ORIGIN_CHECK, "Allowed Origin": settings.ALLOWED_ORIGIN or "Not configured", }, "Logging": { "JSON Logs": settings.ENABLE_JSON_LOGS, "Request Logs": settings.ENABLE_REQUEST_LOGS, }, "Monitoring": { "Sentry DSN": settings.SENTRY_DSN or "Not configured", "Sentry Traces Sample Rate": settings.SENTRY_TRACES_SAMPLE_RATE, }, "Admin": { "Username": settings.ADMIN_USERNAME, }, } return render_template("admin_settings.html", settings=app_settings) @bp.route("/submissions") @auth.login_required def submissions(): """Display contact form submissions page.""" return render_template("admin_submissions.html") @bp.route("/embeds") @auth.login_required def embeds(): """Display embeddable forms management page.""" return render_template("admin_embeds.html") @bp.route("/api/settings", methods=["GET"]) @auth.login_required def get_settings_api(): """Get all application settings via API.""" try: settings_data = get_app_settings() return jsonify({"status": "ok", "settings": settings_data}) except Exception as exc: logging.exception("Failed to retrieve settings: %s", exc) return jsonify({"status": "error", "message": "Failed to retrieve settings."}), 500 def validate_setting(key: str, value: str) -> str | None: """Validate a setting key-value pair. Returns error message or None if valid.""" # Define validation rules for known settings validations = { "maintenance_mode": lambda v: v in ["true", "false"], "contact_form_enabled": lambda v: v in ["true", "false"], "newsletter_enabled": lambda v: v in ["true", "false"], "rate_limit_max": lambda v: v.isdigit() and 0 <= int(v) <= 1000, "rate_limit_window": lambda v: v.isdigit() and 1 <= int(v) <= 3600, } if key in validations and not validations[key](value): return f"Invalid value for {key}" # General validation if len(key) > 100: return "Setting key too long (max 100 characters)" if len(value) > 1000: return "Setting value too long (max 1000 characters)" return None @bp.route("/api/settings/", methods=["PUT"]) @auth.login_required def update_setting_api(key: str): """Update a specific application setting via API.""" try: data = request.get_json(silent=True) or {} value = data.get("value", "").strip() if not value: return jsonify({"status": "error", "message": "Value is required."}), 400 # Validate the setting validation_error = validate_setting(key, value) if validation_error: return jsonify({"status": "error", "message": validation_error}), 400 success = update_app_setting(key, value) if success: return jsonify({"status": "ok", "message": f"Setting '{key}' updated successfully."}) else: return jsonify({"status": "error", "message": "Failed to update setting."}), 500 except Exception as exc: logging.exception("Failed to update setting: %s", exc) return jsonify({"status": "error", "message": "Failed to update setting."}), 500 @bp.route("/api/settings/", methods=["DELETE"]) @auth.login_required def delete_setting_api(key: str): """Delete a specific application setting via API.""" try: deleted = delete_app_setting(key) if deleted: return jsonify({"status": "ok", "message": f"Setting '{key}' deleted successfully."}) else: return jsonify({"status": "error", "message": f"Setting '{key}' not found."}), 404 except Exception as exc: logging.exception("Failed to delete setting: %s", exc) return jsonify({"status": "error", "message": "Failed to delete setting."}), 500 @bp.route("/api/newsletter", methods=["GET"]) @auth.login_required def get_subscribers_api(): """Retrieve newsletter subscribers with pagination, filtering, and sorting.""" try: # Parse query parameters page = int(request.args.get("page", 1)) per_page = min(int(request.args.get("per_page", 50)), 100) # Max 100 per page sort_by = request.args.get("sort_by", "subscribed_at") sort_order = request.args.get("sort_order", "desc") email_filter = request.args.get("email") # Validate sort_by valid_sort_fields = ["email", "subscribed_at"] if sort_by not in valid_sort_fields: sort_by = "subscribed_at" # Get subscribers subscribers, total = get_subscribers( page=page, per_page=per_page, sort_by=sort_by, sort_order=sort_order, email_filter=email_filter, ) return jsonify({ "status": "ok", "subscribers": subscribers, "pagination": { "page": page, "per_page": per_page, "total": total, "pages": (total + per_page - 1) // per_page, }, }) except Exception as exc: logging.exception("Failed to retrieve subscribers: %s", exc) return jsonify({"status": "error", "message": "Failed to retrieve subscribers."}), 500 @bp.route("/api/newsletters", methods=["POST"]) @auth.login_required def create_newsletter_api(): """Create a new newsletter.""" try: data = request.get_json(silent=True) or {} subject = data.get("subject", "").strip() content = data.get("content", "").strip() sender_name = data.get("sender_name", "").strip() or None send_date = data.get("send_date", "").strip() or None status = data.get("status", "draft") if not subject or not content: return jsonify({"status": "error", "message": "Subject and content are required."}), 400 if status not in ["draft", "scheduled", "sent"]: return jsonify({"status": "error", "message": "Invalid status."}), 400 from ..database import save_newsletter newsletter_id = save_newsletter( subject, content, sender_name, send_date, status) return jsonify({ "status": "ok", "message": "Newsletter created successfully.", "newsletter_id": newsletter_id }), 201 except Exception as exc: logging.exception("Failed to create newsletter: %s", exc) return jsonify({"status": "error", "message": "Failed to create newsletter."}), 500 @bp.route("/api/newsletters", methods=["GET"]) @auth.login_required def get_newsletters_api(): """Retrieve newsletters with pagination and filtering.""" try: page = int(request.args.get("page", 1)) per_page = min(int(request.args.get("per_page", 20)), 50) # Max 50 per page status_filter = request.args.get("status") from ..database import get_newsletters newsletters, total = get_newsletters( page=page, per_page=per_page, status_filter=status_filter) return jsonify({ "status": "ok", "newsletters": newsletters, "pagination": { "page": page, "per_page": per_page, "total": total, "pages": (total + per_page - 1) // per_page, }, }) except Exception as exc: logging.exception("Failed to retrieve newsletters: %s", exc) return jsonify({"status": "error", "message": "Failed to retrieve newsletters."}), 500 @bp.route("/api/newsletters//send", methods=["POST"]) @auth.login_required def send_newsletter_api(newsletter_id: int): """Send a newsletter to all subscribers.""" try: from ..database import get_newsletter_by_id, update_newsletter_status, get_subscribers from ..services.newsletter import send_newsletter_to_subscribers from datetime import datetime, timezone # Get the newsletter newsletter = get_newsletter_by_id(newsletter_id) if not newsletter: return jsonify({"status": "error", "message": "Newsletter not found."}), 404 if newsletter["status"] == "sent": return jsonify({"status": "error", "message": "Newsletter has already been sent."}), 400 # Get all subscribers subscribers, _ = get_subscribers( page=1, per_page=10000) # Get all subscribers if not subscribers: return jsonify({"status": "error", "message": "No subscribers found."}), 400 # Send the newsletter success_count = send_newsletter_to_subscribers( newsletter["subject"], newsletter["content"], [sub["email"] for sub in subscribers], newsletter["sender_name"] ) # Update newsletter status sent_at = datetime.now(timezone.utc).isoformat() update_newsletter_status(newsletter_id, "sent", sent_at) return jsonify({ "status": "ok", "message": f"Newsletter sent to {success_count} subscribers.", "sent_count": success_count }) except Exception as exc: logging.exception("Failed to send newsletter: %s", exc) return jsonify({"status": "error", "message": "Failed to send newsletter."}), 500 @bp.route("/api/contact", methods=["GET"]) @auth.login_required def get_contact_submissions_api(): """Retrieve contact form submissions with pagination, filtering, and sorting.""" try: # Parse query parameters page = int(request.args.get("page", 1)) per_page = min(int(request.args.get("per_page", 50)), 100) # Max 100 per page sort_by = request.args.get("sort_by", "created_at") sort_order = request.args.get("sort_order", "desc") email_filter = request.args.get("email") date_from = request.args.get("date_from") date_to = request.args.get("date_to") # Validate sort_by valid_sort_fields = ["id", "name", "email", "created_at"] if sort_by not in valid_sort_fields: sort_by = "created_at" # Get submissions from ..database import get_contacts submissions, total = get_contacts( page=page, per_page=per_page, sort_by=sort_by, sort_order=sort_order, email_filter=email_filter, date_from=date_from, date_to=date_to, ) return jsonify({ "status": "ok", "submissions": submissions, "pagination": { "page": page, "per_page": per_page, "total": total, "pages": (total + per_page - 1) // per_page, }, }) except Exception as exc: logging.exception("Failed to retrieve contact submissions: %s", exc) return jsonify({"status": "error", "message": "Failed to retrieve contact submissions."}), 500 @bp.route("/api/contact/", methods=["DELETE"]) @auth.login_required def delete_contact_submission_api(contact_id: int): """Delete a contact submission by ID.""" try: from ..database import delete_contact deleted = delete_contact(contact_id) if deleted: return jsonify({"status": "ok", "message": f"Contact submission {contact_id} deleted successfully."}) else: return jsonify({"status": "error", "message": f"Contact submission {contact_id} not found."}), 404 except Exception as exc: logging.exception("Failed to delete contact submission: %s", exc) return jsonify({"status": "error", "message": "Failed to delete contact submission."}), 500