v1
Some checks failed
CI / test (3.11) (push) Failing after 5m36s
CI / build-image (push) Has been skipped

This commit is contained in:
2025-10-22 16:48:55 +02:00
commit 4cefd4e3ab
53 changed files with 5837 additions and 0 deletions

9
.dockerignore Normal file
View File

@@ -0,0 +1,9 @@
*.pyc
__pycache__/
.venv/
.env
*.db
.git/
.DS_Store
docs/
*.log

49
.env.example Normal file
View File

@@ -0,0 +1,49 @@
# Flask configuration
FLASK_SECRET_KEY=change-me
# Admin authentication
ADMIN_USERNAME=admin
ADMIN_PASSWORD=change-me
# Logging
ENABLE_JSON_LOGS=false
ENABLE_REQUEST_LOGS=true
# SMTP configuration for notification emails
SMTP_HOST=smtp.example.com
SMTP_PORT=465
SMTP_USERNAME=your-username
SMTP_PASSWORD=your-password
SMTP_SENDER=web@example.com
SMTP_RECIPIENTS=team@example.com
SMTP_USE_TLS=true
# Set to 1 to enable SMTP integration tests during CI/CD (requires valid SMTP settings)
RUN_SMTP_INTEGRATION_TEST=0
# database configuration (SQLite by default, POSTGRES_URL for PostgreSQL)
DATABASE_URL=sqlite:///./forms.db
POSTGRES_URL=postgresql://user:password@hostname:5432/dbname
# Rate limiting (submissions per window)
RATE_LIMIT_MAX=10
RATE_LIMIT_WINDOW=60
# Optional Redis for distributed rate limiting (leave empty to use in-memory limiter)
REDIS_URL=
# Origin checking (optional)
STRICT_ORIGIN_CHECK=false
ALLOWED_ORIGIN=
# Sentry (optional)
SENTRY_DSN=
SENTRY_TRACES_SAMPLE_RATE=0.0
# Gunicorn tuning (used in Docker runtime)
GUNICORN_WORKERS=
GUNICORN_TIMEOUT=30
# Registry used by CI (set on GitHub as repository secrets, not required locally)
REGISTRY_URL=git.allucanget.biz
REGISTRY_USERNAME=
REGISTRY_PASSWORD=

84
.github/workflows/ci.yml vendored Normal file
View File

@@ -0,0 +1,84 @@
name: CI
on:
push:
branches: [main]
pull_request:
branches: [main]
workflow_dispatch:
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.11]
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Cache pip
uses: actions/cache@v4
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }}
restore-keys: |
${{ runner.os }}-pip-
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install -r requirements.txt
- name: Run tests
run: |
pytest -q tests
- name: Upload test results (artifact)
if: always()
uses: actions/upload-artifact@v4
with:
name: pytest-results
path: tests
build-image:
if: github.ref == 'refs/heads/main' || github.event_name == 'workflow_dispatch'
runs-on: ubuntu-latest
needs: test
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up QEMU and Buildx
uses: docker/setup-buildx-action@v3
- name: Log in to registry (best-effort)
if: ${{ github.ref == 'refs/heads/main' }}
uses: docker/login-action@v3
continue-on-error: true
with:
registry: ${{ secrets.REGISTRY_URL }}
username: ${{ secrets.REGISTRY_USERNAME }}
password: ${{ secrets.REGISTRY_PASSWORD }}
- name: Build (and optionally push) image
uses: docker/build-push-action@v5
with:
context: .
file: Dockerfile
push: ${{ github.ref == 'refs/heads/main' && github.event_name != 'pull_request' && (secrets.REGISTRY_URL != '' && secrets.REGISTRY_USERNAME != '' && secrets.REGISTRY_PASSWORD != '') }}
tags: |
${{ secrets.REGISTRY_URL }}/allucanget/contact.allucanget.biz:latest
${{ secrets.REGISTRY_URL }}/allucanget/contact.allucanget.biz:${{ github.sha }}
- name: Upload built image metadata
if: always()
uses: actions/upload-artifact@v4
with:
name: image-build-info
path: .

39
.gitignore vendored Normal file
View File

@@ -0,0 +1,39 @@
# python
__pycache__/
*.py[cod]
*.pyo
*.pyd
.Python
# virtual environment
.venv/
# distribution / packaging
dist/
build/
*.egg-info/
# data folder
data/
data/*.db
# logs
*.log
# .env files
.env
server/.env
# test folders
.pytest_cache/
tests/__pycache__/
tests/*.db
# coverage reports
htmlcov/
.coverage
# instructions
.github/instructions/
# IDEs
.vscode/
.idea/

59
Dockerfile Normal file
View File

@@ -0,0 +1,59 @@
FROM python:3.11-slim AS builder
WORKDIR /app
# Install build deps
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements and install into a target directory
COPY /requirements.txt /app/requirements.txt
RUN python -m pip install --upgrade pip && \
# install into a prefix so console_scripts (gunicorn) are placed into /app/_deps/bin
python -m pip install --no-cache-dir --upgrade --prefix /app/_deps -r /app/requirements.txt
COPY . /app/src
FROM python:3.11-slim
WORKDIR /app
# Create non-root user
RUN addgroup --system appgroup && adduser --system --ingroup appgroup appuser
# Copy installed deps from builder
COPY --from=builder /app/_deps /app/_deps
ENV PYTHONPATH=/app/_deps/lib/python3.11/site-packages:/app
ENV PATH=/app/_deps/bin:$PATH
# Copy application code
COPY --from=builder /app/src /app
# Copy entrypoint and make executable
COPY /entrypoint.sh /app/entrypoint.sh
RUN chmod +x /app/entrypoint.sh
# Ensure minimal runtime packages are present (curl used by healthcheck and some runtime scripts)
RUN apt-get update && apt-get install -y --no-install-recommends \
curl \
ca-certificates \
&& rm -rf /var/lib/apt/lists/* \
&& mkdir -p /app/data \
&& chown -R appuser:appgroup /app/data
USER appuser
ENV FLASK_APP=app.py
ENV FLASK_RUN_HOST=0.0.0.0
ENV PYTHONUNBUFFERED=1
ENV GUNICORN_WORKERS=2
ENV GUNICORN_TIMEOUT=30
EXPOSE 5002
# Docker HEALTHCHECK: check the /health endpoint
HEALTHCHECK --interval=30s --timeout=5s --retries=3 CMD curl -f http://localhost:5002/health || exit 1
# Default to the entrypoint script which computes worker count if not provided
ENTRYPOINT ["/app/entrypoint.sh"]

204
README.md Normal file
View File

@@ -0,0 +1,204 @@
# Server README
Backend service for the contact website. The app accepts contact and newsletter submissions, persists them, applies rate limiting and origin checks, and sends notification emails when SMTP is configured. Includes admin authentication for accessing application settings and managing dynamic configuration.
## Overview
- Flask application exposed through `app.py` (development) and Gunicorn (container runtime)
- Service and blueprint architecture under `server/` for HTTP routes, business logic, and observability
- SQLite by default with optional PostgreSQL support and Redis-backed distributed rate limiting
- Docker- and docker-compose-friendly with health checks and environment-driven configuration
## Architecture
- `app.py`: simple shim that imports the Flask app for local development.
- `server/`: application package.
- `factory.py`: application factory that wires logging, middleware, routes, and database initialisation.
- `routes/`: Flask blueprints for `contact`, `newsletter`, `monitoring`, `auth`, and `admin` endpoints.
- `services/`: reusable business logic for persisting submissions and sending notifications.
- `database.py`: SQLite/PostgreSQL helpers and connection management.
- `middleware.py`, `rate_limit.py`, `metrics.py`: request guards, throttling, and Prometheus-style instrumentation.
- `settings.py`: environment-driven configuration loader.
- `auth.py`: authentication utilities and login required decorator.
- `templates/`: Jinja2 templates for HTML pages (login, admin dashboard, newsletter management, settings).
- `entrypoint.sh`: container entrypoint that tunes Gunicorn worker count and timeout before booting the WSGI app.
- `Dockerfile`: multi-stage build that installs requirements in a builder image and runs the app behind Gunicorn.
- `docker-compose.yml`: local stack for the API with bind-mounted SQLite data.
- `docker-compose.redis.yml`: optional Redis sidecar for distributed rate limiting tests.
- `tests/`: pytest suite covering API behaviour, services, metrics, and SMTP integration (opt-in).
## Quick Start
1. Create a virtual environment and install dependencies:
```pwsh
python -m venv .venv
.\.venv\Scripts\Activate.ps1
pip install -r requirements.txt
```
2. Copy the sample environment file and adjust values:
```pwsh
Copy-Item .env.example .env
```
3. Run the development server:
```pwsh
python app.py
```
The development server listens on `http://127.0.0.1:5002` by default.
### Admin Access
Access the admin interface at `http://127.0.0.1:5002/auth/login` using the configured `ADMIN_USERNAME` and `ADMIN_PASSWORD` (defaults: admin/admin). The admin interface provides a dashboard overview, newsletter subscriber management with search and pagination, newsletter creation and sending capabilities, and dynamic application settings management. The settings page displays current application configuration and allows dynamic management of application settings, while the submissions page allows viewing and managing contact form submissions.
## API Surface
- `POST /api/contact`: accepts `name`, `email`, `message`, and optional `company`, `timeline`.
- `GET /api/contact`: retrieves contact form submissions (admin only, requires authentication). Supports pagination (`page`, `per_page`), filtering (`email`, `date_from`, `date_to`), and sorting (`sort_by`, `sort_order`).
- `GET /api/contact/<id>`: retrieves a specific contact submission by ID (admin only).
- `DELETE /api/contact/<id>`: deletes a contact submission by ID (admin only).
- `POST /api/newsletter`: subscribes an address and optional metadata to the newsletter list.
- `DELETE /api/newsletter`: unsubscribes an email address from the newsletter list.
- `PUT /api/newsletter`: updates a subscriber's email address (requires `old_email` and `new_email`).
- `GET /api/newsletter/manage`: displays HTML form for newsletter subscription management.
- `POST /api/newsletter/manage`: processes subscription management actions (subscribe, unsubscribe, update).
- `GET /health`: lightweight database connectivity check used for container health monitoring.
- `GET /metrics`: Prometheus-compatible metrics endpoint (requires `ENABLE_REQUEST_LOGS` for detailed tracing).
- `GET /admin/api/settings`: retrieves all application settings (admin only).
- `PUT /admin/api/settings/<key>`: updates a specific application setting (admin only).
- `DELETE /admin/api/settings/<key>`: deletes a specific application setting (admin only).
- `GET /admin/api/newsletter`: retrieves newsletter subscribers with pagination, filtering, and sorting (admin only).
- `POST /admin/api/newsletters`: creates a new newsletter (admin only).
- `GET /admin/api/newsletters`: retrieves newsletters with pagination and filtering (admin only).
- `POST /admin/api/newsletters/<id>/send`: sends a newsletter to all subscribers (admin only).
- `GET /admin/api/contact`: retrieves contact form submissions with pagination, filtering, and sorting (admin only).
- `DELETE /admin/api/contact/<id>`: deletes a contact submission by ID (admin only).
## Running With Docker
### Build manually
```pwsh
docker build -t contact.allucanget.biz -f Dockerfile .
```
### Run with explicit environment variables
```pwsh
docker run --rm -p 5002:5002 `
-e FLASK_SECRET_KEY=change-me `
-e SMTP_HOST=smtp.example.com `
-e SMTP_PORT=587 `
-e SMTP_USERNAME=api@example.com `
-e SMTP_PASSWORD=secret `
-e SMTP_RECIPIENTS=hello@example.com `
contact.allucanget.biz
```
### Run using docker-compose
```pwsh
docker-compose up --build
```
- Mounts `./data` into the container for SQLite persistence.
- Exposes port `5002` and wires environment variables from your `.env` file.
To experiment with Redis-backed throttling:
```pwsh
docker-compose -f docker-compose.redis.yml up --build
```
## Environment Variables
See `.env.example` for a complete reference. The most relevant groups are captured below.
### Core runtime
| Variable | Description | Default |
| --------------------- | ------------------------------------------------------------------------------------- | ------- |
| `FLASK_SECRET_KEY` | Secret used by Flask for session signing; set to a strong random value in production. | `dev` |
| `ENABLE_JSON_LOGS` | Emit logs in JSON format when `true`. | `false` |
| `ENABLE_REQUEST_LOGS` | Enable request start/end logging. | `true` |
### Admin authentication
| Variable | Description | Default |
| ---------------- | ------------------------- | ------- |
| `ADMIN_USERNAME` | Username for admin login. | `admin` |
| `ADMIN_PASSWORD` | Password for admin login. | `admin` |
### Database configuration
| Variable | Description | Default |
| -------------- | ---------------------------------------------------------------------------------------------------------------- | --------------------------- |
| `DATABASE_URL` | SQLite URL or filesystem path. Examples: `sqlite:///./forms.db`, `./data/forms.db`. | `sqlite:///./data/forms.db` |
| `POSTGRES_URL` | PostgreSQL connection URI, e.g. `postgresql://user:pass@host:5432/dbname`. Requires `psycopg2-binary` installed. | _(none)_ |
When `POSTGRES_URL` is set and `psycopg2-binary` is available, the app prefers PostgreSQL. Otherwise it falls back to SQLite. A custom `DATABASE_URL` always wins over `POSTGRES_URL` to simplify local development.
### Email delivery
| Variable | Description | Default |
| ----------------- | ----------------------------------------------------------------------- | -------- |
| `SMTP_HOST` | SMTP server hostname or IP. Leave empty to disable email notifications. | _(none)_ |
| `SMTP_PORT` | SMTP server port. | `587` |
| `SMTP_USERNAME` | Username for SMTP authentication. | _(none)_ |
| `SMTP_PASSWORD` | Password or token for SMTP authentication. | _(none)_ |
| `SMTP_SENDER` | Sender email address; defaults to `SMTP_USERNAME` when unset. | _(none)_ |
| `SMTP_RECIPIENTS` | Comma-separated recipient list for notifications. | _(none)_ |
| `SMTP_USE_TLS` | Enables STARTTLS when `true`. | `true` |
### Rate limiting and caching
| Variable | Description | Default |
| ------------------- | ------------------------------------------------------------------------------------------ | -------- |
| `RATE_LIMIT_MAX` | Maximum submissions allowed per window from a single IP. Set to `0` to disable throttling. | `10` |
| `RATE_LIMIT_WINDOW` | Sliding window size (seconds) for rate limiting. | `60` |
| `REDIS_URL` | Redis connection string for distributed rate limiting; when empty, use in-memory storage. | _(none)_ |
### Request hardening
| Variable | Description | Default |
| --------------------- | ------------------------------------------------------------------------------- | -------- |
| `STRICT_ORIGIN_CHECK` | Enforce `Origin`/`Referer` validation when `true`. | `false` |
| `ALLOWED_ORIGIN` | Expected site origin (e.g. `https://example.com`) used by strict origin checks. | _(none)_ |
### Observability
| Variable | Description | Default |
| --------------------------- | --------------------------------------------------- | -------- |
| `SENTRY_DSN` | Sentry project DSN for error reporting. | _(none)_ |
| `SENTRY_TRACES_SAMPLE_RATE` | Sampling rate for Sentry performance tracing (0-1). | `0.0` |
### Docker / Gunicorn runtime
| Variable | Description | Default |
| ------------------ | ----------------------------------------------------------------------------------------------- | -------- |
| `GUNICORN_WORKERS` | Overrides auto-calculated worker count; leave blank to use `(2 × CPU) + 1` capped at 8 workers. | _(auto)_ |
| `GUNICORN_TIMEOUT` | Worker timeout in seconds used by Gunicorn. | `30` |
## Health Checks and Monitoring
- `/health` and Docker health checks verify that the API can reach the configured database.
- `/metrics` surfaces request counters, latency histograms, rate limit metrics, and SMTP success/error counts.
- Enable Sentry by setting `SENTRY_DSN` to forward exceptions and performance traces.
## Testing
```pwsh
pytest -q tests
```
SMTP integration tests are skipped unless `RUN_SMTP_INTEGRATION_TEST=1` and valid SMTP credentials are set. Run `pytest -q tests/test_integration_smtp.py` to target them explicitly.
## Deployment Notes
- A single GitHub Actions workflow (`ci.yml`) runs pytest on every push/pull request, uploads the test directory as an artifact, and optionally builds the Docker image.
- On pushes to `main` (or manual dispatch) the workflow builds the container and, when registry credentials are available, pushes tags to `git.allucanget.biz`.
- For production use, deploy the container behind a load balancer or reverse proxy and supply the appropriate environment variables.

4
__init__.py Normal file
View File

@@ -0,0 +1,4 @@
"""Server package initializer: expose the Flask app instance for convenience."""
from server.app import app, init_db
__all__ = ["app", "init_db"]

8
app.py Normal file
View File

@@ -0,0 +1,8 @@
"""Entrypoint shim for running the Flask application."""
from __future__ import annotations
from server.app import app
if __name__ == "__main__":
app.run(debug=True, port=5002)

38
docker-compose.redis.yml Normal file
View File

@@ -0,0 +1,38 @@
version: "3.8"
services:
redis:
image: redis:7-alpine
restart: unless-stopped
volumes:
- redis-data:/data
server:
build:
context: .
dockerfile: Dockerfile
ports:
- "5002:5002"
environment:
- SMTP_HOST=${SMTP_HOST}
- SMTP_PORT=${SMTP_PORT}
- SMTP_USERNAME=${SMTP_USERNAME}
- SMTP_PASSWORD=${SMTP_PASSWORD}
- SMTP_SENDER=${SMTP_SENDER}
- SMTP_RECIPIENTS=${SMTP_RECIPIENTS}
- SMTP_USE_TLS=${SMTP_USE_TLS}
- REDIS_URL=redis://redis:6379/0
- RATE_LIMIT_MAX=${RATE_LIMIT_MAX}
- RATE_LIMIT_WINDOW=${RATE_LIMIT_WINDOW}
depends_on:
- redis
volumes:
- ./data:/app/data
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:5002/health || exit 1"]
interval: 30s
timeout: 5s
retries: 3
volumes:
redis-data:
driver: local

34
docker-compose.yml Normal file
View File

@@ -0,0 +1,34 @@
services:
server:
build:
context: .
dockerfile: Dockerfile
ports:
- "5002:5002"
environment:
- FLASK_SECRET_KEY=${FLASK_SECRET_KEY}
- SMTP_HOST=${SMTP_HOST}
- SMTP_PORT=${SMTP_PORT}
- SMTP_USERNAME=${SMTP_USERNAME}
- SMTP_PASSWORD=${SMTP_PASSWORD}
- SMTP_SENDER=${SMTP_SENDER}
- SMTP_RECIPIENTS=${SMTP_RECIPIENTS}
- SMTP_USE_TLS=${SMTP_USE_TLS}
- RATE_LIMIT_MAX=${RATE_LIMIT_MAX}
- RATE_LIMIT_WINDOW=${RATE_LIMIT_WINDOW}
- REDIS_URL=${REDIS_URL}
- SENTRY_DSN=${SENTRY_DSN}
- SENTRY_TRACES_SAMPLE_RATE=${SENTRY_TRACES_SAMPLE_RATE}
- GUNICORN_WORKERS=${GUNICORN_WORKERS}
- GUNICORN_TIMEOUT=${GUNICORN_TIMEOUT}
- ENABLE_JSON_LOGS=${ENABLE_JSON_LOGS}
- ENABLE_REQUEST_LOGS=${ENABLE_REQUEST_LOGS}
- STRICT_ORIGIN_CHECK=${STRICT_ORIGIN_CHECK}
- ALLOWED_ORIGIN=${ALLOWED_ORIGIN}
volumes:
- ./data:/app/data
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:5002/health || exit 1"]
interval: 30s
timeout: 5s
retries: 3

28
entrypoint.sh Normal file
View File

@@ -0,0 +1,28 @@
#!/bin/sh
set -e
# If GUNICORN_WORKERS is unset or set to the default placeholder, auto-calc based on CPU
if [ -z "${GUNICORN_WORKERS}" ] || [ "${GUNICORN_WORKERS}" = "2" ]; then
# Default formula: (2 x $CPU) + 1
CPU=$(getconf _NPROCESSORS_ONLN 2>/dev/null || echo 1)
GUNICORN_WORKERS=$((CPU * 2 + 1))
fi
if [ ${GUNICORN_WORKERS} -gt 8 ]; then
GUNICORN_WORKERS=8
fi
: ${GUNICORN_TIMEOUT:=30}
echo "Starting gunicorn with ${GUNICORN_WORKERS} workers and timeout ${GUNICORN_TIMEOUT}s"
# Determine WSGI module: prefer server.app if present, otherwise fall back to app
MODULE="app"
if [ -f "/app/server/app.py" ]; then
MODULE="server.app"
elif [ -f "/app/app.py" ]; then
MODULE="app"
fi
echo "Using WSGI module: ${MODULE}"
exec gunicorn ${MODULE}:app -b 0.0.0.0:5002 -w ${GUNICORN_WORKERS} --timeout ${GUNICORN_TIMEOUT} --log-level info

34
pytest.ini Normal file
View File

@@ -0,0 +1,34 @@
[pytest]
addopts = -ra
testpaths =
tests
markers =
integration: marks tests that require external services such as SMTP
[coverage:run]
branch = True
source =
contact.allucanget.biz
except =
*/tests/*
*/migrations/*
*/.venv/*
*/.git/*
*/.github/*
*/.vscode/*
*/.idea/*
*/.cache/*
*/.pytest_cache/*
[coverage:report]
show_missing = True
skip_covered = True
exclude_lines =
pragma: no cover
if __name__ == .__main__.
raise NotImplementedError
pass
except ImportError:
except Exception as e:
# pragma: no cover

11
requirements.txt Normal file
View File

@@ -0,0 +1,11 @@
Flask>=2.2
python-dotenv>=1.0
pytest>=7.0
pytest-flask>=1.2
gunicorn>=20.0
redis>=5.0
prometheus_client>=0.16
python-json-logger>=2.0
sentry-sdk>=1.8
psycopg2-binary>=2.9
pytest-cov>=4.0

4
server/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
"""Server application package."""
from .factory import create_app
__all__ = ["create_app"]

35
server/app.py Normal file
View File

@@ -0,0 +1,35 @@
"""Compatibility layer exposing the Flask app instance."""
from __future__ import annotations
from pathlib import Path
from .database import (
DB_PATH as _DB_PATH,
DEFAULT_DB_PATH,
db_cursor,
init_db as _init_db,
is_postgres_enabled,
set_db_path,
set_postgres_override,
)
from .factory import create_app
app = create_app()
DB_PATH: Path = _DB_PATH
def init_db() -> None:
"""Initialise the database using the current DB_PATH."""
set_db_path(DB_PATH)
_init_db()
__all__ = [
"app",
"DB_PATH",
"DEFAULT_DB_PATH",
"db_cursor",
"init_db",
"is_postgres_enabled",
"set_postgres_override",
]

16
server/auth.py Normal file
View File

@@ -0,0 +1,16 @@
"""Authentication utilities."""
from __future__ import annotations
from functools import wraps
from flask import redirect, session, url_for
def login_required(f):
"""Decorator to require login for routes."""
@wraps(f)
def decorated_function(*args, **kwargs):
if not session.get("logged_in"):
return redirect(url_for("auth.login"))
return f(*args, **kwargs)
return decorated_function

691
server/database.py Normal file
View File

@@ -0,0 +1,691 @@
"""Database helpers supporting SQLite and optional Postgres."""
from __future__ import annotations
import logging
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterator, Tuple
from . import settings
try: # psycopg2 is optional
import psycopg2
except Exception: # pragma: no cover
psycopg2 = None # type: ignore
if TYPE_CHECKING: # pragma: no cover
from .services.contact import ContactSubmission
DB_PATH = Path(settings.SQLITE_DB_PATH)
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
DEFAULT_DB_PATH = DB_PATH
_USE_POSTGRES_OVERRIDE: bool | None = None
# Keep legacy-style flag available for external access.
USE_POSTGRES = False
def set_db_path(new_path: Path | str) -> None:
"""Update the SQLite database path (used primarily in tests)."""
global DB_PATH
DB_PATH = Path(new_path)
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
def set_postgres_override(value: bool | None) -> None:
"""Allow callers to force-enable or disable Postgres usage."""
global _USE_POSTGRES_OVERRIDE
_USE_POSTGRES_OVERRIDE = value
def is_postgres_enabled() -> bool:
"""Return True when Postgres should be used for database operations."""
if _USE_POSTGRES_OVERRIDE is not None:
use_pg = _USE_POSTGRES_OVERRIDE
elif psycopg2 is None or not settings.POSTGRES_URL:
use_pg = False
else:
use_pg = DB_PATH == DEFAULT_DB_PATH
globals()["USE_POSTGRES"] = use_pg
return use_pg
@contextmanager
def db_cursor(*, read_only: bool = False) -> Iterator[Tuple[Any, Any]]:
"""Yield a database cursor for either SQLite or Postgres."""
use_pg = is_postgres_enabled()
if use_pg:
if psycopg2 is None:
raise RuntimeError(
"Postgres requested but psycopg2 is unavailable")
conn = psycopg2.connect(settings.POSTGRES_URL)
else:
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(DB_PATH))
try:
cur = conn.cursor()
try:
yield conn, cur
if use_pg:
if read_only:
conn.rollback()
else:
conn.commit()
elif not read_only:
conn.commit()
except Exception:
try:
conn.rollback()
except Exception:
pass
raise
finally:
try:
cur.close()
except Exception:
pass
finally:
conn.close()
def init_db() -> None:
"""Create the required tables if they do not exist."""
if is_postgres_enabled():
with db_cursor() as (_, cur):
cur.execute(
"""
CREATE TABLE IF NOT EXISTS contact (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT NOT NULL,
company TEXT,
message TEXT NOT NULL,
timeline TEXT,
created_at TEXT NOT NULL
)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS subscribers (
email TEXT PRIMARY KEY,
subscribed_at TEXT NOT NULL
)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS app_settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS newsletters (
id SERIAL PRIMARY KEY,
subject TEXT NOT NULL,
content TEXT NOT NULL,
sender_name TEXT,
send_date TEXT,
status TEXT NOT NULL DEFAULT 'draft',
created_at TEXT NOT NULL,
sent_at TEXT
)
"""
)
else:
with db_cursor() as (_, cur):
cur.execute(
"""
CREATE TABLE IF NOT EXISTS contact (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT NOT NULL,
company TEXT,
message TEXT NOT NULL,
timeline TEXT,
created_at TEXT NOT NULL
)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS subscribers (
email TEXT PRIMARY KEY,
subscribed_at TEXT NOT NULL
)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS app_settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS newsletters (
id INTEGER PRIMARY KEY AUTOINCREMENT,
subject TEXT NOT NULL,
content TEXT NOT NULL,
sender_name TEXT,
send_date TEXT,
status TEXT NOT NULL DEFAULT 'draft',
created_at TEXT NOT NULL,
sent_at TEXT
)
"""
)
def save_contact(submission: "ContactSubmission") -> int:
"""Persist a contact submission and return its identifier."""
record_id = 0
use_pg = is_postgres_enabled()
with db_cursor() as (_, cur):
if use_pg:
cur.execute(
"INSERT INTO contact (name, email, company, message, timeline, created_at) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id",
(
submission.name,
submission.email,
submission.company,
submission.message,
submission.timeline,
submission.created_at,
),
)
row = cur.fetchone()
if row:
record_id = int(row[0])
else:
cur.execute(
"INSERT INTO contact (name, email, company, message, timeline, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(
submission.name,
submission.email,
submission.company,
submission.message,
submission.timeline,
submission.created_at,
),
)
record_id = int(cur.lastrowid or 0)
return record_id
def save_subscriber(email: str, *, created_at: str) -> bool:
"""Persist a newsletter subscriber. Returns False on duplicate entries."""
use_pg = is_postgres_enabled()
try:
with db_cursor() as (_, cur):
if use_pg:
cur.execute(
"INSERT INTO subscribers (email, subscribed_at) VALUES (%s, %s)",
(email, created_at),
)
else:
cur.execute(
"INSERT INTO subscribers (email, subscribed_at) VALUES (?, ?)",
(email, created_at),
)
return True
except sqlite3.IntegrityError:
return False
except Exception as exc:
if use_pg and psycopg2 is not None and isinstance(exc, psycopg2.IntegrityError):
return False
raise
def delete_subscriber(email: str) -> bool:
"""Remove a newsletter subscriber. Returns True if deleted, False if not found."""
use_pg = is_postgres_enabled()
try:
with db_cursor() as (_, cur):
if use_pg:
cur.execute(
"DELETE FROM subscribers WHERE email = %s", (email,))
else:
cur.execute(
"DELETE FROM subscribers WHERE email = ?", (email,))
return cur.rowcount > 0
except Exception as exc:
logging.exception("Failed to delete subscriber: %s", exc)
raise
def update_subscriber(old_email: str, new_email: str) -> bool:
"""Update a subscriber's email. Returns True if updated, False if old_email not found or new_email exists."""
use_pg = is_postgres_enabled()
try:
with db_cursor() as (_, cur):
# Check if old_email exists and new_email doesn't
if use_pg:
cur.execute(
"SELECT 1 FROM subscribers WHERE email = %s", (old_email,))
if not cur.fetchone():
return False
cur.execute(
"SELECT 1 FROM subscribers WHERE email = %s", (new_email,))
if cur.fetchone():
return False
cur.execute(
"UPDATE subscribers SET email = %s WHERE email = %s", (new_email, old_email))
else:
cur.execute(
"SELECT 1 FROM subscribers WHERE email = ?", (old_email,))
if not cur.fetchone():
return False
cur.execute(
"SELECT 1 FROM subscribers WHERE email = ?", (new_email,))
if cur.fetchone():
return False
cur.execute(
"UPDATE subscribers SET email = ? WHERE email = ?", (new_email, old_email))
return cur.rowcount > 0
except Exception as exc:
logging.exception("Failed to update subscriber: %s", exc)
raise
def get_contacts(
page: int = 1,
per_page: int = 50,
sort_by: str = "created_at",
sort_order: str = "desc",
email_filter: str | None = None,
date_from: str | None = None,
date_to: str | None = None,
) -> Tuple[list[dict], int]:
"""Retrieve contact submissions with pagination, filtering, and sorting."""
use_pg = is_postgres_enabled()
offset = (page - 1) * per_page
# Build WHERE clause
where_conditions = []
params = []
if email_filter:
where_conditions.append("email LIKE ?")
params.append(f"%{email_filter}%")
if date_from:
where_conditions.append("created_at >= ?")
params.append(date_from)
if date_to:
where_conditions.append("created_at <= ?")
params.append(date_to)
where_clause = "WHERE " + \
" AND ".join(where_conditions) if where_conditions else ""
# Build ORDER BY clause
valid_sort_fields = {"id", "name", "email", "created_at"}
if sort_by not in valid_sort_fields:
sort_by = "created_at"
sort_order = "DESC" if sort_order.lower() == "desc" else "ASC"
order_clause = f"ORDER BY {sort_by} {sort_order}"
# Get total count
count_query = f"SELECT COUNT(*) FROM contact {where_clause}"
with db_cursor(read_only=True) as (_, cur):
if use_pg:
# Convert ? to %s for PostgreSQL
count_query = count_query.replace("?", "%s")
cur.execute(count_query, params)
total = cur.fetchone()[0]
# Get paginated results
select_query = f"""
SELECT id, name, email, company, message, timeline, created_at
FROM contact {where_clause} {order_clause}
LIMIT ? OFFSET ?
"""
params.extend([per_page, offset])
contacts = []
with db_cursor(read_only=True) as (_, cur):
if use_pg:
# Convert ? to %s for PostgreSQL and handle LIMIT/OFFSET
select_query = select_query.replace("?", "%s")
select_query = select_query.replace(
"LIMIT %s OFFSET %s", "LIMIT %s OFFSET %s")
cur.execute(select_query, params)
if use_pg:
rows = cur.fetchall()
else:
rows = cur.fetchall()
for row in rows:
contacts.append({
"id": row[0],
"name": row[1],
"email": row[2],
"company": row[3],
"message": row[4],
"timeline": row[5],
"created_at": row[6],
})
return contacts, total
def get_subscribers(
page: int = 1,
per_page: int = 50,
sort_by: str = "subscribed_at",
sort_order: str = "desc",
email_filter: str | None = None,
date_from: str | None = None,
date_to: str | None = None,
) -> Tuple[list[dict], int]:
"""Retrieve newsletter subscribers with pagination, filtering, and sorting."""
use_pg = is_postgres_enabled()
offset = (page - 1) * per_page
# Build WHERE clause
where_conditions = []
params = []
if email_filter:
where_conditions.append("email LIKE ?")
params.append(f"%{email_filter}%")
if date_from:
where_conditions.append("subscribed_at >= ?")
params.append(date_from)
if date_to:
where_conditions.append("subscribed_at <= ?")
params.append(date_to)
where_clause = "WHERE " + \
" AND ".join(where_conditions) if where_conditions else ""
# Build ORDER BY clause
valid_sort_fields = {"email", "subscribed_at"}
if sort_by not in valid_sort_fields:
sort_by = "subscribed_at"
sort_order = "DESC" if sort_order.lower() == "desc" else "ASC"
order_clause = f"ORDER BY {sort_by} {sort_order}"
# Get total count
count_query = f"SELECT COUNT(*) FROM subscribers {where_clause}"
with db_cursor(read_only=True) as (_, cur):
if use_pg:
# Convert ? to %s for PostgreSQL
count_query = count_query.replace("?", "%s")
cur.execute(count_query, params)
total = cur.fetchone()[0]
# Get paginated results
select_query = f"""
SELECT email, subscribed_at
FROM subscribers {where_clause} {order_clause}
LIMIT ? OFFSET ?
"""
params.extend([per_page, offset])
subscribers = []
with db_cursor(read_only=True) as (_, cur):
if use_pg:
# Convert ? to %s for PostgreSQL and handle LIMIT/OFFSET
select_query = select_query.replace("?", "%s")
select_query = select_query.replace(
"LIMIT %s OFFSET %s", "LIMIT %s OFFSET %s")
cur.execute(select_query, params)
rows = cur.fetchall()
for row in rows:
subscribers.append({
"email": row[0],
"subscribed_at": row[1],
})
return subscribers, total
def delete_contact(contact_id: int) -> bool:
"""Delete a contact submission by ID. Returns True if deleted."""
use_pg = is_postgres_enabled()
try:
with db_cursor() as (_, cur):
if use_pg:
cur.execute("DELETE FROM contact WHERE id = %s", (contact_id,))
else:
cur.execute("DELETE FROM contact WHERE id = ?", (contact_id,))
return cur.rowcount > 0
except Exception as exc:
logging.exception("Failed to delete contact: %s", exc)
raise
def get_app_settings() -> dict[str, str]:
"""Retrieve all application settings as a dictionary."""
settings_dict = {}
with db_cursor(read_only=True) as (_, cur):
cur.execute("SELECT key, value FROM app_settings ORDER BY key")
rows = cur.fetchall()
for row in rows:
settings_dict[row[0]] = row[1]
return settings_dict
def update_app_setting(key: str, value: str) -> bool:
"""Update or insert an application setting. Returns True on success."""
from datetime import datetime, timezone
updated_at = datetime.now(timezone.utc).isoformat()
use_pg = is_postgres_enabled()
try:
with db_cursor() as (_, cur):
if use_pg:
cur.execute(
"""
INSERT INTO app_settings (key, value, updated_at)
VALUES (%s, %s, %s)
ON CONFLICT (key) DO UPDATE SET
value = EXCLUDED.value,
updated_at = EXCLUDED.updated_at
""",
(key, value, updated_at),
)
else:
cur.execute(
"""
INSERT OR REPLACE INTO app_settings (key, value, updated_at)
VALUES (?, ?, ?)
""",
(key, value, updated_at),
)
return True
except Exception as exc:
logging.exception("Failed to update app setting: %s", exc)
raise
def delete_app_setting(key: str) -> bool:
"""Delete an application setting. Returns True if deleted."""
use_pg = is_postgres_enabled()
try:
with db_cursor() as (_, cur):
if use_pg:
cur.execute("DELETE FROM app_settings WHERE key = %s", (key,))
else:
cur.execute("DELETE FROM app_settings WHERE key = ?", (key,))
return cur.rowcount > 0
except Exception as exc:
logging.exception("Failed to delete app setting: %s", exc)
raise
def save_newsletter(subject: str, content: str, sender_name: str | None = None, send_date: str | None = None, status: str = "draft") -> int:
"""Save a newsletter and return its ID."""
from datetime import datetime, timezone
created_at = datetime.now(timezone.utc).isoformat()
use_pg = is_postgres_enabled()
try:
with db_cursor() as (_, cur):
if use_pg:
cur.execute(
"""
INSERT INTO newsletters (subject, content, sender_name, send_date, status, created_at)
VALUES (%s, %s, %s, %s, %s, %s) RETURNING id
""",
(subject, content, sender_name, send_date, status, created_at),
)
newsletter_id = cur.fetchone()[0]
else:
cur.execute(
"""
INSERT INTO newsletters (subject, content, sender_name, send_date, status, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(subject, content, sender_name, send_date, status, created_at),
)
newsletter_id = cur.lastrowid
return newsletter_id
except Exception as exc:
logging.exception("Failed to save newsletter: %s", exc)
raise
def get_newsletters(page: int = 1, per_page: int = 20, status_filter: str | None = None) -> tuple[list[dict], int]:
"""Get newsletters with pagination and optional status filtering."""
use_pg = is_postgres_enabled()
newsletters = []
total = 0
offset = (page - 1) * per_page
try:
with db_cursor(read_only=True) as (_, cur):
# Get total count
count_query = "SELECT COUNT(*) FROM newsletters"
count_params = []
if status_filter:
count_query += " WHERE status = %s" if use_pg else " WHERE status = ?"
count_params.append(status_filter)
cur.execute(count_query, count_params)
total = cur.fetchone()[0]
# Get newsletters
select_query = """
SELECT id, subject, sender_name, send_date, status, created_at, sent_at
FROM newsletters
"""
params = []
if status_filter:
select_query += " WHERE status = %s" if use_pg else " WHERE status = ?"
params.append(status_filter)
select_query += " ORDER BY created_at DESC"
select_query += " LIMIT %s OFFSET %s" if use_pg else " LIMIT ? OFFSET ?"
params.extend([per_page, offset])
cur.execute(select_query, params)
rows = cur.fetchall()
for row in rows:
newsletters.append({
"id": row[0],
"subject": row[1],
"sender_name": row[2],
"send_date": row[3],
"status": row[4],
"created_at": row[5],
"sent_at": row[6],
})
except Exception as exc:
logging.exception("Failed to get newsletters: %s", exc)
raise
return newsletters, total
def update_newsletter_status(newsletter_id: int, status: str, sent_at: str | None = None) -> bool:
"""Update newsletter status and optionally sent_at timestamp."""
use_pg = is_postgres_enabled()
try:
with db_cursor() as (_, cur):
if sent_at:
if use_pg:
cur.execute(
"UPDATE newsletters SET status = %s, sent_at = %s WHERE id = %s",
(status, sent_at, newsletter_id),
)
else:
cur.execute(
"UPDATE newsletters SET status = ?, sent_at = ? WHERE id = ?",
(status, sent_at, newsletter_id),
)
else:
if use_pg:
cur.execute(
"UPDATE newsletters SET status = %s WHERE id = %s",
(status, newsletter_id),
)
else:
cur.execute(
"UPDATE newsletters SET status = ? WHERE id = ?",
(status, newsletter_id),
)
return cur.rowcount > 0
except Exception as exc:
logging.exception("Failed to update newsletter status: %s", exc)
raise
def get_newsletter_by_id(newsletter_id: int) -> dict | None:
"""Get a specific newsletter by ID."""
use_pg = is_postgres_enabled()
try:
with db_cursor(read_only=True) as (_, cur):
if use_pg:
cur.execute(
"SELECT id, subject, content, sender_name, send_date, status, created_at, sent_at FROM newsletters WHERE id = %s",
(newsletter_id,),
)
else:
cur.execute(
"SELECT id, subject, content, sender_name, send_date, status, created_at, sent_at FROM newsletters WHERE id = ?",
(newsletter_id,),
)
row = cur.fetchone()
if row:
return {
"id": row[0],
"subject": row[1],
"content": row[2],
"sender_name": row[3],
"send_date": row[4],
"status": row[5],
"created_at": row[6],
"sent_at": row[7],
}
except Exception as exc:
logging.exception("Failed to get newsletter by ID: %s", exc)
raise
return None

56
server/factory.py Normal file
View File

@@ -0,0 +1,56 @@
"""Application factory for the Flask server."""
from __future__ import annotations
import logging
from flask import Flask
from . import logging_config, middleware, routes, settings
from .database import init_db, is_postgres_enabled
def _configure_sentry() -> None:
if not settings.SENTRY_DSN:
return
try:
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
sentry_sdk.init(
dsn=settings.SENTRY_DSN,
integrations=[FlaskIntegration()],
traces_sample_rate=settings.SENTRY_TRACES_SAMPLE_RATE,
)
logging.info("Sentry initialized")
except Exception:
logging.exception("Failed to initialize Sentry SDK")
def create_app() -> Flask:
"""Create and configure the Flask application instance."""
logging_config.configure_logging()
if settings.POSTGRES_URL:
try:
import psycopg2 # type: ignore # noqa: F401
except Exception:
logging.warning(
"POSTGRES_URL is set but psycopg2 is not installed; falling back to SQLite"
)
app = Flask(__name__)
app.config.from_mapping(SECRET_KEY=settings.SECRET_KEY)
app.template_folder = str(settings.BASE_DIR / "templates")
middleware.register_request_hooks(app)
routes.register_blueprints(app)
try:
init_db()
except Exception:
logging.exception("Failed to initialize DB at import time")
is_postgres_enabled()
_configure_sentry()
return app

65
server/logging_config.py Normal file
View File

@@ -0,0 +1,65 @@
"""Central logging configuration utilities."""
from __future__ import annotations
import importlib
import logging
from . import settings
JsonFormatter = None
try:
_json_module = importlib.import_module("pythonjsonlogger.json")
JsonFormatter = getattr(_json_module, "JsonFormatter", None)
except Exception:
try:
_json_module = importlib.import_module("pythonjsonlogger.jsonlogger")
JsonFormatter = getattr(_json_module, "JsonFormatter", None)
except Exception:
JsonFormatter = None
class RequestContextFilter(logging.Filter):
"""Inject request metadata into log records when a request context exists."""
def filter(self, record: logging.LogRecord) -> bool:
try:
from flask import has_request_context, request
if has_request_context():
rid = getattr(request, "request_id", None) or request.environ.get("HTTP_X_REQUEST_ID")
record.request_id = rid
record.remote_addr = request.remote_addr
record.path = request.path
record.method = request.method
else:
record.request_id = None
record.remote_addr = None
record.path = None
record.method = None
except Exception:
record.request_id = None
record.remote_addr = None
record.path = None
record.method = None
return True
def configure_logging() -> None:
"""Configure root logging handlers and optional JSON formatting."""
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] %(levelname)s in %(module)s: %(message)s",
)
if settings.ENABLE_JSON_LOGS and JsonFormatter is not None:
try:
handler = logging.getLogger().handlers[0]
handler.setFormatter(JsonFormatter("%(asctime)s %(levelname)s %(name)s %(message)s"))
except Exception:
logging.exception("Failed to initialize JSON log formatter")
try:
for handler in logging.getLogger().handlers:
handler.addFilter(RequestContextFilter())
except Exception:
pass

86
server/metrics.py Normal file
View File

@@ -0,0 +1,86 @@
"""Metrics registry and helpers for Prometheus and JSON fallbacks."""
from __future__ import annotations
import logging
import time
from typing import Any, Dict, Tuple
try:
from prometheus_client import CollectorRegistry, Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
except Exception:
CollectorRegistry = None # type: ignore
Counter = None # type: ignore
Histogram = None # type: ignore
generate_latest = None # type: ignore
CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8"
_start_time = time.time()
_total_submissions = 0
_prom_registry = CollectorRegistry() if CollectorRegistry is not None else None
_prom_total_submissions = (
Counter("contact_total_submissions", "Total contact submissions", registry=_prom_registry)
if Counter is not None
else None
)
_prom_request_counter = None
_prom_request_latency = None
if Counter is not None and _prom_registry is not None:
try:
_prom_request_counter = Counter(
"http_requests_total",
"Total HTTP requests",
["method", "endpoint"],
registry=_prom_registry,
)
except Exception:
_prom_request_counter = None
if Histogram is not None and _prom_registry is not None:
try:
_prom_request_latency = Histogram(
"http_request_duration_seconds",
"Request duration",
["method", "endpoint"],
registry=_prom_registry,
)
except Exception:
_prom_request_latency = None
def record_submission() -> None:
"""Register a completed contact submission."""
global _total_submissions
_total_submissions += 1
if _prom_total_submissions is not None:
try:
_prom_total_submissions.inc()
except Exception:
logging.debug("Failed to increment Prometheus submission counter", exc_info=True)
def observe_request(method: str, endpoint: str, start_time: float | None, status: int | None = None) -> None:
"""Update request counters and latency histograms."""
if _prom_request_counter is not None:
try:
_prom_request_counter.labels(method=method, endpoint=endpoint).inc()
except Exception:
logging.debug("Failed to increment request counter", exc_info=True)
if _prom_request_latency is not None and start_time:
try:
_prom_request_latency.labels(method=method, endpoint=endpoint).observe(time.time() - start_time)
except Exception:
logging.debug("Failed to observe request latency", exc_info=True)
def export_metrics() -> Tuple[Any, int, Dict[str, str]]:
"""Return a Flask-style response tuple for the metrics endpoint."""
uptime = int(time.time() - _start_time)
if generate_latest is not None and _prom_registry is not None:
payload = generate_latest(_prom_registry)
headers = {"Content-Type": CONTENT_TYPE_LATEST}
return payload, 200, headers
body = {"uptime_seconds": uptime, "total_submissions": _total_submissions}
return body, 200, {"Content-Type": "application/json"}

68
server/middleware.py Normal file
View File

@@ -0,0 +1,68 @@
"""HTTP middleware helpers (Flask request hooks)."""
from __future__ import annotations
import logging
import time
from flask import Flask, g, request
from . import settings
from .metrics import observe_request
from .utils import generate_request_id
def register_request_hooks(app: Flask) -> None:
"""Attach before/after request handlers for logging and correlation."""
@app.before_request
def attach_request_id_and_log(): # type: ignore[unused-ignore]
rid = request.headers.get("X-Request-Id")
if not rid:
rid = generate_request_id()
request.environ["HTTP_X_REQUEST_ID"] = rid
request.request_id = rid # type: ignore[attr-defined]
if settings.ENABLE_REQUEST_LOGS:
try:
logging.info(
"request.start",
extra={
"request_id": rid,
"method": request.method,
"path": request.path,
"remote_addr": request.remote_addr,
},
)
except Exception:
pass
try:
g._start_time = time.time()
except Exception:
g._start_time = None # type: ignore[attr-defined]
@app.after_request
def add_request_id_header(response): # type: ignore[unused-ignore]
try:
rid = getattr(request, "request_id", None) or request.environ.get("HTTP_X_REQUEST_ID")
if rid:
response.headers["X-Request-Id"] = rid
if settings.ENABLE_REQUEST_LOGS:
try:
logging.info(
"request.end",
extra={
"request_id": rid,
"status": response.status_code,
"path": request.path,
},
)
except Exception:
pass
start_time = getattr(g, "_start_time", None)
observe_request(request.method, request.path, start_time, response.status_code)
except Exception:
pass
return response

75
server/rate_limit.py Normal file
View File

@@ -0,0 +1,75 @@
"""Rate limiting helpers with optional Redis support."""
from __future__ import annotations
import logging
import os
import time
from collections import defaultdict, deque
from typing import DefaultDict, Deque
from . import settings
try:
import redis
except Exception: # redis is optional
redis = None # type: ignore
_rate_tracker: DefaultDict[str, Deque[float]] = defaultdict(deque)
def allow_request(client_ip: str) -> bool:
"""Return True when the client is allowed to make a request."""
if settings.RATE_LIMIT_MAX <= 0:
return True
if settings.REDIS_URL and redis is not None:
try:
client = redis.from_url(settings.REDIS_URL, decode_responses=True)
key = f"rl:{client_ip}"
lua = (
"local key=KEYS[1]\n"
"local now=tonumber(ARGV[1])\n"
"local window=tonumber(ARGV[2])\n"
"local limit=tonumber(ARGV[3])\n"
"local member=ARGV[4]\n"
"redis.call('ZADD', key, now, member)\n"
"redis.call('ZREMRANGEBYSCORE', key, 0, now - window)\n"
"local cnt = redis.call('ZCARD', key)\n"
"redis.call('EXPIRE', key, window)\n"
"if cnt > limit then return 0 end\n"
"return cnt\n"
)
now_ts = int(time.time() * 1000)
member = f"{now_ts}-{os.getpid()}-{int(time.time_ns() % 1000000)}"
result = client.eval(
lua,
1,
key,
str(now_ts),
str(settings.RATE_LIMIT_WINDOW * 1000),
str(settings.RATE_LIMIT_MAX),
member,
)
try:
count = int(str(result))
except Exception:
logging.exception("Unexpected Redis eval result: %r", result)
return False
return count != 0
except Exception as exc:
logging.exception("Redis rate limiter error, falling back to memory: %s", exc)
now = time.time()
bucket = _rate_tracker[client_ip]
while bucket and now - bucket[0] > settings.RATE_LIMIT_WINDOW:
bucket.popleft()
if len(bucket) >= settings.RATE_LIMIT_MAX:
return False
bucket.append(now)
if len(bucket) > settings.RATE_LIMIT_MAX * 2:
while len(bucket) > settings.RATE_LIMIT_MAX:
bucket.popleft()
return True

15
server/routes/__init__.py Normal file
View File

@@ -0,0 +1,15 @@
"""Blueprint registration for the server application."""
from __future__ import annotations
from flask import Flask
from . import admin, auth, contact, monitoring, newsletter
def register_blueprints(app: Flask) -> None:
"""Register all HTTP blueprints with the Flask app."""
app.register_blueprint(contact.bp)
app.register_blueprint(newsletter.bp)
app.register_blueprint(monitoring.bp)
app.register_blueprint(auth.bp)
app.register_blueprint(admin.bp)

377
server/routes/admin.py Normal file
View File

@@ -0,0 +1,377 @@
"""Admin routes for application management."""
from __future__ import annotations
from flask import Blueprint, render_template, jsonify, request
import logging
from .. import auth, settings
from ..database import delete_app_setting, get_app_settings, get_subscribers, update_app_setting
bp = Blueprint("admin", __name__, url_prefix="/admin")
@bp.route("/")
@auth.login_required
def dashboard():
"""Display admin dashboard overview."""
return render_template("admin_dashboard.html")
@bp.route("/newsletter")
@auth.login_required
def newsletter_subscribers():
"""Display newsletter subscriber management page."""
return render_template("admin_newsletter.html")
@bp.route("/newsletter/create")
@auth.login_required
def newsletter_create():
"""Display newsletter creation and sending page."""
return render_template("admin_newsletter_create.html")
@bp.route("/settings")
@auth.login_required
def settings_page():
"""Display current application settings."""
# Gather settings to display
app_settings = {
"Database": {
"DATABASE_URL": settings.DATABASE_URL or "sqlite:///./data/forms.db",
"POSTGRES_URL": settings.POSTGRES_URL or "Not configured",
"SQLite Path": str(settings.SQLITE_DB_PATH),
},
"SMTP": {
"Host": settings.SMTP_SETTINGS["host"] or "Not configured",
"Port": settings.SMTP_SETTINGS["port"],
"Username": settings.SMTP_SETTINGS["username"] or "Not configured",
"Sender": settings.SMTP_SETTINGS["sender"] or "Not configured",
"Recipients": ", ".join(settings.SMTP_SETTINGS["recipients"]) if settings.SMTP_SETTINGS["recipients"] else "Not configured",
"Use TLS": settings.SMTP_SETTINGS["use_tls"],
},
"Rate Limiting": {
"Max Requests": settings.RATE_LIMIT_MAX,
"Window (seconds)": settings.RATE_LIMIT_WINDOW,
"Redis URL": settings.REDIS_URL or "Not configured",
},
"Security": {
"Strict Origin Check": settings.STRICT_ORIGIN_CHECK,
"Allowed Origin": settings.ALLOWED_ORIGIN or "Not configured",
},
"Logging": {
"JSON Logs": settings.ENABLE_JSON_LOGS,
"Request Logs": settings.ENABLE_REQUEST_LOGS,
},
"Monitoring": {
"Sentry DSN": settings.SENTRY_DSN or "Not configured",
"Sentry Traces Sample Rate": settings.SENTRY_TRACES_SAMPLE_RATE,
},
"Admin": {
"Username": settings.ADMIN_USERNAME,
},
}
return render_template("admin_settings.html", settings=app_settings)
@bp.route("/submissions")
@auth.login_required
def submissions():
"""Display contact form submissions page."""
return render_template("admin_submissions.html")
@bp.route("/api/settings", methods=["GET"])
@auth.login_required
def get_settings_api():
"""Get all application settings via API."""
try:
settings_data = get_app_settings()
return jsonify({"status": "ok", "settings": settings_data})
except Exception as exc:
logging.exception("Failed to retrieve settings: %s", exc)
return jsonify({"status": "error", "message": "Failed to retrieve settings."}), 500
def validate_setting(key: str, value: str) -> str | None:
"""Validate a setting key-value pair. Returns error message or None if valid."""
# Define validation rules for known settings
validations = {
"maintenance_mode": lambda v: v in ["true", "false"],
"contact_form_enabled": lambda v: v in ["true", "false"],
"newsletter_enabled": lambda v: v in ["true", "false"],
"rate_limit_max": lambda v: v.isdigit() and 0 <= int(v) <= 1000,
"rate_limit_window": lambda v: v.isdigit() and 1 <= int(v) <= 3600,
}
if key in validations and not validations[key](value):
return f"Invalid value for {key}"
# General validation
if len(key) > 100:
return "Setting key too long (max 100 characters)"
if len(value) > 1000:
return "Setting value too long (max 1000 characters)"
return None
@bp.route("/api/settings/<key>", methods=["PUT"])
@auth.login_required
def update_setting_api(key: str):
"""Update a specific application setting via API."""
try:
data = request.get_json(silent=True) or {}
value = data.get("value", "").strip()
if not value:
return jsonify({"status": "error", "message": "Value is required."}), 400
# Validate the setting
validation_error = validate_setting(key, value)
if validation_error:
return jsonify({"status": "error", "message": validation_error}), 400
success = update_app_setting(key, value)
if success:
return jsonify({"status": "ok", "message": f"Setting '{key}' updated successfully."})
else:
return jsonify({"status": "error", "message": "Failed to update setting."}), 500
except Exception as exc:
logging.exception("Failed to update setting: %s", exc)
return jsonify({"status": "error", "message": "Failed to update setting."}), 500
@bp.route("/api/settings/<key>", methods=["DELETE"])
@auth.login_required
def delete_setting_api(key: str):
"""Delete a specific application setting via API."""
try:
deleted = delete_app_setting(key)
if deleted:
return jsonify({"status": "ok", "message": f"Setting '{key}' deleted successfully."})
else:
return jsonify({"status": "error", "message": f"Setting '{key}' not found."}), 404
except Exception as exc:
logging.exception("Failed to delete setting: %s", exc)
return jsonify({"status": "error", "message": "Failed to delete setting."}), 500
@bp.route("/api/newsletter", methods=["GET"])
@auth.login_required
def get_subscribers_api():
"""Retrieve newsletter subscribers with pagination, filtering, and sorting."""
try:
# Parse query parameters
page = int(request.args.get("page", 1))
per_page = min(int(request.args.get("per_page", 50)),
100) # Max 100 per page
sort_by = request.args.get("sort_by", "subscribed_at")
sort_order = request.args.get("sort_order", "desc")
email_filter = request.args.get("email")
# Validate sort_by
valid_sort_fields = ["email", "subscribed_at"]
if sort_by not in valid_sort_fields:
sort_by = "subscribed_at"
# Get subscribers
subscribers, total = get_subscribers(
page=page,
per_page=per_page,
sort_by=sort_by,
sort_order=sort_order,
email_filter=email_filter,
)
return jsonify({
"status": "ok",
"subscribers": subscribers,
"pagination": {
"page": page,
"per_page": per_page,
"total": total,
"pages": (total + per_page - 1) // per_page,
},
})
except Exception as exc:
logging.exception("Failed to retrieve subscribers: %s", exc)
return jsonify({"status": "error", "message": "Failed to retrieve subscribers."}), 500
@bp.route("/api/newsletters", methods=["POST"])
@auth.login_required
def create_newsletter_api():
"""Create a new newsletter."""
try:
data = request.get_json(silent=True) or {}
subject = data.get("subject", "").strip()
content = data.get("content", "").strip()
sender_name = data.get("sender_name", "").strip() or None
send_date = data.get("send_date", "").strip() or None
status = data.get("status", "draft")
if not subject or not content:
return jsonify({"status": "error", "message": "Subject and content are required."}), 400
if status not in ["draft", "scheduled", "sent"]:
return jsonify({"status": "error", "message": "Invalid status."}), 400
from ..database import save_newsletter
newsletter_id = save_newsletter(
subject, content, sender_name, send_date, status)
return jsonify({
"status": "ok",
"message": "Newsletter created successfully.",
"newsletter_id": newsletter_id
}), 201
except Exception as exc:
logging.exception("Failed to create newsletter: %s", exc)
return jsonify({"status": "error", "message": "Failed to create newsletter."}), 500
@bp.route("/api/newsletters", methods=["GET"])
@auth.login_required
def get_newsletters_api():
"""Retrieve newsletters with pagination and filtering."""
try:
page = int(request.args.get("page", 1))
per_page = min(int(request.args.get("per_page", 20)),
50) # Max 50 per page
status_filter = request.args.get("status")
from ..database import get_newsletters
newsletters, total = get_newsletters(
page=page, per_page=per_page, status_filter=status_filter)
return jsonify({
"status": "ok",
"newsletters": newsletters,
"pagination": {
"page": page,
"per_page": per_page,
"total": total,
"pages": (total + per_page - 1) // per_page,
},
})
except Exception as exc:
logging.exception("Failed to retrieve newsletters: %s", exc)
return jsonify({"status": "error", "message": "Failed to retrieve newsletters."}), 500
@bp.route("/api/newsletters/<int:newsletter_id>/send", methods=["POST"])
@auth.login_required
def send_newsletter_api(newsletter_id: int):
"""Send a newsletter to all subscribers."""
try:
from ..database import get_newsletter_by_id, update_newsletter_status, get_subscribers
from ..services.newsletter import send_newsletter_to_subscribers
from datetime import datetime, timezone
# Get the newsletter
newsletter = get_newsletter_by_id(newsletter_id)
if not newsletter:
return jsonify({"status": "error", "message": "Newsletter not found."}), 404
if newsletter["status"] == "sent":
return jsonify({"status": "error", "message": "Newsletter has already been sent."}), 400
# Get all subscribers
subscribers, _ = get_subscribers(
page=1, per_page=10000) # Get all subscribers
if not subscribers:
return jsonify({"status": "error", "message": "No subscribers found."}), 400
# Send the newsletter
success_count = send_newsletter_to_subscribers(
newsletter["subject"],
newsletter["content"],
[sub["email"] for sub in subscribers],
newsletter["sender_name"]
)
# Update newsletter status
sent_at = datetime.now(timezone.utc).isoformat()
update_newsletter_status(newsletter_id, "sent", sent_at)
return jsonify({
"status": "ok",
"message": f"Newsletter sent to {success_count} subscribers.",
"sent_count": success_count
})
except Exception as exc:
logging.exception("Failed to send newsletter: %s", exc)
return jsonify({"status": "error", "message": "Failed to send newsletter."}), 500
@bp.route("/api/contact", methods=["GET"])
@auth.login_required
def get_contact_submissions_api():
"""Retrieve contact form submissions with pagination, filtering, and sorting."""
try:
# Parse query parameters
page = int(request.args.get("page", 1))
per_page = min(int(request.args.get("per_page", 50)),
100) # Max 100 per page
sort_by = request.args.get("sort_by", "created_at")
sort_order = request.args.get("sort_order", "desc")
email_filter = request.args.get("email")
date_from = request.args.get("date_from")
date_to = request.args.get("date_to")
# Validate sort_by
valid_sort_fields = ["id", "name", "email", "created_at"]
if sort_by not in valid_sort_fields:
sort_by = "created_at"
# Get submissions
from ..database import get_contacts
submissions, total = get_contacts(
page=page,
per_page=per_page,
sort_by=sort_by,
sort_order=sort_order,
email_filter=email_filter,
date_from=date_from,
date_to=date_to,
)
return jsonify({
"status": "ok",
"submissions": submissions,
"pagination": {
"page": page,
"per_page": per_page,
"total": total,
"pages": (total + per_page - 1) // per_page,
},
})
except Exception as exc:
logging.exception("Failed to retrieve contact submissions: %s", exc)
return jsonify({"status": "error", "message": "Failed to retrieve contact submissions."}), 500
@bp.route("/api/contact/<int:contact_id>", methods=["DELETE"])
@auth.login_required
def delete_contact_submission_api(contact_id: int):
"""Delete a contact submission by ID."""
try:
from ..database import delete_contact
deleted = delete_contact(contact_id)
if deleted:
return jsonify({"status": "ok", "message": f"Contact submission {contact_id} deleted successfully."})
else:
return jsonify({"status": "error", "message": f"Contact submission {contact_id} not found."}), 404
except Exception as exc:
logging.exception("Failed to delete contact submission: %s", exc)
return jsonify({"status": "error", "message": "Failed to delete contact submission."}), 500

31
server/routes/auth.py Normal file
View File

@@ -0,0 +1,31 @@
"""Authentication routes for admin access."""
from __future__ import annotations
from flask import Blueprint, flash, redirect, render_template, request, session, url_for
from .. import settings
bp = Blueprint("auth", __name__, url_prefix="/auth")
@bp.route("/login", methods=["GET", "POST"])
def login():
"""Handle user login."""
if request.method == "POST":
username = request.form.get("username")
password = request.form.get("password")
if username == settings.ADMIN_USERNAME and password == settings.ADMIN_PASSWORD:
session["logged_in"] = True
return redirect("/admin/")
else:
flash("Invalid credentials")
return render_template("login.html")
@bp.route("/logout")
def logout():
"""Handle user logout."""
session.pop("logged_in", None)
return redirect("/auth/login")

134
server/routes/contact.py Normal file
View File

@@ -0,0 +1,134 @@
"""Contact submission routes."""
from __future__ import annotations
import logging
from flask import Blueprint, jsonify, request
from .. import auth, settings
from ..database import delete_contact, get_contacts
from ..rate_limit import allow_request
from ..services.contact import persist_submission, send_notification, validate_submission
bp = Blueprint("contact", __name__, url_prefix="/api")
@bp.route("/contact", methods=["POST"])
def receive_contact():
payload = request.form or request.get_json(silent=True) or {}
if settings.STRICT_ORIGIN_CHECK:
origin = request.headers.get("Origin")
referer = request.headers.get("Referer")
allowed = settings.ALLOWED_ORIGIN
if allowed:
if origin and origin != allowed and not (referer and referer.startswith(allowed)):
logging.warning(
"Origin/Referer mismatch (origin=%s, referer=%s)", origin, referer)
return jsonify({"status": "error", "message": "Invalid request origin."}), 403
else:
logging.warning(
"STRICT_ORIGIN_CHECK enabled but ALLOWED_ORIGIN not set; skipping enforcement")
client_ip_source = request.headers.get(
"X-Forwarded-For", request.remote_addr or "unknown")
client_ip = client_ip_source.split(
",")[0].strip() if client_ip_source else "unknown"
if not allow_request(client_ip):
logging.warning("Rate limit reached for %s", client_ip)
return (
jsonify(
{"status": "error", "message": "Too many submissions, please try later."}),
429,
)
submission, errors = validate_submission(payload)
if errors:
return jsonify({"status": "error", "errors": errors}), 400
assert submission is not None
try:
record_id = persist_submission(submission)
except Exception as exc: # pragma: no cover - logged for diagnostics
logging.exception("Failed to persist submission: %s", exc)
return (
jsonify({"status": "error", "message": "Could not store submission."}),
500,
)
email_sent = send_notification(submission)
status = 201 if email_sent else 202
body = {
"status": "ok",
"id": record_id,
"email": "sent" if email_sent else "pending",
}
if not email_sent:
body["message"] = "Submission stored but email dispatch is not configured."
return jsonify(body), status
@bp.route("/contact", methods=["GET"])
@auth.login_required
def get_submissions():
"""Retrieve contact form submissions with pagination, filtering, and sorting."""
try:
# Parse query parameters
page = int(request.args.get("page", 1))
per_page = min(int(request.args.get("per_page", 50)), 100) # Max 100 per page
sort_by = request.args.get("sort_by", "created_at")
sort_order = request.args.get("sort_order", "desc")
email_filter = request.args.get("email")
date_from = request.args.get("date_from")
date_to = request.args.get("date_to")
# Validate sort_by
valid_sort_fields = ["id", "name", "email", "created_at"]
if sort_by not in valid_sort_fields:
sort_by = "created_at"
# Get submissions
submissions, total = get_contacts(
page=page,
per_page=per_page,
sort_by=sort_by,
sort_order=sort_order,
email_filter=email_filter,
date_from=date_from,
date_to=date_to,
)
return jsonify({
"status": "ok",
"submissions": submissions,
"pagination": {
"page": page,
"per_page": per_page,
"total": total,
"pages": (total + per_page - 1) // per_page,
},
})
except Exception as exc:
logging.exception("Failed to retrieve submissions: %s", exc)
return jsonify({"status": "error", "message": "Failed to retrieve submissions."}), 500
@bp.route("/contact/<int:contact_id>", methods=["DELETE"])
@auth.login_required
def delete_submission(contact_id: int):
"""Delete a contact submission by ID."""
try:
deleted = delete_contact(contact_id)
if not deleted:
return jsonify({"status": "error", "message": "Submission not found."}), 404
return jsonify({"status": "ok", "message": "Submission deleted successfully."})
except Exception as exc:
logging.exception("Failed to delete submission: %s", exc)
return jsonify({"status": "error", "message": "Failed to delete submission."}), 500

View File

@@ -0,0 +1,33 @@
"""Operational monitoring routes."""
from __future__ import annotations
import logging
from flask import Blueprint, jsonify
from ..database import db_cursor
from ..metrics import export_metrics
bp = Blueprint("monitoring", __name__)
@bp.route("/health", methods=["GET"])
def health():
"""Simple health endpoint used by orchestrators and Docker HEALTHCHECK."""
try:
with db_cursor(read_only=True) as (_, cur):
cur.execute("SELECT 1")
cur.fetchone()
except Exception as exc: # pragma: no cover - logged for operators
logging.exception("Health check DB failure: %s", exc)
return jsonify({"status": "unhealthy"}), 500
return jsonify({"status": "ok"}), 200
@bp.route("/metrics", methods=["GET"])
def metrics():
payload, status, headers = export_metrics()
if isinstance(payload, dict):
return jsonify(payload), status
return payload, status, headers

133
server/routes/newsletter.py Normal file
View File

@@ -0,0 +1,133 @@
"""Newsletter subscription routes."""
from __future__ import annotations
import logging
from flask import Blueprint, jsonify, request, render_template
from ..services import newsletter
bp = Blueprint("newsletter", __name__, url_prefix="/api")
@bp.route("/newsletter", methods=["POST"])
def subscribe():
payload = request.form or request.get_json(silent=True) or {}
email = (payload.get("email") or "").strip()
if not newsletter.validate_email(email):
return jsonify({"status": "error", "message": "Valid email is required."}), 400
try:
created = newsletter.subscribe(email)
except Exception as exc: # pragma: no cover - errors are logged
logging.exception("Failed to persist subscriber: %s", exc)
return jsonify({"status": "error", "message": "Could not store subscription."}), 500
if not created:
logging.info("Newsletter subscription ignored (duplicate): %s", email)
return jsonify({"status": "error", "message": "Email is already subscribed."}), 409
logging.info("New newsletter subscription: %s", email)
return jsonify({"status": "ok", "message": "Subscribed successfully."}), 201
@bp.route("/newsletter", methods=["DELETE"])
def unsubscribe():
payload = request.form or request.get_json(silent=True) or {}
email = (payload.get("email") or "").strip()
if not newsletter.validate_email(email):
return jsonify({"status": "error", "message": "Valid email is required."}), 400
try:
deleted = newsletter.unsubscribe(email)
except Exception as exc: # pragma: no cover - errors are logged
logging.exception("Failed to remove subscriber: %s", exc)
return jsonify({"status": "error", "message": "Could not remove subscription."}), 500
if not deleted:
logging.info(
"Newsletter unsubscription ignored (not subscribed): %s", email)
return jsonify({"status": "error", "message": "Email is not subscribed."}), 404
logging.info("Newsletter unsubscription: %s", email)
return jsonify({"status": "ok", "message": "Unsubscribed successfully."}), 200
@bp.route("/newsletter", methods=["PUT"])
def update_subscription():
payload = request.form or request.get_json(silent=True) or {}
old_email = (payload.get("old_email") or "").strip()
new_email = (payload.get("new_email") or "").strip()
if not newsletter.validate_email(old_email) or not newsletter.validate_email(new_email):
return jsonify({"status": "error", "message": "Valid old and new emails are required."}), 400
try:
updated = newsletter.update_email(old_email, new_email)
except Exception as exc: # pragma: no cover - errors are logged
logging.exception("Failed to update subscriber: %s", exc)
return jsonify({"status": "error", "message": "Could not update subscription."}), 500
if not updated:
return jsonify({"status": "error", "message": "Old email not found or new email already exists."}), 404
logging.info("Newsletter subscription updated: %s -> %s",
old_email, new_email)
return jsonify({"status": "ok", "message": "Subscription updated successfully."}), 200
@bp.route("/newsletter/manage", methods=["GET", "POST"])
def manage_subscription():
"""Display newsletter subscription management page."""
message = None
message_type = None
if request.method == "POST":
action = request.form.get("action")
email = (request.form.get("email") or "").strip()
if not newsletter.validate_email(email):
message = "Please enter a valid email address."
message_type = "error"
else:
try:
if action == "subscribe":
created = newsletter.subscribe(email)
if created:
message = "Successfully subscribed to newsletter!"
message_type = "success"
else:
message = "This email is already subscribed."
message_type = "info"
elif action == "unsubscribe":
deleted = newsletter.unsubscribe(email)
if deleted:
message = "Successfully unsubscribed from newsletter."
message_type = "success"
else:
message = "This email is not currently subscribed."
message_type = "info"
elif action == "update":
old_email = (request.form.get("old_email") or "").strip()
if not newsletter.validate_email(old_email):
message = "Please enter a valid current email address."
message_type = "error"
elif old_email == email:
message = "New email must be different from current email."
message_type = "error"
else:
updated = newsletter.update_email(old_email, email)
if updated:
message = "Email address updated successfully!"
message_type = "success"
else:
message = "Current email not found or new email already exists."
message_type = "error"
except Exception as exc:
logging.exception("Failed to manage subscription: %s", exc)
message = "An error occurred. Please try again."
message_type = "error"
return render_template("newsletter_manage.html", message=message, message_type=message_type)

View File

@@ -0,0 +1 @@
"""Service layer namespace."""

112
server/services/contact.py Normal file
View File

@@ -0,0 +1,112 @@
"""Business logic for contact submissions."""
from __future__ import annotations
import logging
import smtplib
from dataclasses import dataclass
from datetime import datetime, timezone
from email.message import EmailMessage
from typing import Any, Dict, Tuple
from .. import settings
from ..database import save_contact
from ..metrics import record_submission
from ..utils import is_valid_email
@dataclass
class ContactSubmission:
name: str
email: str
company: str | None
message: str
timeline: str | None
created_at: str = datetime.now(timezone.utc).isoformat()
def validate_submission(raw: Dict[str, Any]) -> Tuple[ContactSubmission | None, Dict[str, str]]:
"""Validate the incoming payload and return a submission object."""
name = (raw.get("name") or "").strip()
email = (raw.get("email") or "").strip()
message = (raw.get("message") or "").strip()
consent = raw.get("consent")
company = (raw.get("company") or "").strip()
errors: Dict[str, str] = {}
if not name:
errors["name"] = "Name is required."
elif len(name) > 200:
errors["name"] = "Name is too long (max 200 chars)."
if not is_valid_email(email):
errors["email"] = "Valid email is required."
if not message:
errors["message"] = "Message is required."
elif len(message) > 5000:
errors["message"] = "Message is too long (max 5000 chars)."
if not consent:
errors["consent"] = "Consent is required."
if company and len(company) > 200:
errors["company"] = "Organisation name is too long (max 200 chars)."
if errors:
return None, errors
submission = ContactSubmission(
name=name,
email=email,
company=company or None,
message=message,
timeline=(raw.get("timeline") or "").strip() or None,
)
return submission, {}
def persist_submission(submission: ContactSubmission) -> int:
"""Persist the submission and update metrics."""
record_id = save_contact(submission)
record_submission()
return record_id
def send_notification(submission: ContactSubmission) -> bool:
"""Send an email notification for the submission if SMTP is configured."""
if not settings.SMTP_SETTINGS["host"] or not settings.SMTP_SETTINGS["recipients"]:
logging.info("SMTP not configured; skipping email notification")
return False
sender = settings.SMTP_SETTINGS["sender"] or "no-reply@example.com"
recipients = settings.SMTP_SETTINGS["recipients"]
msg = EmailMessage()
msg["Subject"] = f"Neue Kontaktanfrage von {submission.name}"
msg["From"] = sender
msg["To"] = ", ".join(recipients)
msg.set_content(
"\n".join(
[
f"Name: {submission.name}",
f"E-Mail: {submission.email}",
f"Organisation: {submission.company or ''}",
f"Zeithorizont: {submission.timeline or ''}",
"",
"Nachricht:",
submission.message,
"",
f"Eingang: {submission.created_at}",
]
)
)
try:
with smtplib.SMTP(settings.SMTP_SETTINGS["host"], settings.SMTP_SETTINGS["port"], timeout=15) as server:
if settings.SMTP_SETTINGS["use_tls"]:
server.starttls()
if settings.SMTP_SETTINGS["username"]:
server.login(
settings.SMTP_SETTINGS["username"], settings.SMTP_SETTINGS["password"] or "")
server.send_message(msg)
logging.info("Notification email dispatched to %s", recipients)
return True
except Exception as exc: # pragma: no cover - SMTP failures are logged only
logging.error("Failed to send notification email: %s", exc)
return False

View File

@@ -0,0 +1,96 @@
"""Business logic for newsletter subscriptions."""
from __future__ import annotations
from datetime import datetime, timezone
from ..database import save_subscriber, delete_subscriber, update_subscriber
from ..utils import is_valid_email
def validate_email(email: str) -> bool:
"""Return True when the provided email passes a basic sanity check."""
return is_valid_email(email)
def subscribe(email: str) -> bool:
"""Persist the subscription and return False when it already exists."""
created_at = datetime.now(timezone.utc).isoformat()
return save_subscriber(email, created_at=created_at)
def unsubscribe(email: str) -> bool:
"""Remove the subscription and return True if it existed."""
return delete_subscriber(email)
def update_email(old_email: str, new_email: str) -> bool:
"""Update the email for a subscription. Return True if updated."""
return update_subscriber(old_email, new_email)
def send_newsletter_to_subscribers(subject: str, content: str, emails: list[str], sender_name: str | None = None) -> int:
"""Send newsletter to list of email addresses. Returns count of successful sends."""
import logging
from .. import settings
if not settings.SMTP_SETTINGS["host"]:
logging.error("SMTP not configured, cannot send newsletter")
return 0
try:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# Create message
msg = MIMEMultipart('alternative')
msg['Subject'] = subject
msg['From'] = settings.SMTP_SETTINGS["sender"] or "noreply@example.com"
# Format content
formatted_content = content.replace('\n', '<br>')
html_content = f"""
<html>
<body>
{formatted_content}
</body>
</html>
"""
# Add HTML content
html_part = MIMEText(html_content, 'html')
msg.attach(html_part)
# Send to each recipient individually for better deliverability
success_count = 0
with smtplib.SMTP(settings.SMTP_SETTINGS["host"], settings.SMTP_SETTINGS["port"]) as server:
if settings.SMTP_SETTINGS["use_tls"]:
server.starttls()
if settings.SMTP_SETTINGS["username"] and settings.SMTP_SETTINGS["password"]:
server.login(
settings.SMTP_SETTINGS["username"], settings.SMTP_SETTINGS["password"])
for email in emails:
try:
# Create a fresh copy for each recipient
recipient_msg = MIMEMultipart('alternative')
recipient_msg['Subject'] = subject
recipient_msg['From'] = msg['From']
recipient_msg['To'] = email
# Add HTML content
recipient_msg.attach(MIMEText(html_content, 'html'))
server.sendmail(msg['From'], email,
recipient_msg.as_string())
success_count += 1
except Exception as exc:
logging.exception(
"Failed to send newsletter to %s: %s", email, exc)
return success_count
except Exception as exc:
logging.exception("Failed to send newsletter: %s", exc)
return 0

65
server/settings.py Normal file
View File

@@ -0,0 +1,65 @@
"""Environment driven configuration values."""
from __future__ import annotations
import os
import re
from pathlib import Path
from dotenv import load_dotenv
from .utils import normalize_recipients
load_dotenv()
BASE_DIR = Path(__file__).resolve().parent.parent
SECRET_KEY = os.getenv("FLASK_SECRET_KEY", "dev")
SENTRY_DSN = os.getenv("SENTRY_DSN")
SENTRY_TRACES_SAMPLE_RATE = float(
os.getenv("SENTRY_TRACES_SAMPLE_RATE", "0.0"))
ENABLE_REQUEST_LOGS = os.getenv("ENABLE_REQUEST_LOGS", "true").lower() in {
"1", "true", "yes"}
ENABLE_JSON_LOGS = os.getenv("ENABLE_JSON_LOGS", "false").lower() in {
"1", "true", "yes"}
DATABASE_URL = os.getenv("DATABASE_URL")
POSTGRES_URL = os.getenv("POSTGRES_URL")
def resolve_sqlite_path() -> Path:
"""Resolve the configured SQLite path honoring DATABASE_URL."""
if DATABASE_URL:
if DATABASE_URL.startswith("sqlite:"):
match = re.match(r"sqlite:(?:////?|)(.+)", DATABASE_URL)
if match:
return Path(match.group(1))
return Path("data/forms.db")
return Path(DATABASE_URL)
return BASE_DIR / "data" / "forms.db"
SQLITE_DB_PATH = resolve_sqlite_path()
RATE_LIMIT_MAX = int(os.getenv("RATE_LIMIT_MAX", "10"))
RATE_LIMIT_WINDOW = int(os.getenv("RATE_LIMIT_WINDOW", "60"))
REDIS_URL = os.getenv("REDIS_URL")
STRICT_ORIGIN_CHECK = os.getenv("STRICT_ORIGIN_CHECK", "false").lower() in {
"1", "true", "yes"}
ALLOWED_ORIGIN = os.getenv("ALLOWED_ORIGIN")
SMTP_SETTINGS = {
"host": os.getenv("SMTP_HOST"),
"port": int(os.getenv("SMTP_PORT", "587")),
"username": os.getenv("SMTP_USERNAME"),
"password": os.getenv("SMTP_PASSWORD"),
"sender": os.getenv("SMTP_SENDER"),
"use_tls": os.getenv("SMTP_USE_TLS", "true").lower() in {"1", "true", "yes"},
"recipients": normalize_recipients(os.getenv("SMTP_RECIPIENTS")),
}
if not SMTP_SETTINGS["sender"] and SMTP_SETTINGS["username"]:
SMTP_SETTINGS["sender"] = SMTP_SETTINGS["username"]
ADMIN_USERNAME = os.getenv("ADMIN_USERNAME", "admin")
ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "admin")

23
server/utils.py Normal file
View File

@@ -0,0 +1,23 @@
"""Common utility helpers for the server package."""
from __future__ import annotations
import uuid
from typing import Iterable, List
def normalize_recipients(value: str | None) -> List[str]:
"""Split a comma separated string of emails into a clean list."""
if not value:
return []
return [item.strip() for item in value.split(",") if item.strip()]
def is_valid_email(value: str) -> bool:
"""Perform a very small sanity check for email addresses."""
value = value.strip()
return bool(value and "@" in value)
def generate_request_id() -> str:
"""Return a UUID4 string for request correlation."""
return str(uuid.uuid4())

View File

@@ -0,0 +1,188 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Admin Dashboard</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
max-width: 1200px;
margin: 0 auto;
padding: 20px;
}
h1 {
color: #333;
text-align: center;
margin-bottom: 30px;
}
.dashboard-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
gap: 20px;
margin-bottom: 40px;
}
.dashboard-card {
border: 1px solid #ddd;
border-radius: 8px;
padding: 20px;
background-color: #f9f9f9;
transition: box-shadow 0.3s ease;
}
.dashboard-card:hover {
box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1);
}
.dashboard-card h2 {
color: #555;
margin-top: 0;
margin-bottom: 15px;
}
.dashboard-card p {
color: #666;
margin-bottom: 15px;
}
.dashboard-card a {
display: inline-block;
background-color: #007bff;
color: white;
padding: 8px 16px;
text-decoration: none;
border-radius: 4px;
transition: background-color 0.3s ease;
}
.dashboard-card a:hover {
background-color: #0056b3;
}
.logout {
text-align: center;
margin-top: 40px;
}
.logout a {
color: #dc3545;
text-decoration: none;
}
.logout a:hover {
text-decoration: underline;
}
.stats {
display: flex;
justify-content: space-around;
margin-bottom: 30px;
flex-wrap: wrap;
}
.stat-card {
background-color: #e9ecef;
padding: 15px;
border-radius: 8px;
text-align: center;
min-width: 150px;
margin: 5px;
}
.stat-card h3 {
margin: 0;
color: #495057;
font-size: 2em;
}
.stat-card p {
margin: 5px 0 0 0;
color: #6c757d;
font-size: 0.9em;
}
</style>
</head>
<body>
<h1>Admin Dashboard</h1>
<div class="stats">
<div class="stat-card">
<h3 id="contact-count">--</h3>
<p>Contact Submissions</p>
</div>
<div class="stat-card">
<h3 id="newsletter-count">--</h3>
<p>Newsletter Subscribers</p>
</div>
<div class="stat-card">
<h3 id="settings-count">--</h3>
<p>App Settings</p>
</div>
</div>
<div class="dashboard-grid">
<div class="dashboard-card">
<h2>Contact Form Submissions</h2>
<p>
View and manage contact form submissions from your website visitors.
</p>
<a href="/admin/submissions">Manage Submissions</a>
</div>
<div class="dashboard-card">
<h2>Newsletter Subscribers</h2>
<p>
Manage newsletter subscriptions and send newsletters to your
subscribers.
</p>
<a href="/admin/newsletter">Manage Subscribers</a>
</div>
<div class="dashboard-card">
<h2>Application Settings</h2>
<p>Configure application settings and environment variables.</p>
<a href="/admin/settings">Manage Settings</a>
</div>
<div class="dashboard-card">
<h2>Create Newsletter</h2>
<p>Create and send newsletters to your subscribers.</p>
<a href="/admin/newsletter/create">Create Newsletter</a>
</div>
</div>
<div class="logout">
<a href="/auth/logout">Logout</a>
</div>
<script>
// Load dashboard statistics
async function loadStats() {
try {
// Load contact submissions count
const contactResponse = await fetch(
"/admin/api/contact?page=1&per_page=1"
);
if (contactResponse.ok) {
const contactData = await contactResponse.json();
document.getElementById("contact-count").textContent =
contactData.pagination.total;
}
// Load newsletter subscribers count
const newsletterResponse = await fetch(
"/admin/api/newsletter?page=1&per_page=1"
);
if (newsletterResponse.ok) {
const newsletterData = await newsletterResponse.json();
document.getElementById("newsletter-count").textContent =
newsletterData.pagination.total;
}
// Load settings count
const settingsResponse = await fetch("/admin/api/settings");
if (settingsResponse.ok) {
const settingsData = await settingsResponse.json();
document.getElementById("settings-count").textContent = Object.keys(
settingsData.settings
).length;
}
} catch (error) {
console.error("Failed to load dashboard stats:", error);
}
}
// Load stats when page loads
loadStats();
</script>
</body>
</html>

View File

@@ -0,0 +1,363 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Newsletter Subscribers</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
max-width: 1200px;
margin: 0 auto;
padding: 20px;
}
h1 {
color: #333;
margin-bottom: 20px;
}
.nav {
margin-bottom: 20px;
padding: 10px;
background-color: #f8f9fa;
border-radius: 5px;
}
.nav a {
margin-right: 15px;
color: #007bff;
text-decoration: none;
}
.nav a:hover {
text-decoration: underline;
}
.filters {
margin-bottom: 20px;
padding: 15px;
background-color: #f8f9fa;
border-radius: 5px;
display: flex;
gap: 15px;
align-items: center;
flex-wrap: wrap;
}
.filters input,
.filters select {
padding: 8px;
border: 1px solid #ddd;
border-radius: 4px;
min-width: 150px;
}
.filters button {
padding: 8px 16px;
background-color: #007bff;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
.filters button:hover {
background-color: #0056b3;
}
.subscribers-table {
width: 100%;
border-collapse: collapse;
margin-bottom: 20px;
}
.subscribers-table th,
.subscribers-table td {
padding: 12px;
text-align: left;
border-bottom: 1px solid #ddd;
}
.subscribers-table th {
background-color: #f8f9fa;
font-weight: bold;
}
.subscribers-table tr:hover {
background-color: #f5f5f5;
}
.pagination {
display: flex;
justify-content: center;
align-items: center;
gap: 10px;
margin-top: 20px;
}
.pagination button {
padding: 8px 12px;
border: 1px solid #ddd;
background-color: white;
cursor: pointer;
border-radius: 4px;
}
.pagination button:hover {
background-color: #f8f9fa;
}
.pagination button:disabled {
background-color: #e9ecef;
cursor: not-allowed;
}
.pagination .current-page {
font-weight: bold;
color: #007bff;
}
.loading {
text-align: center;
padding: 20px;
color: #666;
}
.message {
padding: 10px;
margin-bottom: 20px;
border-radius: 4px;
}
.message.success {
background-color: #d4edda;
color: #155724;
border: 1px solid #c3e6cb;
}
.message.error {
background-color: #f8d7da;
color: #721c24;
border: 1px solid #f5c6cb;
}
.actions {
display: flex;
gap: 5px;
}
.btn {
padding: 4px 8px;
border: none;
border-radius: 3px;
cursor: pointer;
font-size: 12px;
}
.btn-danger {
background-color: #dc3545;
color: white;
}
.btn-danger:hover {
background-color: #c82333;
}
</style>
</head>
<body>
<div class="nav">
<a href="/admin/">Dashboard</a>
<a href="/admin/submissions">Contact Submissions</a>
<a href="/admin/settings">Settings</a>
<a href="/auth/logout">Logout</a>
</div>
<h1>Newsletter Subscribers</h1>
<div id="message"></div>
<div class="filters">
<input type="text" id="emailFilter" placeholder="Filter by email..." />
<select id="sortBy">
<option value="subscribed_at">Sort by Date</option>
<option value="email">Sort by Email</option>
</select>
<select id="sortOrder">
<option value="desc">Newest First</option>
<option value="asc">Oldest First</option>
</select>
<button onclick="applyFilters()">Apply Filters</button>
<button onclick="clearFilters()">Clear</button>
</div>
<div id="loading" class="loading">Loading subscribers...</div>
<table
id="subscribersTable"
class="subscribers-table"
style="display: none"
>
<thead>
<tr>
<th>Email</th>
<th>Subscribed Date</th>
<th>Actions</th>
</tr>
</thead>
<tbody id="subscribersBody"></tbody>
</table>
<div id="pagination" class="pagination" style="display: none">
<button id="prevBtn" onclick="changePage(currentPage - 1)">
Previous
</button>
<span id="pageInfo"></span>
<button id="nextBtn" onclick="changePage(currentPage + 1)">Next</button>
</div>
<script>
let currentPage = 1;
let currentFilters = {
email: "",
sort_by: "subscribed_at",
sort_order: "desc",
};
// Load subscribers on page load
document.addEventListener("DOMContentLoaded", function () {
loadSubscribers();
});
function applyFilters() {
currentFilters.email = document
.getElementById("emailFilter")
.value.trim();
currentFilters.sort_by = document.getElementById("sortBy").value;
currentFilters.sort_order = document.getElementById("sortOrder").value;
currentPage = 1;
loadSubscribers();
}
function clearFilters() {
document.getElementById("emailFilter").value = "";
document.getElementById("sortBy").value = "subscribed_at";
document.getElementById("sortOrder").value = "desc";
currentFilters = {
email: "",
sort_by: "subscribed_at",
sort_order: "desc",
};
currentPage = 1;
loadSubscribers();
}
function loadSubscribers() {
document.getElementById("loading").style.display = "block";
document.getElementById("subscribersTable").style.display = "none";
document.getElementById("pagination").style.display = "none";
const params = new URLSearchParams({
page: currentPage,
per_page: 50,
sort_by: currentFilters.sort_by,
sort_order: currentFilters.sort_order,
});
if (currentFilters.email) {
params.append("email", currentFilters.email);
}
fetch(`/admin/api/newsletter?${params}`)
.then((response) => response.json())
.then((data) => {
if (data.status === "ok") {
displaySubscribers(data.subscribers);
updatePagination(data.pagination);
} else {
showMessage(
"Error loading subscribers: " +
(data.message || "Unknown error"),
"error"
);
}
})
.catch((error) => {
console.error("Error:", error);
showMessage("Error loading subscribers", "error");
})
.finally(() => {
document.getElementById("loading").style.display = "none";
});
}
function displaySubscribers(subscribers) {
const tbody = document.getElementById("subscribersBody");
tbody.innerHTML = "";
if (subscribers.length === 0) {
tbody.innerHTML =
'<tr><td colspan="3" style="text-align: center; padding: 40px; color: #666;">No subscribers found</td></tr>';
} else {
subscribers.forEach((subscriber) => {
const row = document.createElement("tr");
row.innerHTML = `
<td>${escapeHtml(subscriber.email)}</td>
<td>${new Date(
subscriber.subscribed_at
).toLocaleDateString()}</td>
<td class="actions">
<button class="btn btn-danger" onclick="unsubscribe('${escapeHtml(
subscriber.email
)}')">Unsubscribe</button>
</td>
`;
tbody.appendChild(row);
});
}
document.getElementById("subscribersTable").style.display = "table";
}
function updatePagination(pagination) {
const pageInfo = document.getElementById("pageInfo");
const prevBtn = document.getElementById("prevBtn");
const nextBtn = document.getElementById("nextBtn");
pageInfo.textContent = `Page ${pagination.page} of ${pagination.pages} (${pagination.total} total)`;
prevBtn.disabled = pagination.page <= 1;
nextBtn.disabled = pagination.page >= pagination.pages;
document.getElementById("pagination").style.display = "flex";
}
function changePage(page) {
currentPage = page;
loadSubscribers();
}
function unsubscribe(email) {
if (!confirm(`Are you sure you want to unsubscribe ${email}?`)) {
return;
}
fetch("/api/newsletter", {
method: "DELETE",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ email: email }),
})
.then((response) => response.json())
.then((data) => {
if (data.status === "ok") {
showMessage("Subscriber unsubscribed successfully", "success");
loadSubscribers(); // Reload the list
} else {
showMessage(
"Error unsubscribing: " + (data.message || "Unknown error"),
"error"
);
}
})
.catch((error) => {
console.error("Error:", error);
showMessage("Error unsubscribing subscriber", "error");
});
}
function showMessage(text, type) {
const messageDiv = document.getElementById("message");
messageDiv.className = `message ${type}`;
messageDiv.textContent = text;
messageDiv.style.display = "block";
setTimeout(() => {
messageDiv.style.display = "none";
}, 5000);
}
function escapeHtml(text) {
const div = document.createElement("div");
div.textContent = text;
return div.innerHTML;
}
</script>
</body>
</html>

View File

@@ -0,0 +1,459 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Create Newsletter</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
max-width: 1200px;
margin: 0 auto;
padding: 20px;
}
h1 {
color: #333;
margin-bottom: 20px;
}
.nav {
margin-bottom: 20px;
padding: 10px;
background-color: #f8f9fa;
border-radius: 5px;
}
.nav a {
margin-right: 15px;
color: #007bff;
text-decoration: none;
}
.nav a:hover {
text-decoration: underline;
}
.form-section {
margin-bottom: 30px;
padding: 20px;
border: 1px solid #ddd;
border-radius: 8px;
background-color: #f9f9f9;
}
.form-section h2 {
margin-top: 0;
color: #555;
border-bottom: 1px solid #ddd;
padding-bottom: 10px;
}
.form-group {
margin-bottom: 15px;
}
.form-group label {
display: block;
margin-bottom: 5px;
font-weight: bold;
color: #333;
}
.form-group input,
.form-group textarea,
.form-group select {
width: 100%;
padding: 10px;
border: 1px solid #ddd;
border-radius: 4px;
font-size: 14px;
box-sizing: border-box;
}
.form-group textarea {
min-height: 200px;
resize: vertical;
}
.form-row {
display: flex;
gap: 15px;
margin-bottom: 15px;
}
.form-row .form-group {
flex: 1;
margin-bottom: 0;
}
.btn {
padding: 10px 20px;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 14px;
margin-right: 10px;
}
.btn-primary {
background-color: #007bff;
color: white;
}
.btn-primary:hover {
background-color: #0056b3;
}
.btn-secondary {
background-color: #6c757d;
color: white;
}
.btn-secondary:hover {
background-color: #545b62;
}
.btn-success {
background-color: #28a745;
color: white;
}
.btn-success:hover {
background-color: #1e7e34;
}
.btn-danger {
background-color: #dc3545;
color: white;
}
.btn-danger:hover {
background-color: #c82333;
}
.message {
padding: 10px;
margin-bottom: 20px;
border-radius: 4px;
}
.message.success {
background-color: #d4edda;
color: #155724;
border: 1px solid #c3e6cb;
}
.message.error {
background-color: #f8d7da;
color: #721c24;
border: 1px solid #f5c6cb;
}
.message.info {
background-color: #d1ecf1;
color: #0c5460;
border: 1px solid #bee5eb;
}
.newsletter-preview {
margin-top: 20px;
padding: 20px;
background-color: white;
border: 1px solid #ddd;
border-radius: 4px;
}
.newsletter-preview h3 {
margin-top: 0;
color: #555;
}
.newsletter-stats {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 15px;
margin-bottom: 20px;
}
.stat-card {
background-color: white;
padding: 15px;
border-radius: 8px;
border: 1px solid #ddd;
text-align: center;
}
.stat-card h4 {
margin: 0 0 10px 0;
color: #666;
font-size: 14px;
}
.stat-card .number {
font-size: 24px;
font-weight: bold;
color: #333;
}
.loading {
text-align: center;
padding: 20px;
color: #666;
}
.hidden {
display: none;
}
</style>
</head>
<body>
<div class="nav">
<a href="/admin/">Dashboard</a>
<a href="/admin/newsletter">Subscribers</a>
<a href="/admin/settings">Settings</a>
<a href="/auth/logout">Logout</a>
</div>
<h1>Create Newsletter</h1>
<div id="message"></div>
<div class="newsletter-stats">
<div class="stat-card">
<h4>Total Subscribers</h4>
<div class="number" id="totalSubscribers">--</div>
</div>
<div class="stat-card">
<h4>Active Subscribers</h4>
<div class="number" id="activeSubscribers">--</div>
</div>
<div class="stat-card">
<h4>Last Sent</h4>
<div class="number" id="lastSent">--</div>
</div>
</div>
<form id="newsletterForm">
<div class="form-section">
<h2>Newsletter Details</h2>
<div class="form-row">
<div class="form-group">
<label for="subject">Subject Line *</label>
<input
type="text"
id="subject"
name="subject"
required
placeholder="Enter newsletter subject"
/>
</div>
<div class="form-group">
<label for="senderName">Sender Name</label>
<input
type="text"
id="senderName"
name="sender_name"
placeholder="Your Name"
/>
</div>
</div>
<div class="form-group">
<label for="content">Content *</label>
<textarea
id="content"
name="content"
required
placeholder="Write your newsletter content here..."
></textarea>
</div>
<div class="form-row">
<div class="form-group">
<label for="sendDate">Send Date (optional)</label>
<input type="datetime-local" id="sendDate" name="send_date" />
</div>
<div class="form-group">
<label for="status">Status</label>
<select id="status" name="status">
<option value="draft">Draft</option>
<option value="scheduled">Scheduled</option>
<option value="sent">Sent</option>
</select>
</div>
</div>
</div>
<div class="form-section">
<h2>Actions</h2>
<button
type="button"
class="btn btn-secondary"
onclick="previewNewsletter()"
>
Preview
</button>
<button type="button" class="btn btn-primary" onclick="saveDraft()">
Save Draft
</button>
<button
type="button"
class="btn btn-success"
onclick="sendNewsletter()"
>
Send Newsletter
</button>
<button type="button" class="btn btn-danger" onclick="clearForm()">
Clear
</button>
</div>
</form>
<div id="previewSection" class="newsletter-preview hidden">
<h3>Newsletter Preview</h3>
<div id="previewContent"></div>
</div>
<script>
let newsletterStats = {};
// Load newsletter stats on page load
document.addEventListener("DOMContentLoaded", function () {
loadNewsletterStats();
});
function loadNewsletterStats() {
// Load subscriber count
fetch("/admin/api/newsletter?page=1&per_page=1")
.then((response) => response.json())
.then((data) => {
if (data.status === "ok") {
document.getElementById("totalSubscribers").textContent =
data.pagination.total;
document.getElementById("activeSubscribers").textContent =
data.pagination.total; // For now, assume all are active
newsletterStats.totalSubscribers = data.pagination.total;
}
})
.catch((error) => {
console.error("Error loading subscriber stats:", error);
});
// For now, set last sent as N/A
document.getElementById("lastSent").textContent = "N/A";
}
function previewNewsletter() {
const subject = document.getElementById("subject").value.trim();
const content = document.getElementById("content").value.trim();
const senderName = document.getElementById("senderName").value.trim();
if (!subject || !content) {
showMessage("Subject and content are required for preview.", "error");
return;
}
const previewContent = document.getElementById("previewContent");
previewContent.innerHTML = `
<h2>${escapeHtml(subject)}</h2>
${
senderName
? `<p><strong>From:</strong> ${escapeHtml(senderName)}</p>`
: ""
}
<div style="margin-top: 20px; line-height: 1.6;">
${content.replace(/\n/g, "<br>")}
</div>
`;
document.getElementById("previewSection").classList.remove("hidden");
showMessage("Newsletter preview generated.", "info");
}
function saveDraft() {
const formData = new FormData(document.getElementById('newsletterForm'));
const newsletterData = {
subject: formData.get('subject'),
content: formData.get('content'),
sender_name: formData.get('sender_name'),
send_date: formData.get('send_date'),
status: 'draft'
};
if (!newsletterData.subject || !newsletterData.content) {
showMessage('Subject and content are required.', 'error');
return;
}
fetch('/admin/api/newsletters', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(newsletterData)
})
.then(response => response.json())
.then(data => {
if (data.status === 'ok') {
showMessage('Newsletter draft saved successfully!', 'success');
} else {
showMessage(data.message || 'Failed to save draft.', 'error');
}
})
.catch(error => {
console.error('Error saving draft:', error);
showMessage('Failed to save draft.', 'error');
});
}
function sendNewsletter() {
const formData = new FormData(document.getElementById('newsletterForm'));
const newsletterData = {
subject: formData.get('subject'),
content: formData.get('content'),
sender_name: formData.get('sender_name'),
send_date: formData.get('send_date'),
status: 'sent'
};
if (!newsletterData.subject || !newsletterData.content) {
showMessage('Subject and content are required.', 'error');
return;
}
if (!confirm(`Are you sure you want to send this newsletter to ${newsletterStats.totalSubscribers || 0} subscribers?`)) {
return;
}
// First save the newsletter
fetch('/admin/api/newsletters', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(newsletterData)
})
.then(response => response.json())
.then(data => {
if (data.status === 'ok') {
const newsletterId = data.newsletter_id;
// Now send it
return fetch(`/admin/api/newsletters/${newsletterId}/send`, {
method: 'POST'
});
} else {
throw new Error(data.message || 'Failed to save newsletter.');
}
})
.then(response => response.json())
.then(data => {
if (data.status === 'ok') {
showMessage(`Newsletter sent successfully to ${data.sent_count} subscribers!`, 'success');
} else {
showMessage(data.message || 'Failed to send newsletter.', 'error');
}
})
.catch(error => {
console.error('Error sending newsletter:', error);
showMessage('Failed to send newsletter.', 'error');
});
}
function clearForm() {
if (
confirm(
"Are you sure you want to clear the form? All unsaved changes will be lost."
)
) {
document.getElementById("newsletterForm").reset();
document.getElementById("previewSection").classList.add("hidden");
showMessage("Form cleared.", "info");
}
}
function showMessage(text, type) {
const messageDiv = document.getElementById("message");
messageDiv.className = `message ${type}`;
messageDiv.textContent = text;
messageDiv.style.display = "block";
setTimeout(() => {
messageDiv.style.display = "none";
}, 5000);
}
function escapeHtml(text) {
const div = document.createElement("div");
div.textContent = text;
return div.innerHTML;
}
</script>
</body>
</html>

View File

@@ -0,0 +1,422 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Admin Settings</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
}
h1 {
color: #333;
}
h2 {
color: #555;
border-bottom: 1px solid #ddd;
padding-bottom: 5px;
}
.setting-group {
margin-bottom: 20px;
}
.setting {
margin: 5px 0;
}
.setting strong {
display: inline-block;
width: 200px;
}
.logout {
margin-top: 20px;
}
.logout a {
color: #007bff;
text-decoration: none;
}
.logout a:hover {
text-decoration: underline;
}
.settings-management {
margin-top: 40px;
padding: 20px;
border: 1px solid #ddd;
border-radius: 5px;
}
.settings-list {
margin-bottom: 20px;
}
.setting-item {
display: flex;
justify-content: space-between;
align-items: center;
padding: 10px;
border-bottom: 1px solid #eee;
}
.setting-item:last-child {
border-bottom: none;
}
.setting-info {
flex-grow: 1;
}
.setting-actions {
display: flex;
gap: 10px;
}
.btn {
padding: 5px 10px;
border: none;
border-radius: 3px;
cursor: pointer;
text-decoration: none;
display: inline-block;
}
.btn-primary {
background: #007bff;
color: white;
}
.btn-danger {
background: #dc3545;
color: white;
}
.btn-secondary {
background: #6c757d;
color: white;
}
.btn:hover {
opacity: 0.8;
}
.form-group {
margin-bottom: 15px;
}
.form-group label {
display: block;
margin-bottom: 5px;
font-weight: bold;
}
.form-group input,
.form-group select {
width: 100%;
padding: 8px;
border: 1px solid #ccc;
border-radius: 3px;
}
.form-row {
display: flex;
gap: 10px;
align-items: end;
}
.form-row .form-group {
flex: 1;
margin-bottom: 0;
}
.message {
padding: 10px;
margin: 10px 0;
border-radius: 3px;
}
.message.success {
background: #d4edda;
color: #155724;
border: 1px solid #c3e6cb;
}
.message.error {
background: #f8d7da;
color: #721c24;
border: 1px solid #f5c6cb;
}
.edit-form {
display: none;
margin-top: 10px;
padding: 10px;
background: #f8f9fa;
border-radius: 3px;
}
</style>
</head>
<body>
<div class="logout">
<a
href="/admin/"
style="color: #007bff; text-decoration: none; margin-right: 20px"
>Dashboard</a
>
<a
href="/admin/submissions"
style="color: #007bff; text-decoration: none; margin-right: 20px"
>View Submissions</a
>
<a
href="{{ url_for('auth.logout') }}"
style="color: #007bff; text-decoration: none"
>Logout</a
>
</div>
<h1>Application Settings</h1>
{% for category, category_settings in settings.items() %}
<div class="setting-group">
<h2>{{ category }}</h2>
{% for key, value in category_settings.items() %}
<div class="setting"><strong>{{ key }}:</strong> {{ value }}</div>
{% endfor %}
</div>
{% endfor %}
<div class="settings-management">
<h2>Dynamic Settings Management</h2>
<div id="message"></div>
<div class="settings-list" id="settingsList">
<p>Loading settings...</p>
</div>
<h3>Add New Setting</h3>
<form id="addSettingForm">
<div class="form-row">
<div class="form-group">
<label for="newKey">Setting Key:</label>
<input
type="text"
id="newKey"
name="key"
required
placeholder="e.g., maintenance_mode"
/>
</div>
<div class="form-group">
<label for="newValue">Setting Value:</label>
<input
type="text"
id="newValue"
name="value"
required
placeholder="e.g., false"
/>
</div>
<div class="form-group">
<button type="submit" class="btn btn-primary">Add Setting</button>
</div>
</div>
</form>
</div>
<script>
let appSettings = {};
// Load settings on page load
document.addEventListener("DOMContentLoaded", function () {
loadSettings();
});
// Handle add setting form
document
.getElementById("addSettingForm")
.addEventListener("submit", function (e) {
e.preventDefault();
const formData = new FormData(this);
const key = formData.get("key").trim();
const value = formData.get("value").trim();
if (!key || !value) {
showMessage("Both key and value are required.", "error");
return;
}
addSetting(key, value);
});
function loadSettings() {
fetch("/admin/api/settings")
.then((response) => response.json())
.then((data) => {
if (data.status === "ok") {
appSettings = data.settings;
displaySettings();
} else {
showMessage(
"Error loading settings: " + (data.message || "Unknown error"),
"error"
);
}
})
.catch((error) => {
console.error("Error:", error);
showMessage("Error loading settings", "error");
});
}
function displaySettings() {
const container = document.getElementById("settingsList");
if (Object.keys(appSettings).length === 0) {
container.innerHTML = "<p>No dynamic settings configured.</p>";
return;
}
const settingsHtml = Object.entries(appSettings)
.map(
([key, value]) => `
<div class="setting-item">
<div class="setting-info">
<strong>${escapeHtml(key)}:</strong> ${escapeHtml(value)}
</div>
<div class="setting-actions">
<button class="btn btn-secondary" onclick="editSetting('${escapeHtml(
key
)}')">Edit</button>
<button class="btn btn-danger" onclick="deleteSetting('${escapeHtml(
key
)}')">Delete</button>
</div>
</div>
<div class="edit-form" id="edit-${escapeHtml(key)}">
<div class="form-row">
<div class="form-group">
<label>Key:</label>
<input type="text" value="${escapeHtml(key)}" readonly>
</div>
<div class="form-group">
<label>New Value:</label>
<input type="text" id="edit-value-${escapeHtml(
key
)}" value="${escapeHtml(value)}" required>
</div>
<div class="form-group">
<button class="btn btn-primary" onclick="updateSetting('${escapeHtml(
key
)}')">Update</button>
<button class="btn btn-secondary" onclick="cancelEdit('${escapeHtml(
key
)}')">Cancel</button>
</div>
</div>
</div>
`
)
.join("");
container.innerHTML = settingsHtml;
}
function addSetting(key, value) {
fetch(`/admin/api/settings/${encodeURIComponent(key)}`, {
method: "PUT",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ value: value }),
})
.then((response) => response.json())
.then((data) => {
if (data.status === "ok") {
appSettings[key] = value;
displaySettings();
document.getElementById("addSettingForm").reset();
showMessage("Setting added successfully!", "success");
} else {
showMessage(
"Error adding setting: " + (data.message || "Unknown error"),
"error"
);
}
})
.catch((error) => {
console.error("Error:", error);
showMessage("Error adding setting", "error");
});
}
function editSetting(key) {
document.getElementById(`edit-${key}`).style.display = "block";
}
function cancelEdit(key) {
document.getElementById(`edit-${key}`).style.display = "none";
}
function updateSetting(key) {
const newValue = document
.getElementById(`edit-value-${key}`)
.value.trim();
if (!newValue) {
showMessage("Value cannot be empty.", "error");
return;
}
fetch(`/admin/api/settings/${encodeURIComponent(key)}`, {
method: "PUT",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ value: newValue }),
})
.then((response) => response.json())
.then((data) => {
if (data.status === "ok") {
appSettings[key] = newValue;
displaySettings();
cancelEdit(key);
showMessage("Setting updated successfully!", "success");
} else {
showMessage(
"Error updating setting: " + (data.message || "Unknown error"),
"error"
);
}
})
.catch((error) => {
console.error("Error:", error);
showMessage("Error updating setting", "error");
});
}
function deleteSetting(key) {
if (!confirm(`Are you sure you want to delete the setting "${key}"?`)) {
return;
}
fetch(`/admin/api/settings/${encodeURIComponent(key)}`, {
method: "DELETE",
})
.then((response) => response.json())
.then((data) => {
if (data.status === "ok") {
delete appSettings[key];
displaySettings();
showMessage("Setting deleted successfully!", "success");
} else {
showMessage(
"Error deleting setting: " + (data.message || "Unknown error"),
"error"
);
}
})
.catch((error) => {
console.error("Error:", error);
showMessage("Error deleting setting", "error");
});
}
function showMessage(text, type) {
const messageDiv = document.getElementById("message");
messageDiv.className = `message ${type}`;
messageDiv.textContent = text;
messageDiv.style.display = "block";
setTimeout(() => {
messageDiv.style.display = "none";
}, 5000);
}
function escapeHtml(text) {
const div = document.createElement("div");
div.textContent = text;
return div.innerHTML;
}
</script>
</body>
</html>

View File

@@ -0,0 +1,437 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Contact Submissions</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
max-width: 1200px;
margin: 0 auto;
padding: 20px;
}
h1 {
color: #333;
margin-bottom: 20px;
}
.nav {
margin-bottom: 20px;
}
.nav a {
color: #007bff;
text-decoration: none;
margin-right: 20px;
}
.nav a:hover {
text-decoration: underline;
}
.filters {
background: #f8f9fa;
padding: 15px;
border-radius: 5px;
margin-bottom: 20px;
}
.filters form {
display: flex;
gap: 10px;
flex-wrap: wrap;
align-items: end;
}
.filters label {
display: block;
margin-bottom: 5px;
font-weight: bold;
}
.filters input,
.filters select {
padding: 8px;
border: 1px solid #ccc;
border-radius: 4px;
}
.filters button {
padding: 8px 15px;
background: #007bff;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
.filters button:hover {
background: #0056b3;
}
.filters .clear-btn {
background: #6c757d;
}
.filters .clear-btn:hover {
background: #545b62;
}
table {
width: 100%;
border-collapse: collapse;
margin-top: 20px;
}
th,
td {
padding: 12px;
text-align: left;
border-bottom: 1px solid #ddd;
}
th {
background-color: #f8f9fa;
font-weight: bold;
cursor: pointer;
}
th:hover {
background-color: #e9ecef;
}
th.sort-asc::after {
content: " ↑";
}
th.sort-desc::after {
content: " ↓";
}
tr:hover {
background-color: #f5f5f5;
}
.message {
padding: 8px;
margin: 10px 0;
border-radius: 4px;
}
.message.success {
background-color: #d4edda;
color: #155724;
border: 1px solid #c3e6cb;
}
.message.error {
background-color: #f8d7da;
color: #721c24;
border: 1px solid #f5c6cb;
}
.pagination {
margin-top: 20px;
text-align: center;
}
.pagination button {
padding: 8px 12px;
margin: 0 2px;
border: 1px solid #ddd;
background: white;
cursor: pointer;
}
.pagination button:hover {
background: #f8f9fa;
}
.pagination button.active {
background: #007bff;
color: white;
border-color: #007bff;
}
.pagination button:disabled {
opacity: 0.5;
cursor: not-allowed;
}
.delete-btn {
background: #dc3545;
color: white;
border: none;
padding: 4px 8px;
border-radius: 3px;
cursor: pointer;
}
.delete-btn:hover {
background: #c82333;
}
.loading {
text-align: center;
padding: 20px;
}
.no-data {
text-align: center;
padding: 40px;
color: #666;
}
.submission-details {
max-width: 300px;
word-wrap: break-word;
}
</style>
</head>
<body>
<div class="nav">
<a href="/admin/">Dashboard</a>
<a href="/admin/settings">Settings</a>
<a href="{{ url_for('auth.logout') }}">Logout</a>
</div>
<h1>Contact Form Submissions</h1>
<div id="message"></div>
<div class="filters">
<form id="filterForm">
<div>
<label for="email">Email Filter:</label>
<input
type="text"
id="email"
name="email"
placeholder="Filter by email"
/>
</div>
<div>
<label for="date_from">Date From:</label>
<input type="date" id="date_from" name="date_from" />
</div>
<div>
<label for="date_to">Date To:</label>
<input type="date" id="date_to" name="date_to" />
</div>
<div>
<label for="per_page">Items per page:</label>
<select id="per_page" name="per_page">
<option value="25">25</option>
<option value="50" selected>50</option>
<option value="100">100</option>
</select>
</div>
<button type="submit">Apply Filters</button>
<button type="button" class="clear-btn" onclick="clearFilters()">
Clear
</button>
</form>
</div>
<div id="loading" class="loading" style="display: none">Loading...</div>
<table id="submissionsTable">
<thead>
<tr>
<th data-sort="id">ID</th>
<th data-sort="name">Name</th>
<th data-sort="email">Email</th>
<th data-sort="company">Company</th>
<th>Message</th>
<th data-sort="created_at">Date</th>
<th>Actions</th>
</tr>
</thead>
<tbody id="submissionsBody">
<tr>
<td colspan="7" class="no-data">Loading submissions...</td>
</tr>
</tbody>
</table>
<div class="pagination" id="pagination"></div>
<script>
let currentPage = 1;
let currentSortBy = "created_at";
let currentSortOrder = "desc";
// Load submissions on page load
document.addEventListener("DOMContentLoaded", function () {
loadSubmissions();
});
// Handle filter form submission
document
.getElementById("filterForm")
.addEventListener("submit", function (e) {
e.preventDefault();
currentPage = 1;
loadSubmissions();
});
// Handle table header sorting
document.querySelectorAll("th[data-sort]").forEach((header) => {
header.addEventListener("click", function () {
const sortBy = this.dataset.sort;
if (currentSortBy === sortBy) {
currentSortOrder = currentSortOrder === "asc" ? "desc" : "asc";
} else {
currentSortBy = sortBy;
currentSortOrder = "asc";
}
currentPage = 1;
loadSubmissions();
});
});
function clearFilters() {
document.getElementById("email").value = "";
document.getElementById("date_from").value = "";
document.getElementById("date_to").value = "";
document.getElementById("per_page").value = "50";
currentPage = 1;
currentSortBy = "created_at";
currentSortOrder = "desc";
loadSubmissions();
}
function loadSubmissions() {
const loading = document.getElementById("loading");
const table = document.getElementById("submissionsTable");
loading.style.display = "block";
table.style.opacity = "0.5";
const params = new URLSearchParams({
page: currentPage,
per_page: document.getElementById("per_page").value,
sort_by: currentSortBy,
sort_order: currentSortOrder,
email: document.getElementById("email").value,
date_from: document.getElementById("date_from").value,
date_to: document.getElementById("date_to").value,
});
fetch(`/api/contact?${params}`)
.then((response) => response.json())
.then((data) => {
if (data.status === "ok") {
displaySubmissions(data.submissions);
displayPagination(data.pagination);
} else {
showMessage(
"Error loading submissions: " +
(data.message || "Unknown error"),
"error"
);
}
})
.catch((error) => {
console.error("Error:", error);
showMessage("Error loading submissions", "error");
})
.finally(() => {
loading.style.display = "none";
table.style.opacity = "1";
});
}
function displaySubmissions(submissions) {
const tbody = document.getElementById("submissionsBody");
if (submissions.length === 0) {
tbody.innerHTML =
'<tr><td colspan="7" class="no-data">No submissions found</td></tr>';
return;
}
tbody.innerHTML = submissions
.map(
(submission) => `
<tr>
<td>${submission.id}</td>
<td>${escapeHtml(submission.name)}</td>
<td>${escapeHtml(submission.email)}</td>
<td>${escapeHtml(submission.company || "")}</td>
<td class="submission-details">${escapeHtml(
submission.message
)}</td>
<td>${new Date(submission.created_at).toLocaleString()}</td>
<td><button class="delete-btn" onclick="deleteSubmission(${
submission.id
})">Delete</button></td>
</tr>
`
)
.join("");
}
function displayPagination(pagination) {
const paginationDiv = document.getElementById("pagination");
if (pagination.pages <= 1) {
paginationDiv.innerHTML = "";
return;
}
let buttons = [];
// Previous button
buttons.push(
`<button ${
pagination.page <= 1 ? "disabled" : ""
} onclick="changePage(${pagination.page - 1})">Previous</button>`
);
// Page numbers
const startPage = Math.max(1, pagination.page - 2);
const endPage = Math.min(pagination.pages, pagination.page + 2);
for (let i = startPage; i <= endPage; i++) {
buttons.push(
`<button class="${
i === pagination.page ? "active" : ""
}" onclick="changePage(${i})">${i}</button>`
);
}
// Next button
buttons.push(
`<button ${
pagination.page >= pagination.pages ? "disabled" : ""
} onclick="changePage(${pagination.page + 1})">Next</button>`
);
paginationDiv.innerHTML = buttons.join("");
}
function changePage(page) {
currentPage = page;
loadSubmissions();
window.scrollTo(0, 0);
}
function deleteSubmission(id) {
if (!confirm("Are you sure you want to delete this submission?")) {
return;
}
fetch(`/api/contact/${id}`, {
method: "DELETE",
})
.then((response) => response.json())
.then((data) => {
if (data.status === "ok") {
showMessage("Submission deleted successfully", "success");
loadSubmissions(); // Reload the current page
} else {
showMessage(
"Error deleting submission: " +
(data.message || "Unknown error"),
"error"
);
}
})
.catch((error) => {
console.error("Error:", error);
showMessage("Error deleting submission", "error");
});
}
function showMessage(text, type) {
const messageDiv = document.getElementById("message");
messageDiv.className = `message ${type}`;
messageDiv.textContent = text;
messageDiv.style.display = "block";
// Auto-hide after 5 seconds
setTimeout(() => {
messageDiv.style.display = "none";
}, 5000);
}
function escapeHtml(text) {
const div = document.createElement("div");
div.textContent = text;
return div.innerHTML;
}
</script>
</body>
</html>

47
templates/login.html Normal file
View File

@@ -0,0 +1,47 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Admin Login</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 40px;
}
form {
max-width: 300px;
margin: auto;
}
input {
display: block;
margin: 10px 0;
padding: 8px;
width: 100%;
}
button {
padding: 10px;
background: #007bff;
color: white;
border: none;
cursor: pointer;
}
.flash {
color: red;
}
</style>
</head>
<body>
<h1>Admin Login</h1>
{% with messages = get_flashed_messages() %} {% if messages %}
<div class="flash">
{% for message in messages %} {{ message }} {% endfor %}
</div>
{% endif %} {% endwith %}
<form method="post">
<input type="text" name="username" placeholder="Username" required />
<input type="password" name="password" placeholder="Password" required />
<button type="submit">Login</button>
</form>
</body>
</html>

View File

@@ -0,0 +1,137 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Newsletter Management</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
max-width: 600px;
margin: 0 auto;
padding: 20px;
}
h1 {
color: #333;
text-align: center;
}
.message {
padding: 10px;
margin: 10px 0;
border-radius: 4px;
}
.message.success {
background-color: #d4edda;
color: #155724;
border: 1px solid #c3e6cb;
}
.message.error {
background-color: #f8d7da;
color: #721c24;
border: 1px solid #f5c6cb;
}
.message.info {
background-color: #d1ecf1;
color: #0c5460;
border: 1px solid #bee5eb;
}
.form-section {
margin: 20px 0;
padding: 20px;
border: 1px solid #ddd;
border-radius: 8px;
}
.form-section h2 {
margin-top: 0;
color: #555;
}
form {
display: flex;
flex-direction: column;
gap: 10px;
}
input[type="email"] {
padding: 8px;
border: 1px solid #ccc;
border-radius: 4px;
}
button {
padding: 10px;
background-color: #007bff;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
button:hover {
background-color: #0056b3;
}
.unsubscribe-btn {
background-color: #dc3545;
}
.unsubscribe-btn:hover {
background-color: #c82333;
}
.update-section {
display: flex;
gap: 10px;
}
.update-section input {
flex: 1;
}
</style>
</head>
<body>
<h1>Newsletter Subscription Management</h1>
{% if message %}
<div class="message {{ message_type }}">{{ message }}</div>
{% endif %}
<div class="form-section">
<h2>Subscribe to Newsletter</h2>
<form method="post">
<input type="hidden" name="action" value="subscribe" />
<input
type="email"
name="email"
placeholder="Enter your email address"
required
/>
<button type="submit">Subscribe</button>
</form>
</div>
<div class="form-section">
<h2>Unsubscribe from Newsletter</h2>
<form method="post">
<input type="hidden" name="action" value="unsubscribe" />
<input
type="email"
name="email"
placeholder="Enter your email address"
required
/>
<button type="submit" class="unsubscribe-btn">Unsubscribe</button>
</form>
</div>
<div class="form-section">
<h2>Update Email Address</h2>
<form method="post">
<input type="hidden" name="action" value="update" />
<div class="update-section">
<input
type="email"
name="old_email"
placeholder="Current email"
required
/>
<input type="email" name="email" placeholder="New email" required />
</div>
<button type="submit">Update Email</button>
</form>
</div>
</body>
</html>

30
tests/conftest.py Normal file
View File

@@ -0,0 +1,30 @@
import os
import tempfile
import pytest
import importlib
import sys
from pathlib import Path
# Ensure the repository root is on sys.path so tests can import the server package.
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
server_app_module = importlib.import_module("server.app")
# Expose app and init_db from the imported module
app = server_app_module.app
init_db = server_app_module.init_db
@pytest.fixture(autouse=True, scope="function")
def setup_tmp_db(tmp_path, monkeypatch):
"""Set up the database for each test function."""
tmp_db = tmp_path / "forms.db"
# Patch the module attribute directly to avoid package name collisions
monkeypatch.setattr(server_app_module, "DB_PATH", tmp_db, raising=False)
monkeypatch.setattr("server.settings.ADMIN_USERNAME", "admin")
monkeypatch.setattr("server.settings.ADMIN_PASSWORD", "admin")
init_db()
yield

97
tests/test_admin.py Normal file
View File

@@ -0,0 +1,97 @@
"""Tests for admin routes."""
import pytest
from server.app import app
@pytest.fixture
def client():
with app.test_client() as client:
yield client
@pytest.fixture(autouse=True)
def setup_admin_creds(monkeypatch):
monkeypatch.setattr("server.settings.ADMIN_USERNAME", "admin")
monkeypatch.setattr("server.settings.ADMIN_PASSWORD", "admin")
def test_admin_settings_requires_login(client):
"""Test admin settings page requires login."""
resp = client.get("/admin/settings")
assert resp.status_code == 302
assert resp.headers["Location"] == "/auth/login"
def test_admin_settings_with_login(client):
"""Test admin settings page displays when logged in."""
# Login first
client.post("/auth/login", data={"username": "admin", "password": "admin"})
# Access settings
resp = client.get("/admin/settings")
assert resp.status_code == 200
assert b"Application Settings" in resp.data
assert b"Database" in resp.data
assert b"SMTP" in resp.data
assert b"Logout" in resp.data
def test_admin_dashboard_requires_login(client):
"""Test admin dashboard requires login."""
resp = client.get("/admin/")
assert resp.status_code == 302
assert resp.headers["Location"] == "/auth/login"
def test_admin_dashboard_with_login(client):
"""Test admin dashboard displays when logged in."""
# Login first
client.post("/auth/login", data={"username": "admin", "password": "admin"})
# Access dashboard
resp = client.get("/admin/")
assert resp.status_code == 200
assert b"Admin Dashboard" in resp.data
assert b"Newsletter Subscribers" in resp.data
assert b"Logout" in resp.data
def test_admin_newsletter_subscribers_requires_login(client):
"""Test newsletter subscribers page requires login."""
resp = client.get("/admin/newsletter")
assert resp.status_code == 302
assert resp.headers["Location"] == "/auth/login"
def test_admin_newsletter_subscribers_with_login(client):
"""Test newsletter subscribers page displays when logged in."""
# Login first
client.post("/auth/login", data={"username": "admin", "password": "admin"})
# Access newsletter subscribers
resp = client.get("/admin/newsletter")
assert resp.status_code == 200
assert b"Newsletter Subscribers" in resp.data
assert b"Logout" in resp.data
def test_admin_newsletter_create_requires_login(client):
"""Test newsletter create page requires login."""
resp = client.get("/admin/newsletter/create")
assert resp.status_code == 302
assert resp.headers["Location"] == "/auth/login"
def test_admin_newsletter_create_with_login(client):
"""Test newsletter create page displays when logged in."""
# Login first
client.post("/auth/login", data={"username": "admin", "password": "admin"})
# Access newsletter create
resp = client.get("/admin/newsletter/create")
assert resp.status_code == 200
assert b"Create Newsletter" in resp.data
assert b"Subject Line" in resp.data
assert b"Content" in resp.data
assert b"Logout" in resp.data

View File

@@ -0,0 +1,174 @@
import sqlite3
import importlib
import pytest
server_app_module = importlib.import_module("server.app")
# Expose app and init_db from the imported module
app = server_app_module.app
init_db = server_app_module.init_db
@pytest.fixture
def client():
with app.test_client() as client:
yield client
def test_get_contact_submissions_requires_auth(client):
"""Test that getting contact submissions requires authentication."""
resp = client.get("/api/contact")
assert resp.status_code == 302
assert resp.headers["Location"] == "/auth/login"
def test_get_contact_submissions_with_auth(client):
"""Test getting contact submissions when authenticated."""
# Login first
client.post("/auth/login", data={"username": "admin", "password": "admin"})
# Create some test submissions
client.post("/api/contact", data={"name": "Test User 1",
"email": "test1@example.com", "message": "Message 1", "consent": "on"})
client.post("/api/contact", data={"name": "Test User 2",
"email": "test2@example.com", "message": "Message 2", "consent": "on"})
resp = client.get("/api/contact")
assert resp.status_code == 200
data = resp.get_json()
assert data["status"] == "ok"
assert "submissions" in data
assert len(data["submissions"]) == 2
# Check pagination info
assert "pagination" in data
assert data["pagination"]["total"] == 2
assert data["pagination"]["page"] == 1
assert data["pagination"]["per_page"] == 50
def test_admin_get_contact_submissions_requires_auth(client):
"""Test that getting contact submissions via admin API requires authentication."""
resp = client.get("/admin/api/contact")
assert resp.status_code == 302
assert resp.headers["Location"] == "/auth/login"
def test_admin_get_contact_submissions_with_auth(client):
"""Test getting contact submissions via admin API when authenticated."""
# Login first
client.post("/auth/login", data={"username": "admin", "password": "admin"})
# Create some test submissions
client.post("/api/contact", data={"name": "Test User 1",
"email": "test1@example.com", "message": "Message 1", "consent": "on"})
client.post("/api/contact", data={"name": "Test User 2",
"email": "test2@example.com", "message": "Message 2", "consent": "on"})
resp = client.get("/admin/api/contact")
assert resp.status_code == 200
data = resp.get_json()
assert data["status"] == "ok"
assert "submissions" in data
assert len(data["submissions"]) == 2
# Check pagination info
assert "pagination" in data
assert data["pagination"]["total"] == 2
assert data["pagination"]["page"] == 1
assert data["pagination"]["per_page"] == 50
def test_delete_contact_submission_requires_auth(client):
"""Test that deleting contact submissions requires authentication."""
resp = client.delete("/api/contact/1")
assert resp.status_code == 302
assert resp.headers["Location"] == "/auth/login"
def test_delete_contact_submission_with_auth(client):
"""Test deleting contact submissions when authenticated."""
# Login first
client.post("/auth/login", data={"username": "admin", "password": "admin"})
# Create a test submission
resp = client.post("/api/contact", data={"name": "Test User",
"email": "test@example.com", "message": "Message", "consent": "on"})
submission_id = resp.get_json()["id"]
# Delete the submission
resp = client.delete(f"/api/contact/{submission_id}")
assert resp.status_code == 200
data = resp.get_json()
assert data["status"] == "ok"
assert "deleted successfully" in data["message"]
# Verify it's gone
resp = client.get("/api/contact")
data = resp.get_json()
assert len(data["submissions"]) == 0
def test_admin_submissions_page_requires_auth(client):
"""Test that admin submissions page requires authentication."""
resp = client.get("/admin/submissions")
assert resp.status_code == 302
assert resp.headers["Location"] == "/auth/login"
def test_admin_submissions_page_with_auth(client):
"""Test admin submissions page loads when authenticated."""
# Login and access submissions page
client.post("/auth/login", data={"username": "admin", "password": "admin"})
resp = client.get("/admin/submissions")
assert resp.status_code == 200
assert b"Contact Form Submissions" in resp.data
assert b"Loading submissions" in resp.data
def test_admin_delete_contact_submission_requires_auth(client):
"""Test that deleting contact submissions via admin API requires authentication."""
resp = client.delete("/admin/api/contact/1")
assert resp.status_code == 302
assert resp.headers["Location"] == "/auth/login"
def test_admin_delete_contact_submission_with_auth(client):
"""Test deleting contact submissions via admin API when authenticated."""
# Login first
client.post("/auth/login", data={"username": "admin", "password": "admin"})
# Create a test submission
client.post("/api/contact", data={"name": "Test User",
"email": "test@example.com", "message": "Message", "consent": "on"})
# Get the submission to find its ID
resp = client.get("/admin/api/contact")
data = resp.get_json()
submission_id = data["submissions"][0]["id"]
# Delete the submission
resp = client.delete(f"/admin/api/contact/{submission_id}")
assert resp.status_code == 200
delete_data = resp.get_json()
assert delete_data["status"] == "ok"
# Verify it's deleted
resp = client.get("/admin/api/contact")
data = resp.get_json()
assert len(data["submissions"]) == 0
def test_admin_delete_nonexistent_contact_submission(client):
"""Test deleting a non-existent contact submission."""
# Login first
client.post("/auth/login", data={"username": "admin", "password": "admin"})
# Try to delete a non-existent submission
resp = client.delete("/admin/api/contact/999")
assert resp.status_code == 404
data = resp.get_json()
assert data["status"] == "error"
assert "not found" in data["message"]

View File

@@ -0,0 +1,115 @@
"""Tests for admin newsletter API endpoints."""
import pytest
from server.app import app
@pytest.fixture
def client():
with app.test_client() as client:
yield client
@pytest.fixture(autouse=True)
def setup_admin_creds(monkeypatch):
monkeypatch.setattr("server.settings.ADMIN_USERNAME", "admin")
monkeypatch.setattr("server.settings.ADMIN_PASSWORD", "admin")
def test_create_newsletter_requires_login(client):
"""Test creating newsletter requires login."""
resp = client.post("/admin/api/newsletters", json={
"subject": "Test Subject",
"content": "Test content"
})
assert resp.status_code == 302
assert resp.headers["Location"] == "/auth/login"
def test_create_newsletter_with_login(client):
"""Test creating newsletter when logged in."""
# Login first
client.post("/auth/login", data={"username": "admin", "password": "admin"})
# Create newsletter
resp = client.post("/admin/api/newsletters", json={
"subject": "Test Subject",
"content": "Test content",
"sender_name": "Test Sender"
})
assert resp.status_code == 201
data = resp.get_json()
assert data["status"] == "ok"
assert "newsletter_id" in data
def test_create_newsletter_missing_fields(client):
"""Test creating newsletter with missing required fields."""
# Login first
client.post("/auth/login", data={"username": "admin", "password": "admin"})
# Try without subject
resp = client.post("/admin/api/newsletters", json={
"content": "Test content"
})
assert resp.status_code == 400
data = resp.get_json()
assert data["status"] == "error"
assert "required" in data["message"]
# Try without content
resp = client.post("/admin/api/newsletters", json={
"subject": "Test Subject"
})
assert resp.status_code == 400
data = resp.get_json()
assert data["status"] == "error"
assert "required" in data["message"]
def test_get_newsletters_requires_login(client):
"""Test getting newsletters requires login."""
resp = client.get("/admin/api/newsletters")
assert resp.status_code == 302
assert resp.headers["Location"] == "/auth/login"
def test_get_newsletters_with_login(client):
"""Test getting newsletters when logged in."""
# Login first
client.post("/auth/login", data={"username": "admin", "password": "admin"})
# Create a newsletter first
client.post("/admin/api/newsletters", json={
"subject": "Test Subject",
"content": "Test content"
})
# Get newsletters
resp = client.get("/admin/api/newsletters")
assert resp.status_code == 200
data = resp.get_json()
assert data["status"] == "ok"
assert "newsletters" in data
assert "pagination" in data
assert len(data["newsletters"]) >= 1
def test_send_newsletter_requires_login(client):
"""Test sending newsletter requires login."""
resp = client.post("/admin/api/newsletters/1/send")
assert resp.status_code == 302
assert resp.headers["Location"] == "/auth/login"
def test_send_newsletter_not_found(client):
"""Test sending non-existent newsletter."""
# Login first
client.post("/auth/login", data={"username": "admin", "password": "admin"})
# Try to send non-existent newsletter
resp = client.post("/admin/api/newsletters/999/send")
assert resp.status_code == 404
data = resp.get_json()
assert data["status"] == "error"
assert "not found" in data["message"]

View File

@@ -0,0 +1,79 @@
import sqlite3
import importlib
import pytest
server_app_module = importlib.import_module("server.app")
# Expose app and init_db from the imported module
app = server_app_module.app
init_db = server_app_module.init_db
@pytest.fixture
def client():
with app.test_client() as client:
yield client
def test_get_app_settings_api_requires_auth(client):
"""Test that getting app settings requires authentication."""
resp = client.get("/admin/api/settings")
assert resp.status_code == 302
assert resp.headers["Location"] == "/auth/login"
def test_get_app_settings_api_with_auth(client):
"""Test getting app settings via API when authenticated."""
# Login first
client.post("/auth/login", data={"username": "admin", "password": "admin"})
resp = client.get("/admin/api/settings")
assert resp.status_code == 200
data = resp.get_json()
assert data["status"] == "ok"
assert "settings" in data
assert isinstance(data["settings"], dict)
def test_update_app_setting_api_requires_auth(client):
"""Test that updating app settings requires authentication."""
resp = client.put("/admin/api/settings/test_key",
json={"value": "test_value"})
assert resp.status_code == 302
assert resp.headers["Location"] == "/auth/login"
def test_update_app_setting_api_with_auth(client):
"""Test updating app settings via API when authenticated."""
# Login first
client.post("/auth/login", data={"username": "admin", "password": "admin"})
# Update a setting
resp = client.put("/admin/api/settings/test_key",
json={"value": "test_value"})
assert resp.status_code == 200
data = resp.get_json()
assert data["status"] == "ok"
assert "updated successfully" in data["message"]
# Verify it was saved
resp = client.get("/admin/api/settings")
data = resp.get_json()
assert data["settings"]["test_key"] == "test_value"
def test_delete_app_setting_api_with_auth(client):
"""Test deleting app settings via API when authenticated."""
# Login first
client.post("/auth/login", data={"username": "admin", "password": "admin"})
# Add a setting first
client.put("/admin/api/settings/delete_test", json={"value": "to_delete"})
# Delete the setting
resp = client.delete("/admin/api/settings/delete_test")
assert resp.status_code == 200
data = resp.get_json()
assert data["status"] == "ok"
assert "deleted successfully" in data["message"]

27
tests/test_api.py Normal file
View File

@@ -0,0 +1,27 @@
import sqlite3
import importlib
import pytest
server_app_module = importlib.import_module("server.app")
# Expose app and init_db from the imported module
app = server_app_module.app
init_db = server_app_module.init_db
@pytest.fixture(autouse=True)
def setup_tmp_db(tmp_path, monkeypatch):
tmp_db = tmp_path / "forms.db"
# Patch the module attribute directly to avoid package name collisions
monkeypatch.setattr(server_app_module, "DB_PATH", tmp_db, raising=False)
monkeypatch.setattr("server.settings.ADMIN_USERNAME", "admin")
monkeypatch.setattr("server.settings.ADMIN_PASSWORD", "admin")
init_db()
yield
@pytest.fixture
def client():
with app.test_client() as client:
yield client

69
tests/test_auth.py Normal file
View File

@@ -0,0 +1,69 @@
"""Tests for authentication functionality."""
import pytest
from server.app import app
@pytest.fixture
def client():
with app.test_client() as client:
yield client
@pytest.fixture(autouse=True)
def setup_admin_creds(monkeypatch):
monkeypatch.setattr("server.settings.ADMIN_USERNAME", "admin")
monkeypatch.setattr("server.settings.ADMIN_PASSWORD", "admin")
def test_login_page_get(client):
"""Test login page renders."""
resp = client.get("/auth/login")
assert resp.status_code == 200
assert b"Admin Login" in resp.data
def test_login_success(client):
"""Test successful login."""
resp = client.post(
"/auth/login", data={"username": "admin", "password": "admin"})
assert resp.status_code == 302 # Redirect to admin dashboard
assert resp.headers["Location"] == "/admin/"
# Check session
with client.session_transaction() as sess:
assert sess["logged_in"] is True
def test_login_failure(client):
"""Test failed login."""
resp = client.post(
"/auth/login", data={"username": "wrong", "password": "wrong"})
assert resp.status_code == 200
assert b"Invalid credentials" in resp.data
# Check session not set
with client.session_transaction() as sess:
assert "logged_in" not in sess
def test_logout(client):
"""Test logout."""
# First login
client.post("/auth/login", data={"username": "admin", "password": "admin"})
# Then logout
resp = client.get("/auth/logout")
assert resp.status_code == 302
assert resp.headers["Location"] == "/auth/login"
# Check session cleared
with client.session_transaction() as sess:
assert "logged_in" not in sess
def test_protected_route_without_login(client):
"""Test accessing protected route without login redirects to login."""
resp = client.get("/admin/settings")
assert resp.status_code == 302
assert resp.headers["Location"] == "/auth/login"

39
tests/test_contact_api.py Normal file
View File

@@ -0,0 +1,39 @@
import importlib
import pytest
server_app_module = importlib.import_module("server.app")
# Expose app and init_db from the imported module
app = server_app_module.app
init_db = server_app_module.init_db
@pytest.fixture
def client():
with app.test_client() as client:
yield client
def post(client, data):
return client.post("/api/contact", data=data)
def test_valid_submission_creates_record_and_returns_201(client):
resp = post(
client,
{"name": "Test User", "email": "test@example.com",
"message": "Hello", "consent": "on"},
)
assert resp.status_code in (201, 202)
body = resp.get_json()
assert body["status"] == "ok"
assert isinstance(body.get("id"), int)
def test_missing_required_fields_returns_400(client):
resp = post(client, {"name": "", "email": "", "message": ""})
assert resp.status_code == 400
body = resp.get_json()
assert body["status"] == "error"
assert "errors" in body

View File

@@ -0,0 +1,94 @@
from __future__ import annotations
from email.message import EmailMessage
from typing import Any, cast
import pytest
from server.services import contact as contact_service # noqa: E402 pylint: disable=wrong-import-position
@pytest.fixture
def patched_settings(monkeypatch):
original = contact_service.settings.SMTP_SETTINGS.copy()
patched = original.copy()
monkeypatch.setattr(contact_service.settings, "SMTP_SETTINGS", patched)
return patched
def test_send_notification_returns_false_when_unconfigured(monkeypatch, patched_settings):
patched_settings.update({"host": None, "recipients": []})
# Ensure we do not accidentally open a socket if called
monkeypatch.setattr(contact_service.smtplib, "SMTP", pytest.fail)
submission = contact_service.ContactSubmission(
name="Test",
email="test@example.com",
company=None,
message="Hello",
timeline=None,
)
assert contact_service.send_notification(submission) is False
def test_send_notification_sends_email(monkeypatch, patched_settings):
patched_settings.update(
{
"host": "smtp.example.com",
"port": 2525,
"sender": "sender@example.com",
"username": "user",
"password": "secret",
"use_tls": True,
"recipients": ["owner@example.com"],
}
)
smtp_calls: dict[str, Any] = {}
class DummySMTP:
def __init__(self, host, port, timeout=None):
smtp_calls["init"] = (host, port, timeout)
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def starttls(self):
smtp_calls["starttls"] = True
def login(self, username, password):
smtp_calls["login"] = (username, password)
def send_message(self, message):
smtp_calls["message"] = message
monkeypatch.setattr(contact_service.smtplib, "SMTP", DummySMTP)
submission = contact_service.ContactSubmission(
name="Alice",
email="alice@example.com",
company="Example Co",
message="Hello there",
timeline="Soon",
)
assert contact_service.send_notification(submission) is True
assert smtp_calls["init"] == (
patched_settings["host"],
patched_settings["port"],
15,
)
assert smtp_calls["starttls"] is True
assert smtp_calls["login"] == (
patched_settings["username"], patched_settings["password"])
message = cast(EmailMessage, smtp_calls["message"])
assert message["Subject"] == "Neue Kontaktanfrage von Alice"
assert message["To"] == "owner@example.com"
assert "Hello there" in message.get_content()

90
tests/test_database.py Normal file
View File

@@ -0,0 +1,90 @@
import sqlite3
import importlib
import pytest
server_app_module = importlib.import_module("server.app")
# Expose app and init_db from the imported module
app = server_app_module.app
init_db = server_app_module.init_db
@pytest.fixture
def client():
with app.test_client() as client:
yield client
def test_is_postgres_enabled():
"""Test postgres detection logic."""
from server.database import is_postgres_enabled, set_postgres_override
# Test override functionality
set_postgres_override(True)
assert is_postgres_enabled()
set_postgres_override(False)
assert not is_postgres_enabled()
set_postgres_override(None) # Reset to default
def test_db_cursor_context_manager():
"""Test database cursor context manager."""
from server.database import db_cursor
with db_cursor() as (conn, cur):
assert conn is not None
assert cur is not None
# Test that we can execute a query
cur.execute("SELECT 1")
result = cur.fetchone()
assert result[0] == 1
def test_get_app_settings_empty():
"""Test getting app settings when none exist."""
from server.database import get_app_settings
settings = get_app_settings()
assert isinstance(settings, dict)
assert len(settings) == 0
def test_update_app_setting():
"""Test updating app settings."""
from server.database import update_app_setting, get_app_settings
# Update a setting
update_app_setting("test_key", "test_value")
# Verify it was saved
settings = get_app_settings()
assert settings["test_key"] == "test_value"
def test_delete_app_setting():
"""Test deleting app settings."""
from server.database import update_app_setting, delete_app_setting, get_app_settings
# Add a setting
update_app_setting("delete_test", "to_delete")
# Delete it
delete_app_setting("delete_test")
# Verify it's gone
settings = get_app_settings()
assert "delete_test" not in settings
def test_get_contacts_pagination():
"""Test contact pagination."""
from server.database import get_contacts
# Get first page
submissions, total = get_contacts(page=1, per_page=10)
assert isinstance(submissions, list)
assert isinstance(total, int)
assert total >= 0

View File

@@ -0,0 +1,79 @@
"""SMTP integration tests relying on real infrastructure."""
from __future__ import annotations
import os
import smtplib
from email.message import EmailMessage
import pytest
from server.services import contact as contact_service
RUN_INTEGRATION = os.getenv("RUN_SMTP_INTEGRATION_TEST")
pytestmark = [
pytest.mark.integration,
pytest.mark.skipif(
not RUN_INTEGRATION,
reason="Set RUN_SMTP_INTEGRATION_TEST=1 to enable SMTP integration tests.",
),
]
def _require_smtp_settings():
settings = contact_service.settings.SMTP_SETTINGS
if not settings["host"] or not settings["recipients"] or not settings["username"]:
pytest.skip("SMTP settings not fully configured via environment")
return settings
def _build_submission() -> contact_service.ContactSubmission:
settings = contact_service.settings.SMTP_SETTINGS
return contact_service.ContactSubmission(
name="Integration Test",
email=settings["sender"] or settings["username"] or "integration@example.com",
company="Integration",
message="Integration test notification",
timeline=None,
)
'''
Test sending a notification via SMTP using real settings.
This requires a properly configured SMTP server and valid credentials.
Commenting out to avoid accidental execution during local runs.
@pytest.mark.skip(reason="Requires real SMTP server configuration")
'''
'''
def test_send_notification_real_smtp():
settings = _require_smtp_settings()
submission = _build_submission()
assert contact_service.send_notification(submission) is True
def test_direct_smtp_connection():
settings = _require_smtp_settings()
use_ssl = settings["port"] == 465
client_cls = smtplib.SMTP_SSL if use_ssl else smtplib.SMTP
with client_cls(settings["host"], settings["port"], timeout=10) as client:
client.ehlo()
if settings["use_tls"] and not use_ssl:
client.starttls()
client.ehlo()
client.login(settings["username"], settings["password"] or "")
message = EmailMessage()
message["Subject"] = "SMTP integration check"
message["From"] = settings["sender"] or settings["username"]
message["To"] = settings["recipients"][0]
message.set_content(
"This is a test email for SMTP integration checks.")
client.send_message(message)
'''

56
tests/test_metrics.py Normal file
View File

@@ -0,0 +1,56 @@
import importlib
import time
import pytest
server_app_module = importlib.import_module("server.app")
# Expose app and init_db from the imported module
app = server_app_module.app
init_db = server_app_module.init_db
@pytest.fixture(autouse=True)
def setup_tmp_db(tmp_path, monkeypatch):
tmp_db = tmp_path / "forms.db"
monkeypatch.setattr(server_app_module, "DB_PATH", tmp_db, raising=False)
init_db()
yield
@pytest.fixture
def client():
with app.test_client() as client:
yield client
def test_metrics_endpoint_reports_uptime_and_total(client):
# Ensure a simple GET to /metrics succeeds and returns recent uptime
resp = client.get("/metrics")
assert resp.status_code == 200
# If prometheus_client isn't installed, metrics returns JSON
if resp.content_type.startswith("application/json"):
body = resp.get_json()
assert "uptime_seconds" in body
assert "total_submissions" in body
else:
# If prometheus_client is present, the response is the Prometheus text format
text = resp.get_data(as_text=True)
assert "# HELP" in text or "http_request_duration_seconds" in text
def test_request_metrics_increment_on_request(client):
# Make sure we have a baseline
before = client.get("/metrics").get_data(as_text=True)
# Trigger a contact submission attempt (invalid payload will still count the request)
client.post("/api/contact", data={})
# Wait a tiny bit for histogram observation
time.sleep(0.01)
after = client.get("/metrics").get_data(as_text=True)
# If prometheus_client isn't present, the JSON will include total_submissions
if client.get("/metrics").content_type.startswith("application/json"):
before_json = client.get("/metrics").get_json()
after_json = client.get("/metrics").get_json()
assert after_json["uptime_seconds"] >= before_json["uptime_seconds"]
else:
# Ensure some metrics text exists and that it changed (best-effort)
assert after != before

View File

@@ -0,0 +1,118 @@
import sqlite3
import importlib
import pytest
server_app_module = importlib.import_module("server.app")
# Expose app and init_db from the imported module
app = server_app_module.app
init_db = server_app_module.init_db
@pytest.fixture
def client():
with app.test_client() as client:
yield client
def test_newsletter_subscription_creates_record(client):
resp = client.post("/api/newsletter", json={"email": "test@example.com"})
assert resp.status_code == 201
body = resp.get_json()
assert body["status"] == "ok"
# Note: The API doesn't return an ID in the response
def test_newsletter_duplicate_subscription_returns_conflict(client):
# First subscription
client.post("/api/newsletter", json={"email": "test@example.com"})
# Duplicate subscription
resp = client.post("/api/newsletter", json={"email": "test@example.com"})
assert resp.status_code == 409
body = resp.get_json()
assert body["status"] == "error"
assert "already subscribed" in body["message"].lower()
def test_newsletter_unsubscribe(client):
# Subscribe first
client.post("/api/newsletter", json={"email": "test@example.com"})
# Unsubscribe
resp = client.delete("/api/newsletter", json={"email": "test@example.com"})
assert resp.status_code == 200
body = resp.get_json()
assert body["status"] == "ok"
assert "unsubscribed" in body["message"].lower()
def test_newsletter_unsubscribe_not_subscribed(client):
resp = client.delete("/api/newsletter", json={"email": "test@example.com"})
assert resp.status_code == 404
body = resp.get_json()
assert body["status"] == "error"
assert "not subscribed" in body["message"].lower()
def test_newsletter_update_email(client):
# Subscribe first
client.post("/api/newsletter", json={"email": "old@example.com"})
# Update email
resp = client.put(
"/api/newsletter", json={"old_email": "old@example.com", "new_email": "new@example.com"})
assert resp.status_code == 200
body = resp.get_json()
assert body["status"] == "ok"
assert "updated" in body["message"].lower()
def test_newsletter_update_email_not_found(client):
resp = client.put(
"/api/newsletter", json={"old_email": "nonexistent@example.com", "new_email": "new@example.com"})
assert resp.status_code == 404
body = resp.get_json()
assert body["status"] == "error"
assert "not found" in body["message"].lower()
def test_newsletter_manage_page_get(client):
resp = client.get("/api/newsletter/manage")
assert resp.status_code == 200
assert b"Newsletter Subscription Management" in resp.data
def test_newsletter_manage_subscribe(client):
resp = client.post("/api/newsletter/manage",
data={"email": "manage@example.com", "action": "subscribe"})
assert resp.status_code == 200
assert b"Successfully subscribed" in resp.data
def test_newsletter_manage_unsubscribe(client):
# Subscribe first
client.post("/api/newsletter/manage",
data={"email": "manage@example.com", "action": "subscribe"})
# Unsubscribe
resp = client.post("/api/newsletter/manage",
data={"email": "manage@example.com", "action": "unsubscribe"})
assert resp.status_code == 200
assert b"Successfully unsubscribed" in resp.data
def test_newsletter_manage_update(client):
# Subscribe first
client.post("/api/newsletter/manage",
data={"email": "old@example.com", "action": "subscribe"})
# Update
resp = client.post("/api/newsletter/manage", data={
"old_email": "old@example.com", "new_email": "updated@example.com", "action": "update"})
assert resp.status_code == 200
# Check that some success message is displayed
assert b"success" in resp.data.lower() or b"updated" in resp.data.lower()
def test_newsletter_manage_invalid_email(client):
resp = client.post("/api/newsletter/manage",
data={"email": "invalid-email", "action": "subscribe"})
assert resp.status_code == 200
assert b"Please enter a valid email address" in resp.data