Compare commits


15 Commits

Author SHA1 Message Date
f8a5b1b5ef fix: update scrape function to handle HTML response and improve status messages
Some checks failed
CI/CD Pipeline / test (push) Successful in 1m34s
CI/CD Pipeline / build-image (push) Failing after 1m53s
2025-11-30 10:51:16 +01:00
02e3e77f78 fix: update fetch logic to skip jobs fetched within the last 24 hours and adjust retry attempts in scraper
Some checks failed
CI/CD Pipeline / test (push) Failing after 20s
CI/CD Pipeline / build-image (push) Has been skipped
2025-11-28 20:54:39 +01:00
e0bc295936 feat: Enhance CI/CD pipeline with Docker image build and push steps
Some checks failed
CI/CD Pipeline / test (push) Successful in 21s
CI/CD Pipeline / build-image (push) Failing after 1m9s
2025-11-28 19:16:28 +01:00
2185a07ff0 feat: Implement email sending utilities and templates for job notifications
Some checks failed
CI/CD Pipeline / test (push) Failing after 4m9s
- Added email_service.py for sending emails with SMTP configuration.
- Introduced email_templates.py to render job alert email subjects and bodies.
- Enhanced scraper.py to extract contact information from job listings.
- Updated settings.js to handle negative keyword input validation.
- Created email.html and email_templates.html for managing email subscriptions and templates in the admin interface.
- Modified base.html to include links for email alerts and templates.
- Expanded user settings.html to allow management of negative keywords.
- Updated utils.py to include functions for retrieving negative keywords and email settings.
- Enhanced job filtering logic to exclude jobs containing negative keywords.
2025-11-28 18:15:08 +01:00
8afb208985 fix: update .gitignore to include GitHub Copilot files and add TODO.md
Some checks failed
CI/CD Pipeline / test (push) Failing after 3m35s
2025-11-03 19:04:34 +01:00
d9a224fc36 fix: remove redundant checkout step from CI/CD pipeline
Some checks failed
CI/CD Pipeline / test (push) Failing after 32s
2025-11-01 19:47:41 +01:00
1678e1366e fix: remove unnecessary build and push jobs from CI/CD pipeline
Some checks failed
CI/CD Pipeline / test (push) Has been cancelled
2025-11-01 19:47:02 +01:00
fee955f01d fix: update Dockerfile and documentation for APT_CACHER_NG configuration
Some checks failed
CI/CD Pipeline / test (push) Successful in 18s
CI/CD Pipeline / build (push) Successful in 1m3s
CI/CD Pipeline / push (push) Failing after 35s
2025-11-01 19:41:58 +01:00
a51d500777 feat: enhance CI/CD pipeline with build and push steps for Docker images
Some checks failed
CI/CD Pipeline / test (push) Successful in 18s
CI/CD Pipeline / build (push) Successful in 16s
CI/CD Pipeline / push (push) Failing after 5s
2025-11-01 19:04:05 +01:00
2238a286d4 fix: cexclude linting and deployment for now
All checks were successful
CI/CD Pipeline / test (push) Successful in 3m57s
2025-11-01 18:35:07 +01:00
92b6efb550 fix: adjust exponential backoff timing in scrape_jobs_with_retry
Some checks failed
CI/CD Pipeline / test (push) Failing after 42s
CI/CD Pipeline / deploy (push) Has been skipped
2025-11-01 18:31:35 +01:00
f48f5dc036 fix: missing variable in job_details() 2025-11-01 18:24:37 +01:00
053a9988a8 feat: add CI pipeline 2025-11-01 18:07:57 +01:00
504dc8e2b0 implement automated job scraping scheduler with retry logic and logging 2025-11-01 18:00:59 +01:00
8e3a6f4f41 add logs table definition to database schema 2025-11-01 16:10:42 +01:00
31 changed files with 2901 additions and 76 deletions

.gitea/workflows/ci.yml (new file, 98 lines)

@@ -0,0 +1,98 @@
name: CI/CD Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.11"
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
      - name: Run tests
        run: |
          python -m pytest tests/ -v
      # - name: Run linting
      #   run: |
      #     python -m flake8 web/ tests/ --max-line-length=120

  build-image:
    runs-on: ubuntu-latest
    needs: test
    env:
      DEFAULT_BRANCH: main
      REGISTRY_URL: ${{ secrets.REGISTRY_URL }}
      REGISTRY_USERNAME: ${{ secrets.REGISTRY_USERNAME }}
      REGISTRY_PASSWORD: ${{ secrets.REGISTRY_PASSWORD }}
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      - name: Collect workflow metadata
        id: meta
        shell: bash
        run: |
          ref_name="${GITHUB_REF_NAME:-${GITHUB_REF##*/}}"
          event_name="${GITHUB_EVENT_NAME:-}"
          sha="${GITHUB_SHA:-}"
          if [ "$ref_name" = "${DEFAULT_BRANCH:-main}" ]; then
            echo "on_default=true" >> "$GITHUB_OUTPUT"
          else
            echo "on_default=false" >> "$GITHUB_OUTPUT"
          fi
          echo "ref_name=$ref_name" >> "$GITHUB_OUTPUT"
          echo "event_name=$event_name" >> "$GITHUB_OUTPUT"
          echo "sha=$sha" >> "$GITHUB_OUTPUT"
      - name: Set up QEMU and Buildx
        uses: docker/setup-buildx-action@v3
      - name: Log in to registry (best-effort)
        if: ${{ steps.meta.outputs.on_default == 'true' }}
        uses: docker/login-action@v3
        continue-on-error: true
        with:
          registry: ${{ env.REGISTRY_URL }}
          username: ${{ env.REGISTRY_USERNAME }}
          password: ${{ env.REGISTRY_PASSWORD }}
      - name: Build (and optionally push) image
        uses: docker/build-push-action@v5
        with:
          context: .
          file: Dockerfile
          push: ${{ steps.meta.outputs.on_default == 'true' && steps.meta.outputs.event_name != 'pull_request' && (env.REGISTRY_URL != '' && env.REGISTRY_USERNAME != '' && env.REGISTRY_PASSWORD != '') }}
          tags: |
            ${{ env.REGISTRY_URL }}/allucanget/jobs:latest
            ${{ env.REGISTRY_URL }}/allucanget/jobs:${{ steps.meta.outputs.sha }}

  # deploy:
  #   runs-on: ubuntu-latest
  #   needs: test
  #   if: github.ref == 'refs/heads/main'
  #   steps:
  #     - name: Checkout code
  #       uses: actions/checkout@v4
  #     - name: Deploy to production
  #       run: |
  #         echo "Deploying to production..."
  #         docker-compose up -d

.gitignore (4 lines changed)

@@ -1,4 +1,3 @@
-.github/copilot*
 cache/
 logs/
@@ -165,3 +164,6 @@ cython_debug/
 #.idea/
 docs/online.md
+.github/copilot*
+.github/TODO.md
+.vscode/launch.json

Dockerfile

@@ -5,14 +5,9 @@ FROM python:3.11-slim-bookworm
 ENV PYTHONUNBUFFERED=1
 ENV PYTHONDONTWRITEBYTECODE=1
 ENV FLASK_ENV=production
-ENV FLASK_SECRET=production-secret-change-me
 
-# Find location of apt sources list and change to a faster mirror
-RUN [ -f /etc/apt/sources.list ] && \
-    echo "/etc/apt/sources.list exists, proceeding to modify it." \
-    && sed -i 's|http://deb.debian.org/debian|https://mirror.init7.net/debian|g' /etc/apt/sources.list \
-    || \
-    (echo "/etc/apt/sources.list does not exist, exiting.")
+# Add apt-cacher-ng configuration (if APT_CACHER_NG is set)
+RUN if [ -n "$APT_CACHER_NG" ]; then echo 'Acquire::http { Proxy "'"$APT_CACHER_NG"'/"; };' > /etc/apt/apt.conf.d/01proxy; fi
 
 # Install system dependencies
 RUN apt-get update && apt-get install -y \

README-Docker.md

@@ -58,7 +58,8 @@ This application is a Craigslist job scraper with a Flask web interface.
 ### Environment Variables
 
 - `FLASK_ENV`: Set to `production`
-- `FLASK_SECRET`: Secret key for Flask sessions
+- `FLASK_SECRET`: Secret key for Flask sessions (required)
+- `APT_CACHER_NG`: Optional URL for apt-cacher-ng proxy to speed up package downloads (e.g., `http://192.168.88.14:3142`)
 
 ### Database Configuration
@@ -309,6 +310,7 @@ services:
       # Optional configuration
       - GUNICORN_WORKERS=${GUNICORN_WORKERS:-4}
+      - APT_CACHER_NG=${APT_CACHER_NG}
     volumes:
       - type: bind
         source: ./cache

README.md (231 lines changed)

@@ -9,11 +9,32 @@ job scraper
 - Users can search for job listings by keywords and region
 - Selection of job listings based on user preferences
 
-## Requirements
-- Database (MySQL/MariaDB)
-- Python 3.x
-- Required Python packages (see requirements.txt)
+## Architecture Overview
+
+The application is built as a modular Flask-based service with clear separation of concerns:
+
+| Layer | Module | Responsibility |
+| --- | --- | --- |
+| **Web UI** | `web/app.py` | Flask application that serves HTML pages, REST endpoints, and admin interfaces (users, taxonomy, health, email management). |
+| **Orchestrator** | `web/craigslist.py` | Coordinates the scraping workflow: schedules runs, fetches listings, updates the DB, and triggers email alerts. |
+| **Scraper** | `web/scraper.py` | Contains the low-level HTML parsing logic (`scrape_job_data`, `scrape_job_page`, `extract_contact_info`). |
+| **Persistence** | `web/db.py` | SQLAlchemy ORM models (`User`, `JobListing`, `JobDescription`, `UserInteraction`, `Region`, `Keyword`, `EmailSubscription`, **`EmailTemplate`**) and helper functions for upserts, queries, and subscription management. |
+| **Email Rendering** | `web/email_templates.py` | Renders job-alert emails using a pluggable template system. Supports default placeholders (`{count_label}`, `{scope}`, `{timestamp}`, `{jobs_section}`, `{jobs_message}`) and custom admin-defined templates. |
+| **Email Delivery** | `web/email_service.py` | Sends rendered messages via SMTP, handling TLS/SSL, authentication, and graceful disabling. |
+| **Configuration** | `config/settings.json` | Centralised JSON config for database, HTTP, scraper options, negative keywords, and email settings. |
+| **Static Assets & Templates** | `web/static/`, `web/templates/` | Frontend resources (JS, CSS) and Jinja2 templates for the public UI and admin pages (including the new **Email Templates** management UI). |
+| **Scheduler** | `schedule` (used in `web/craigslist.py`) | Runs the scraper automatically at configurable intervals (default hourly). |
+| **Testing** | `tests/` | Pytest suite covering the scheduler, scraper, DB helpers, email service, and the new admin UI for email subscriptions and templates. |
+
+**Key architectural notes**
+
+- **Email Subscriptions** are stored in the `email_subscriptions` table and managed via `/admin/emails`.
+- **Email Templates** are persisted in the new `email_templates` table, editable through `/admin/email-templates`, and used by the alert system.
+- The orchestrator (`fetch_listings`) returns a detailed result dict (`discovered`, `new`, `by_search`) that drives UI metrics and health checks.
+- Contact information (`reply_url`, `contact_email`, `contact_phone`, `contact_name`) extracted by the scraper is saved in `job_descriptions`.
+- Negative-keyword filtering is applied early in the pipeline to prevent unwanted listings from reaching the DB or email alerts.
+
+This layered design makes it straightforward to extend the scraper to new sources, swap out the email backend, or add admin features without impacting other components.
 
 ## Installation
@@ -22,3 +43,205 @@ job scraper
 3. Install dependencies
 4. Set up environment variables
 5. Run the application
## Scheduler Configuration
The application includes an automated scheduler that runs the job scraping process every hour. The scheduler is implemented in `web/craigslist.py` and includes:
- **Automatic Scheduling**: Scraping runs every hour automatically
- **Failure Handling**: Retry logic with exponential backoff (up to 3 attempts)
- **Background Operation**: Runs in a separate daemon thread
- **Graceful Error Recovery**: Continues running even if individual scraping attempts fail
### Scheduler Features
- **Retry Mechanism**: Automatically retries failed scraping attempts
- **Logging**: Comprehensive logging of scheduler operations and failures
- **Testing**: Comprehensive test suite in `tests/test_scheduler.py`
To modify the scheduling interval, edit the `start_scheduler()` function in `web/craigslist.py`.
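For orientation, here is a minimal sketch of such a loop built on the `schedule` package from requirements.txt; function names mirror this README, but the actual `start_scheduler()` in `web/craigslist.py` may differ in detail:
```python
# Hedged sketch of an hourly scheduler loop using the `schedule` package.
import time
import schedule


def run_scheduled_scraping():
    print("running scheduled scrape...")  # stand-in for the real entry point


def start_scheduler():
    schedule.every().hour.do(run_scheduled_scraping)  # adjust the interval here
    while True:
        schedule.run_pending()  # execute any jobs that are due
        time.sleep(60)          # poll for due jobs once a minute
```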
## Job Scraping Output
The `fetch_listings()` function in `web/craigslist.py` now reports detailed metrics about each scraping operation. It returns a dictionary containing:
- **discovered**: Total number of unique job URLs discovered across all region/keyword combinations
- **new**: Total number of newly added jobs (jobs not previously in the database)
- **by_search**: List of dictionaries, each containing:
- **region**: The region name for this search
- **keyword**: The keyword used for this search
- **count**: Number of jobs fetched for this specific region/keyword combination
### Example Output
```python
{
"discovered": 150,
"new": 42,
"by_search": [
{"region": "sfbay", "keyword": "python", "count": 25},
{"region": "sfbay", "keyword": "java", "count": 18},
{"region": "losangeles", "keyword": "python", "count": 45},
{"region": "losangeles", "keyword": "java", "count": 62}
]
}
```
This per-search breakdown allows for better monitoring and debugging of the scraping process, enabling identification of searches that may be failing or returning fewer results than expected.
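Because `fetch_listings()` is a generator that yields progress messages and returns the summary dict, callers retrieve the result from `StopIteration.value`, the same way the test suite does:
```python
# Consuming fetch_listings(): stream the log messages, then capture the
# summary dict attached to StopIteration (mirrors tests/test_scheduler.py).
from web.craigslist import fetch_listings

gen = fetch_listings()
result = None
try:
    while True:
        print(next(gen), end="")  # progress messages
except StopIteration as stop:
    result = stop.value           # the summary dict shown above

if result is not None:
    print(f"discovered={result['discovered']}, new={result['new']}")
```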
## Contact Information Extraction
The scraper now automatically extracts contact information from job listing pages:
### Extracted Fields
When scraping individual job listings, the following contact information is extracted and stored:
- **contact_email**: Email address extracted from reply button or contact form links
- **contact_phone**: Phone number extracted from tel links or contact parameters
- **contact_name**: Contact person or department name if available
- **reply_url**: The full reply/contact URL from the job listing
### How Contact Information is Extracted
The `extract_contact_info()` function intelligently parses various types of reply URLs:
1. **Mailto Links**: `mailto:jobs@company.com?subject=...`
- Extracts the email address directly
2. **Phone Links**: `tel:+1234567890`
- Extracts the phone number
3. **URL Parameters**: `https://apply.company.com?email=hr@company.com&phone=555-1234&name=HR%20Team`
- Searches for common parameter names: `email`, `phone`, `contact_name`, etc.
4. **Graceful Fallback**: If contact information cannot be extracted, the fields are set to `"N/A"`
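As an illustration of this parsing strategy, a standard-library-only sketch follows; the project's actual `extract_contact_info()` in `web/scraper.py` may differ:
```python
# Illustrative sketch of the reply-URL parsing described above.
from urllib.parse import urlparse, parse_qs


def extract_contact_info_sketch(reply_url):
    info = {"email": "N/A", "phone": "N/A", "contact_name": "N/A"}
    if not reply_url or reply_url == "N/A":
        return info
    if reply_url.startswith("mailto:"):
        # Take the address, dropping any ?subject=... suffix
        info["email"] = reply_url[len("mailto:"):].split("?", 1)[0]
        return info
    if reply_url.startswith("tel:"):
        info["phone"] = reply_url[len("tel:"):]
        return info
    params = parse_qs(urlparse(reply_url).query)  # decodes %-escapes
    for key in ("email", "from_email"):
        if key in params:
            info["email"] = params[key][0]
            break
    for key in ("phone", "telephone"):
        if key in params:
            info["phone"] = params[key][0]
            break
    for key in ("contact_name", "name"):  # prefer contact_name over name
        if key in params:
            info["contact_name"] = params[key][0]
            break
    return info
```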
### Database Storage
Contact information is stored in the `job_descriptions` table with the following columns:
- `reply_url` (VARCHAR(512)): The complete reply/contact URL
- `contact_email` (VARCHAR(255)): Extracted email address
- `contact_phone` (VARCHAR(255)): Extracted phone number
- `contact_name` (VARCHAR(255)): Extracted contact person/department name
### Example
For a job listing with reply button `mailto:hiring@acme.com?subject=Job%20Application`:
```python
{
"reply_url": "mailto:hiring@acme.com?subject=Job%20Application",
"contact_email": "hiring@acme.com",
"contact_phone": "N/A",
"contact_name": "N/A"
}
```
This contact information is automatically extracted during job page scraping and persisted to the database for easy access and filtering.
## Negative Keyword Filtering
The scraper inspects each job's title, company, location, and description for configurable “negative” keywords. When a keyword matches, the scraped result indicates the match so downstream workflows can skip or flag the job.
### Configuration
Define keywords in `config/settings.json` under `scraper.negative_keywords`. Keywords are matched case-insensitively and should be supplied without surrounding whitespace:
```json
{
"scraper": {
"negative_keywords": ["scam", "mlm", "unpaid"]
}
}
```
### Scrape Output
Each `scrape_job_page` result contains three new fields:
- `is_negative_match`: `True` when any keyword matches
- `negative_keyword_match`: the keyword that triggered the match
- `negative_match_field`: which field (title, company, location, description) contained the keyword
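A minimal sketch of the matching rule implied by these fields (case-insensitive substring check across the inspected fields); the real logic lives in `web/scraper.py` and may differ:
```python
# Sketch: return the three negative-match fields for a scraped job dict.
def find_negative_match(job, negative_keywords):
    for field in ("title", "company", "location", "description"):
        text = (job.get(field) or "").lower()
        for keyword in negative_keywords:  # keywords are stored lowercase
            if keyword in text:
                return {"is_negative_match": True,
                        "negative_keyword_match": keyword,
                        "negative_match_field": field}
    return {"is_negative_match": False,
            "negative_keyword_match": None,
            "negative_match_field": None}
```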
### Processing Behavior
- `process_job_url` stops when `is_negative_match` is `True`, yielding a log message and calling `remove_job` so stale results never remain in `job_listings`.
- `upsert_job_details` now returns immediately for negative matches, ensuring `job_descriptions` never stores filtered listings.
- Regression coverage lives in `tests/test_scraper.py::TestScraperPipelineNegativeFiltering` and `tests/test_db_negative_filtering.py::test_upsert_job_details_skips_negative_match`.
Together, these checks mean negative matches are dropped before any persistence and never shown in the UI.
### User-Specific Negative Keywords
In addition to the global negative keywords defined in `settings.json`, users can define their own personal negative keywords via the **Preferences** page (`/settings`).
- **Management**: Users can add new negative keywords and remove existing ones.
- **Filtering**: Jobs matching any of the user's negative keywords are filtered out from the job listings view (`/` and `/jobs`).
- **Validation**: The UI prevents adding duplicate keywords.
- **Storage**: User-specific negative keywords are stored in the database (`negative_keywords` and `user_negative_keywords` tables).
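The filtering helper can be exercised directly; this usage sketch mirrors the test suite in this change set:
```python
# filter_jobs() drops any job whose fields contain a negative keyword
# (usage taken from the user negative-keyword tests below).
from web.utils import filter_jobs

jobs = [
    {"title": "Great Job", "description": "Good pay"},
    {"title": "Bad Job", "description": "This is a scam"},
]
visible = filter_jobs(jobs, negative_keywords=["scam"])
assert [j["title"] for j in visible] == ["Great Job"]
```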
## Email Notifications
Optional job-alert emails are generated whenever the scraper discovers new listings.
### Configuration
Edit `config/settings.json` under the `email` section:
```json
{
"email": {
"enabled": true,
"from_address": "jobs@example.com",
"recipients": ["alerts@example.com"],
"smtp": {
"host": "smtp.example.com",
"port": 587,
"username": "smtp-user",
"password": "secret",
"use_tls": true,
"use_ssl": false,
"timeout": 30
}
}
}
```
- Leave `enabled` set to `false` for local development or when credentials are unavailable.
- Provide at least one recipient; otherwise alerts are skipped with a log message.
- Omit real credentials from source control—inject them via environment variables or a secrets manager in production.
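One hedged way to follow that advice is to overlay SMTP secrets from the environment onto the loaded settings before use; the variable names `SMTP_USERNAME` and `SMTP_PASSWORD` here are illustrative, not project conventions:
```python
# Sketch: inject SMTP credentials from environment variables at startup.
import json
import os

with open("config/settings.json") as fh:
    settings = json.load(fh)

smtp = settings["email"]["smtp"]
smtp["username"] = os.environ.get("SMTP_USERNAME", smtp["username"])  # hypothetical env var
smtp["password"] = os.environ.get("SMTP_PASSWORD", smtp["password"])  # hypothetical env var
```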
### How Alerts Are Sent
- After `fetch_listings()` completes, the scraper gathers new listings and, when configured, renders a plaintext digest via `web.email_templates.render_job_alert_email`.
- Delivery is handled by `web.email_service.send_email`, which supports TLS/SSL SMTP connections and gracefully skips when disabled.
- Success or failure is streamed in the scraper log output (`Job alert email sent.` or the reason for skipping).
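A sketch of driving the same two helpers manually, using the call signatures exercised in `tests/test_email_service.py` and `tests/test_email_templates.py`:
```python
# Render a digest and send it; send_email() returns False when disabled/skipped.
from web.email_templates import render_job_alert_email
from web.email_service import send_email
from web.utils import get_email_settings

new_jobs = [{"title": "Python Developer", "company": "Acme",
             "location": "Remote", "url": "https://example.com/jobs/1"}]
rendered = render_job_alert_email(new_jobs, region="sfbay", keyword="python")
sent = send_email(
    subject=rendered["subject"],
    body=rendered["body"],
    to=["alerts@example.com"],
    settings=get_email_settings(),  # the `email` block from settings.json
)
print("sent" if sent else "skipped")
```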
### Managing Recipients
- Admin users can visit `/admin/emails` to add or deactivate subscription addresses through the web UI.
- Deactivated rows remain in the table so they can be reactivated later; the scraper only mails active recipients.
- The navigation bar exposes an **Email Alerts** link to the management screen after logging in as an admin user.
### Customising Templates
- Use the **Email Templates** admin page (`/admin/email-templates`) to create, edit, preview, or delete alert templates.
- Templates support placeholder tokens such as `{count_label}`, `{scope}`, `{timestamp}`, `{jobs_section}`, and `{jobs_message}`; the UI lists all available tokens.
- Preview renders the selected template with sample data so changes can be reviewed before saving.
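For illustration, a hypothetical template body that uses each documented token (the shipped default template may differ):
```
Subject: {count_label} ({scope})

{jobs_message}
{jobs_section}

Generated at {timestamp} UTC.
```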
### Tests
- `tests/test_email_templates.py` verifies the rendered subject/body for both populated and empty alerts.
- `tests/test_email_service.py` covers SMTP configuration, disabled mode, and login/send flows using fakes.
- `tests/test_admin_email.py` exercises the admin UI for listing, subscribing, and unsubscribing recipients.
- `tests/test_admin_email_templates.py` verifies CRUD operations and previews for template management.
- `tests/test_scraper.py::TestScraperEmailNotifications` ensures the scraping pipeline invokes the alert sender when new jobs are found.
## Docker Deployment
Please see [README-Docker.md](README-Docker.md) for instructions on deploying the application using Docker.

config/settings.json

@@ -9,7 +9,7 @@
     }
   },
   "http": {
-    "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:141.0) Gecko/20100101 Firefox/141.0",
+    "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:145.0) Gecko/20100101 Firefox/145.0",
     "request_timeout": 30,
     "max_retries": 3,
     "backoff_factor": 2,
@@ -22,7 +22,22 @@
   },
   "scraper": {
     "base_url": "https://{region}.craigslist.org/search/jjj?query={keyword}&sort=rel",
-    "config_dir": "config"
+    "config_dir": "config",
+    "negative_keywords": []
+  },
+  "email": {
+    "enabled": false,
+    "from_address": "jobs@example.com",
+    "recipients": [],
+    "smtp": {
+      "host": "smtp.example.com",
+      "port": 587,
+      "username": "",
+      "password": "",
+      "use_tls": true,
+      "use_ssl": false,
+      "timeout": 30
+    }
   },
   "users": [
     { "username": "anonymous", "is_admin": false, "password": "" },

docker-compose.yml

@@ -18,6 +18,7 @@ services:
       # Optional configuration
       - GUNICORN_WORKERS=${GUNICORN_WORKERS:-4}
+      - APT_CACHER_NG=${APT_CACHER_NG}
     volumes:
       - type: bind
         source: ./cache

main.py (12 lines changed)

@@ -4,8 +4,20 @@ starts webserver
 """
 import web.app as app
+import threading
+from web.craigslist import start_scheduler
+
+
+def start_background_scheduler():
+    """Start the scheduler in a background thread."""
+    scheduler_thread = threading.Thread(target=start_scheduler, daemon=True)
+    scheduler_thread.start()
+    print("Background scheduler started")
+
+
 if __name__ == "__main__":
+    # Start scheduler in background thread
+    start_background_scheduler()
     # start web server
     app.main()

requirements.txt

@@ -3,6 +3,7 @@ flask
 flask-wtf
 pytest
 requests
+schedule
 sqlalchemy
 pymysql
 gunicorn

tests/test_admin_email.py (new file, 84 lines)

@@ -0,0 +1,84 @@
import pytest
from sqlalchemy import text

from web.app import app
from web.db import (
    db_init,
    create_or_update_user,
    subscribe_email,
    list_email_subscriptions,
    _ensure_session,
)


@pytest.fixture(scope="function", autouse=True)
def initialize_app():
    app.config.update(TESTING=True, WTF_CSRF_ENABLED=False)
    with app.app_context():
        db_init()
        create_or_update_user("admin", password="secret",
                              is_admin=True, is_active=True)
        # Clear subscriptions before and after each test to avoid leakage
        with _ensure_session() as session:
            session.execute(text("DELETE FROM email_subscriptions"))
            session.commit()
        yield
        with _ensure_session() as session:
            session.execute(text("DELETE FROM email_subscriptions"))
            session.commit()


@pytest.fixture
def client():
    with app.test_client() as test_client:
        with test_client.session_transaction() as sess:
            sess["username"] = "admin"
        yield test_client


@pytest.fixture
def anon_client():
    with app.test_client() as test_client:
        # Ensure no admin session present
        with test_client.session_transaction() as sess:
            sess.pop("username", None)
        yield test_client


def test_admin_emails_requires_admin(anon_client):
    response = anon_client.get("/admin/emails")
    assert response.status_code == 302
    assert "/login" in response.headers.get("Location", "")


def test_admin_emails_lists_subscriptions(client):
    subscribe_email("alice@example.com")
    response = client.get("/admin/emails")
    assert response.status_code == 200
    assert b"alice@example.com" in response.data


def test_admin_emails_can_subscribe(client):
    response = client.post(
        "/admin/emails",
        data={"action": "subscribe", "email": "bob@example.com"},
        follow_redirects=False,
    )
    assert response.status_code == 302
    emails = list_email_subscriptions()
    assert any(sub["email"] == "bob@example.com" and sub["is_active"]
               for sub in emails)


def test_admin_emails_can_unsubscribe(client):
    subscribe_email("carol@example.com")
    response = client.post(
        "/admin/emails",
        data={"action": "unsubscribe", "email": "carol@example.com"},
        follow_redirects=False,
    )
    assert response.status_code == 302
    emails = list_email_subscriptions()
    matching = [sub for sub in emails if sub["email"] == "carol@example.com"]
    assert matching
    assert matching[0]["is_active"] is False

tests/test_admin_email_templates.py (new file, 138 lines)

@@ -0,0 +1,138 @@
import pytest
from sqlalchemy import text

from web.app import app
from web.db import (
    db_init,
    create_or_update_user,
    list_email_templates,
    update_email_template,
    _ensure_session,
    ensure_default_email_template,
)
from web.email_templates import render_job_alert_email


@pytest.fixture(scope="function", autouse=True)
def setup_database():
    app.config.update(TESTING=True, WTF_CSRF_ENABLED=False)
    with app.app_context():
        db_init()
        create_or_update_user("admin", password="secret", is_admin=True, is_active=True)
        with _ensure_session() as session:
            session.execute(text("DELETE FROM email_templates"))
            session.commit()
        ensure_default_email_template()
        yield
        with _ensure_session() as session:
            session.execute(text("DELETE FROM email_templates"))
            session.commit()
        ensure_default_email_template()


@pytest.fixture
def client():
    with app.test_client() as test_client:
        with test_client.session_transaction() as sess:
            sess["username"] = "admin"
        yield test_client


@pytest.fixture
def anon_client():
    with app.test_client() as test_client:
        with test_client.session_transaction() as sess:
            sess.pop("username", None)
        yield test_client


def test_email_templates_requires_admin(anon_client):
    response = anon_client.get("/admin/email-templates")
    assert response.status_code == 302
    assert "/login" in response.headers.get("Location", "")


def test_email_templates_lists_default(client):
    response = client.get("/admin/email-templates")
    assert response.status_code == 200
    assert b"job-alert" in response.data


def test_email_templates_create_update_delete(client):
    # Create
    response = client.post(
        "/admin/email-templates",
        data={
            "action": "create",
            "name": "Daily Summary",
            "slug": "daily-summary",
            "subject": "Summary: {count_label}",
            "body": "Jobs:{jobs_section}",
            "is_active": "on",
        },
        follow_redirects=False,
    )
    assert response.status_code == 302
    templates = list_email_templates()
    assert any(t["slug"] == "daily-summary" for t in templates)

    # Update
    template_row = next(t for t in templates if t["slug"] == "daily-summary")
    response = client.post(
        "/admin/email-templates",
        data={
            "action": "update",
            "template_id": template_row["template_id"],
            "name": "Daily Summary",
            "slug": "daily-summary",
            "subject": "Updated: {count_label}",
            "body": "Updated body {jobs_section}",
        },
        follow_redirects=False,
    )
    assert response.status_code == 302
    updated = list_email_templates()
    updated_row = next(t for t in updated if t["slug"] == "daily-summary")
    assert "Updated:" in updated_row["subject"]

    # Delete
    response = client.post(
        "/admin/email-templates",
        data={
            "action": "delete",
            "template_id": updated_row["template_id"],
        },
        follow_redirects=False,
    )
    assert response.status_code == 302
    slugs = [t["slug"] for t in list_email_templates()]
    assert "daily-summary" not in slugs


def test_email_templates_preview(client):
    templates = list_email_templates()
    job_alert = next(t for t in templates if t["slug"] == "job-alert")
    response = client.get(f"/admin/email-templates?preview_id={job_alert['template_id']}")
    assert response.status_code == 200
    assert b"Preview" in response.data
    assert b"Subject" in response.data


def test_render_job_alert_email_uses_template_override(client):
    templates = list_email_templates()
    job_alert = next(t for t in templates if t["slug"] == "job-alert")
    update_email_template(
        job_alert["template_id"],
        subject="Custom Subject {count}",
        body="Body {jobs_message}",
    )
    rendered = render_job_alert_email([
        {
            "title": "Python Developer",
            "company": "Acme",
            "location": "Remote",
            "url": "https://example.com",
        }
    ])
    assert rendered["subject"].startswith("Custom Subject")
    assert "Python Developer" in rendered["body"]

tests/test_db_negative_filtering.py (new file, 21 lines)

@@ -0,0 +1,21 @@
import pytest

import web.db as db


def test_upsert_job_details_skips_negative_match(monkeypatch):
    def fail(*args, **kwargs):  # pragma: no cover - guard against unwanted calls
        raise AssertionError("should not reach database layers when negative")

    monkeypatch.setattr(db, "_ensure_session", fail)
    monkeypatch.setattr(db, "insert_log", fail)

    job_data = {
        "url": "https://example.com/job/neg",
        "id": "neg123",
        "is_negative_match": True,
        "negative_keyword_match": "scam",
        "negative_match_field": "title",
    }

    # Should return early without touching the database helpers.
    db.upsert_job_details(job_data)

tests/test_email_service.py (new file, 106 lines)

@@ -0,0 +1,106 @@
import pytest

from web.email_service import (
    EmailConfigurationError,
    send_email,
)


def test_send_email_disabled(monkeypatch):
    called = {}

    def _fake_smtp(*args, **kwargs):  # pragma: no cover - should not be called
        called["used"] = True
        raise AssertionError(
            "SMTP should not be invoked when email is disabled")

    monkeypatch.setattr("web.email_service.smtplib.SMTP", _fake_smtp)
    monkeypatch.setattr("web.email_service.smtplib.SMTP_SSL", _fake_smtp)

    result = send_email(
        subject="Hi",
        body="Test",
        to="user@example.com",
        settings={"enabled": False},
    )

    assert result is False
    assert called == {}


def test_send_email_sends_message(monkeypatch):
    events = {"starttls": False, "login": None, "sent": None}

    class FakeSMTP:
        def __init__(self, *, host, port, timeout):
            self.host = host
            self.port = port
            self.timeout = timeout

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

        def ehlo(self):
            events.setdefault("ehlo", 0)
            events["ehlo"] += 1

        def starttls(self):
            events["starttls"] = True

        def login(self, username, password):
            events["login"] = (username, password)

        def send_message(self, message, *, from_addr, to_addrs):
            events["sent"] = {
                "from": from_addr,
                "to": tuple(to_addrs),
                "subject": message["Subject"],
            }

    monkeypatch.setattr("web.email_service.smtplib.SMTP", FakeSMTP)
    monkeypatch.setattr("web.email_service.smtplib.SMTP_SSL", FakeSMTP)

    settings = {
        "enabled": True,
        "from_address": "jobs@example.com",
        "smtp": {
            "host": "smtp.example.com",
            "port": 2525,
            "timeout": 15,
            "username": "jobs",
            "password": "secret",
            "use_tls": True,
            "use_ssl": False,
        },
    }

    result = send_email(
        subject="New Jobs",
        body="You have new jobs waiting.",
        to=["a@example.com", "b@example.com"],
        cc="c@example.com",
        bcc=["d@example.com"],
        settings=settings,
    )

    assert result is True
    assert events["starttls"] is True
    assert events["login"] == ("jobs", "secret")
    assert events["sent"] == {
        "from": "jobs@example.com",
        "to": ("a@example.com", "b@example.com", "c@example.com", "d@example.com"),
        "subject": "New Jobs",
    }


def test_send_email_requires_host():
    settings = {
        "enabled": True,
        "from_address": "jobs@example.com",
        "smtp": {"host": "", "port": 587},
    }

    with pytest.raises(EmailConfigurationError):
        send_email(subject="Hi", body="Test",
                   to="user@example.com", settings=settings)

tests/test_email_templates.py (new file, 40 lines)

@@ -0,0 +1,40 @@
from datetime import datetime

from web.email_templates import render_job_alert_email


def test_render_job_alert_email_with_jobs():
    jobs = [
        {
            "title": "Python Developer",
            "company": "Acme",
            "location": "Remote",
            "url": "https://example.com/jobs/1",
        },
        {
            "title": "Data Engineer",
            "company": "Globex",
            "location": "NYC",
            "url": "https://example.com/jobs/2",
        },
    ]
    ts = datetime(2025, 11, 3, 12, 0)

    rendered = render_job_alert_email(
        jobs, region="sfbay", keyword="python", generated_at=ts)

    assert rendered["subject"] == "2 new jobs (region: sfbay, keyword: python)"
    assert "1. Python Developer" in rendered["body"]
    assert "Generated at 2025-11-03 12:00 UTC." in rendered["body"]
    assert rendered["context"]["count"] == 2
    assert rendered["context"]["jobs_section"].startswith(
        "\n1. Python Developer")


def test_render_job_alert_email_empty():
    ts = datetime(2025, 11, 3, 12, 0)

    rendered = render_job_alert_email([], generated_at=ts)

    assert rendered["subject"] == "No new jobs"
    assert "No jobs matched this alert." in rendered["body"]
    assert rendered["body"].count("Generated at") == 1
    assert rendered["context"]["count"] == 0

tests/test_scheduler.py (new file, 137 lines)

@@ -0,0 +1,137 @@
import pytest
import time
from unittest.mock import patch, MagicMock

from web.craigslist import scrape_jobs_with_retry, run_scheduled_scraping, fetch_listings


class TestScheduler:
    def test_scrape_jobs_with_retry_success(self):
        """Test that scrape_jobs_with_retry succeeds on first attempt."""
        with patch('web.craigslist.scraper') as mock_scrape:
            result = scrape_jobs_with_retry()
            assert result is True
            mock_scrape.assert_called_once()

    def test_scrape_jobs_with_retry_failure(self):
        """Test that scrape_jobs_with_retry handles failures properly."""
        with patch('web.craigslist.scraper', side_effect=Exception("Test error")) as mock_scrape:
            result = scrape_jobs_with_retry(max_retries=2)
            assert result is False
            assert mock_scrape.call_count == 2

    def test_run_scheduled_scraping(self):
        """Test the scheduled scraping wrapper function."""
        with patch('web.craigslist.scrape_jobs_with_retry') as mock_retry:
            mock_retry.return_value = True
            run_scheduled_scraping()
            mock_retry.assert_called_once()

    def test_scheduler_import(self):
        """Test that scheduler functions can be imported."""
        from web.craigslist import start_scheduler
        assert callable(start_scheduler)

    @patch('web.craigslist.schedule')
    def test_scheduler_setup(self, mock_schedule):
        """Test that scheduler setup works correctly."""
        # This is a basic test to ensure the scheduler can be set up
        from web.craigslist import schedule
        assert schedule is not None

    @patch('web.craigslist.db_get_all_job_urls')
    @patch('web.craigslist.seed_regions_keywords_from_listings')
    @patch('web.craigslist.get_all_regions')
    @patch('web.craigslist.get_all_keywords')
    @patch('web.craigslist.get_last_fetch_time')
    @patch('web.craigslist.process_region_keyword')
    @patch('web.craigslist.upsert_listing')
    @patch('web.craigslist.insert_log')
    def test_fetch_listings_return_structure(self, mock_log, mock_upsert, mock_process, mock_last_fetch,
                                             mock_keywords, mock_regions, mock_seed, mock_db_urls):
        """Test that fetch_listings returns the correct structure with per-search counts."""
        # Setup mocks
        mock_db_urls.return_value = []
        mock_regions.return_value = [{"name": "sfbay"}]
        mock_keywords.return_value = [{"name": "python"}]
        mock_last_fetch.return_value = None  # Never fetched before
        mock_process.return_value = [
            ("2025-11-03T10:00:00Z", "sfbay", "python", "Python Dev",
             "$100k", "San Francisco", "http://example.com/1"),
            ("2025-11-03T10:00:00Z", "sfbay", "python", "Python Dev",
             "$100k", "San Francisco", "http://example.com/2"),
        ]

        # Collect messages and get return value from generator
        gen = fetch_listings()
        messages = []
        result = None
        try:
            while True:
                messages.append(next(gen))
        except StopIteration as e:
            result = e.value

        # Verify return structure
        assert result is not None
        assert "discovered" in result
        assert "new" in result
        assert "by_search" in result
        assert isinstance(result.get("by_search"), list)
        assert result.get("discovered") == 2
        assert result.get("new") == 2

    @patch('web.craigslist.db_get_all_job_urls')
    @patch('web.craigslist.seed_regions_keywords_from_listings')
    @patch('web.craigslist.get_all_regions')
    @patch('web.craigslist.get_all_keywords')
    @patch('web.craigslist.get_last_fetch_time')
    @patch('web.craigslist.process_region_keyword')
    @patch('web.craigslist.upsert_listing')
    @patch('web.craigslist.insert_log')
    def test_fetch_listings_per_search_count(self, mock_log, mock_upsert, mock_process, mock_last_fetch,
                                             mock_keywords, mock_regions, mock_seed, mock_db_urls):
        """Test that fetch_listings correctly counts jobs per search."""
        # Setup mocks
        mock_db_urls.return_value = []
        mock_regions.return_value = [{"name": "sfbay"}, {"name": "losangeles"}]
        mock_keywords.return_value = [{"name": "python"}, {"name": "java"}]
        mock_last_fetch.return_value = None  # Never fetched before

        # Mock process_region_keyword to return different counts for each search
        def mock_process_impl(region, keyword, discovered_urls):
            # Use unique URLs per search to get the total discovered count
            base_url = f"http://example.com/{region}/{keyword}"
            counts = {
                ("sfbay", "python"): 3,
                ("sfbay", "java"): 2,
                ("losangeles", "python"): 4,
                ("losangeles", "java"): 1,
            }
            count = counts.get((region, keyword), 0)
            return [(f"2025-11-03T10:00:00Z", region, keyword, f"Job {i}", "$100k", region, f"{base_url}/{i}")
                    for i in range(count)]

        mock_process.side_effect = mock_process_impl

        # Collect result from generator
        gen = fetch_listings()
        messages = []
        result = None
        try:
            while True:
                messages.append(next(gen))
        except StopIteration as e:
            result = e.value

        # Verify per-search counts
        assert result is not None
        by_search = result.get("by_search", [])
        assert len(by_search) == 4
        search_data = {(r.get("region"), r.get("keyword")): r.get("count") for r in by_search}
        assert search_data.get(("sfbay", "python")) == 3
        assert search_data.get(("sfbay", "java")) == 2
        assert search_data.get(("losangeles", "python")) == 4
        assert search_data.get(("losangeles", "java")) == 1
        assert result.get("discovered") == 10  # Total unique jobs

tests/test_scraper.py (new file, 384 lines)

@@ -0,0 +1,384 @@
import pytest

from web.scraper import scrape_job_page, extract_contact_info
from web.craigslist import process_job_url, scraper


def _make_negative_job(url: str) -> dict:
    return {
        "url": url,
        "title": "SCAM role",
        "company": "Test Co",
        "location": "Remote",
        "description": "This is a scam offer",
        "id": "job123",
        "posted_time": "",
        "reply_url": "N/A",
        "contact_email": "N/A",
        "contact_phone": "N/A",
        "contact_name": "N/A",
        "is_negative_match": True,
        "negative_keyword_match": "scam",
        "negative_match_field": "title",
    }


class TestExtractContactInfo:
    """Test suite for contact information extraction."""

    def test_extract_email_from_mailto_link(self):
        """Test extraction of email from mailto link."""
        reply_url = "mailto:contact@example.com?subject=Job%20Inquiry"
        contact_info = extract_contact_info(reply_url)
        assert contact_info["email"] == "contact@example.com"
        assert contact_info["phone"] == "N/A"
        assert contact_info["contact_name"] == "N/A"

    def test_extract_phone_from_tel_link(self):
        """Test extraction of phone from tel link."""
        reply_url = "tel:+1234567890"
        contact_info = extract_contact_info(reply_url)
        assert contact_info["email"] == "N/A"
        assert contact_info["phone"] == "+1234567890"
        assert contact_info["contact_name"] == "N/A"

    def test_extract_email_from_url_parameter(self):
        """Test extraction of email from URL query parameters."""
        reply_url = "https://example.com/contact?email=jobs@company.com&name=John%20Doe"
        contact_info = extract_contact_info(reply_url)
        assert contact_info["email"] == "jobs@company.com"
        assert contact_info["contact_name"] == "John Doe"

    def test_extract_phone_from_url_parameter(self):
        """Test extraction of phone from URL query parameters."""
        reply_url = "https://example.com/apply?phone=555-1234&email=contact@test.com"
        contact_info = extract_contact_info(reply_url)
        assert contact_info["phone"] == "555-1234"
        assert contact_info["email"] == "contact@test.com"

    def test_extract_contact_name_from_url_parameter(self):
        """Test extraction of contact name from URL query parameters."""
        reply_url = "https://example.com/reply?name=Alice%20Smith&contact_name=Bob%20Jones"
        contact_info = extract_contact_info(reply_url)
        # Should prefer contact_name over name
        assert contact_info["contact_name"] == "Bob Jones"

    def test_extract_all_fields_from_url(self):
        """Test extraction of all fields from URL parameters."""
        reply_url = "https://example.com/contact?email=hr@company.com&phone=555-9876&contact_name=Jane%20Doe"
        contact_info = extract_contact_info(reply_url)
        assert contact_info["email"] == "hr@company.com"
        assert contact_info["phone"] == "555-9876"
        assert contact_info["contact_name"] == "Jane Doe"

    def test_handle_empty_reply_url(self):
        """Test handling of empty reply URL."""
        contact_info = extract_contact_info("")
        assert contact_info["email"] == "N/A"
        assert contact_info["phone"] == "N/A"
        assert contact_info["contact_name"] == "N/A"

    def test_handle_na_reply_url(self):
        """Test handling of N/A reply URL."""
        contact_info = extract_contact_info("N/A")
        assert contact_info["email"] == "N/A"
        assert contact_info["phone"] == "N/A"
        assert contact_info["contact_name"] == "N/A"

    def test_handle_none_reply_url(self):
        """Test handling of None reply URL."""
        contact_info = extract_contact_info(None)
        assert contact_info["email"] == "N/A"
        assert contact_info["phone"] == "N/A"
        assert contact_info["contact_name"] == "N/A"

    def test_handle_invalid_url(self):
        """Test handling of invalid URL (graceful fallback)."""
        reply_url = "not a valid url at all"
        contact_info = extract_contact_info(reply_url)
        # Should return all N/A values without crashing
        assert contact_info["email"] == "N/A"
        assert contact_info["phone"] == "N/A"
        assert contact_info["contact_name"] == "N/A"

    def test_multiple_parameter_variations(self):
        """Test that function finds email despite multiple parameter name variations."""
        reply_url = "https://example.com/reply?from_email=sender@test.com&other=value"
        contact_info = extract_contact_info(reply_url)
        assert contact_info["email"] == "sender@test.com"

    def test_telephone_parameter_name(self):
        """Test extraction using 'telephone' parameter name."""
        reply_url = "https://example.com/contact?telephone=555-0000"
        contact_info = extract_contact_info(reply_url)
        assert contact_info["phone"] == "555-0000"


class TestScrapeJobPageContactInfo:
    """Test suite for scrape_job_page contact information extraction."""

    def test_scrape_job_page_includes_contact_fields(self):
        """Test that scrape_job_page includes contact information in return dict."""
        html_content = """
        <html>
        <h1 class="postingtitle">Software Engineer</h1>
        <h2 class="company-name">Tech Company</h2>
        <button class="reply-button" data-href="mailto:jobs@techco.com"></button>
        <div id="map" data-latitude="37.7749" data-longitude="-122.4194" data-accuracy="rooftop"></div>
        <section id="postingbody">
            <p>This is a test job description</p>
        </section>
        <div class="postinginfos">
            <p class="postinginfo">posting id: 12345abc</p>
            <time class="date timeago" datetime="2025-11-03T10:00:00"></time>
        </div>
        </html>
        """
        job_data = scrape_job_page(html_content, "https://example.com/job/123")

        # Verify all expected keys are present
        assert "contact_email" in job_data
        assert "contact_phone" in job_data
        assert "contact_name" in job_data
        assert "reply_url" in job_data

    def test_scrape_job_page_extracts_mailto_contact(self):
        """Test that scrape_job_page correctly extracts email from mailto link."""
        html_content = """
        <html>
        <h1 class="postingtitle">Job Title</h1>
        <h2 class="company-name">Company</h2>
        <button class="reply-button" data-href="mailto:hiring@company.com?subject=Application"></button>
        <div id="map"></div>
        <section id="postingbody"><p>Job desc</p></section>
        <div class="postinginfos">
            <p class="postinginfo">id: xyz</p>
        </div>
        </html>
        """
        job_data = scrape_job_page(html_content, "https://example.com/job/456")

        assert job_data["contact_email"] == "hiring@company.com"
        assert job_data["reply_url"] == "mailto:hiring@company.com?subject=Application"

    def test_scrape_job_page_no_reply_button(self):
        """Test scrape_job_page when no reply button is present."""
        html_content = """
        <html>
        <h1 class="postingtitle">Job Title</h1>
        <h2 class="company-name">Company</h2>
        <div id="map"></div>
        <section id="postingbody"><p>Job desc</p></section>
        <div class="postinginfos">
            <p class="postinginfo">id: xyz</p>
        </div>
        </html>
        """
        job_data = scrape_job_page(html_content, "https://example.com/job/789")

        # Should have N/A for all contact fields
        assert job_data["reply_url"] == "N/A"
        assert job_data["contact_email"] == "N/A"
        assert job_data["contact_phone"] == "N/A"
        assert job_data["contact_name"] == "N/A"

    def test_scrape_job_page_with_url_based_reply(self):
        """Test scrape_job_page with URL-based reply link containing contact info."""
        html_content = """
        <html>
        <h1 class="postingtitle">Manager Position</h1>
        <h2 class="company-name">BigCorp</h2>
        <button class="reply-button" data-href="https://apply.bigcorp.com?email=hr@bigcorp.com&name=HR%20Team"></button>
        <div id="map"></div>
        <section id="postingbody"><p>Apply now</p></section>
        <div class="postinginfos">
            <p class="postinginfo">id: manager123</p>
        </div>
        </html>
        """
        job_data = scrape_job_page(html_content, "https://example.com/job/999")

        assert job_data["contact_email"] == "hr@bigcorp.com"
        assert job_data["contact_name"] == "HR Team"

    def test_scrape_job_page_negative_keyword_match(self, monkeypatch):
        """Test that negative keyword detection flags matching jobs."""
        monkeypatch.setattr(
            "web.scraper.get_negative_keywords", lambda: ["scam"])
        html_content = """
        <html>
        <h1 class="postingtitle">Great Opportunity</h1>
        <h2 class="company-name">SCAM Corp</h2>
        <section id="postingbody"><p>This is a scam offer</p></section>
        </html>
        """
        job_data = scrape_job_page(
            html_content, "https://example.com/job/negative")

        assert job_data["is_negative_match"] is True
        assert job_data["negative_keyword_match"] == "scam"
        assert job_data["negative_match_field"] in {
            "title", "company", "description"}

    def test_scrape_job_page_no_negative_match(self, monkeypatch):
        """Test that jobs without matching keywords are not flagged."""
        monkeypatch.setattr(
            "web.scraper.get_negative_keywords", lambda: ["scam"])
        html_content = """
        <html>
        <h1 class="postingtitle">Legit Opportunity</h1>
        <h2 class="company-name">Honest Corp</h2>
        <section id="postingbody"><p>We pay well and on time.</p></section>
        </html>
        """
        job_data = scrape_job_page(
            html_content, "https://example.com/job/positive")

        assert job_data["is_negative_match"] is False
        assert job_data["negative_keyword_match"] is None
        assert job_data["negative_match_field"] is None


class TestProcessJobUrlNegativeFiltering:
    def test_process_job_url_skips_negative_match(self, monkeypatch):
        job_url = "https://example.com/job/negative"
        remove_calls = []
        upsert_calls = []

        monkeypatch.setattr(
            "web.craigslist.get_last_fetch_time", lambda url: None)
        monkeypatch.setattr(
            "web.craigslist.insert_log",
            lambda *args, **kwargs: None,
        )
        monkeypatch.setattr(
            "web.craigslist.make_request_with_retry",
            lambda url, attempts: "<html />",
        )
        monkeypatch.setattr(
            "web.craigslist.scrape_job_page",
            lambda content, url: _make_negative_job(url),
        )

        def fake_upsert(job_data, region="", keyword=""):
            upsert_calls.append(job_data)

        def fake_remove(url):
            remove_calls.append(url)

        monkeypatch.setattr("web.craigslist.upsert_job_details", fake_upsert)
        monkeypatch.setattr("web.craigslist.remove_job", fake_remove)

        messages = list(process_job_url(job_url, region="test", keyword="kw"))

        assert any("Skipping job" in message for message in messages)
        assert remove_calls == [job_url]
        assert upsert_calls == []


class TestScraperPipelineNegativeFiltering:
    def test_scraper_skips_negative_jobs(self, monkeypatch):
        job_url = "https://example.com/job/negative"
        remove_calls = []
        upsert_calls = []

        monkeypatch.setattr("web.craigslist.db_init", lambda: None)

        def fake_fetch_listings():
            yield "Fake listing fetch\n"
            return {"discovered": 0, "new": 0, "by_search": [], "new_jobs": []}

        monkeypatch.setattr("web.craigslist.fetch_listings",
                            fake_fetch_listings)
        monkeypatch.setattr(
            "web.craigslist.db_get_all_job_urls",
            lambda: [{"url": job_url, "region": "reg", "keyword": "kw"}],
        )
        monkeypatch.setattr(
            "web.craigslist.get_last_fetch_time", lambda url: None)
        monkeypatch.setattr("web.craigslist.insert_log",
                            lambda *args, **kwargs: None)
        monkeypatch.setattr(
            "web.craigslist.make_request_with_retry", lambda url, attempts: "<html />"
        )
        monkeypatch.setattr("web.craigslist.url_to_job_id",
                            lambda url: "job123")
        monkeypatch.setattr(
            "web.craigslist.scrape_job_page",
            lambda content, url: _make_negative_job(url),
        )

        def fake_upsert(job_data, region="", keyword=""):
            upsert_calls.append(job_data)

        def fake_remove(url):
            remove_calls.append(url)

        monkeypatch.setattr("web.craigslist.upsert_job_details", fake_upsert)
        monkeypatch.setattr("web.craigslist.remove_job", fake_remove)

        messages = list(scraper())

        assert any("Skipping job" in message for message in messages)
        assert remove_calls == [job_url]
        assert upsert_calls == []


class TestScraperEmailNotifications:
    def test_scraper_sends_email_for_new_jobs(self, monkeypatch):
        monkeypatch.setattr("web.craigslist.db_init", lambda: None)

        new_jobs = [
            {
                "title": "Python Developer",
                "company": "Acme",
                "location": "Remote",
                "url": "https://example.com/jobs/1",
            }
        ]

        def fake_fetch_listings():
            yield "Fake listing fetch\n"
            return {
                "discovered": 1,
                "new": 1,
                "by_search": [],
                "new_jobs": new_jobs,
            }

        monkeypatch.setattr("web.craigslist.fetch_listings", fake_fetch_listings)
        monkeypatch.setattr("web.craigslist.db_get_all_job_urls", lambda: [])

        calls = {}

        def fake_send_alert(jobs):
            calls["jobs"] = jobs
            return True, "sent"

        monkeypatch.setattr("web.craigslist._send_new_job_alert", fake_send_alert)

        messages = list(scraper())

        assert calls["jobs"] == new_jobs
        assert any("Job alert email sent." in message for message in messages)

(new test file for user negative keywords, 148 lines)

@@ -0,0 +1,148 @@
import pytest

from web.db import (
    db_init,
    create_or_update_user,
    upsert_negative_keyword,
    set_user_negative_keywords,
    get_user_negative_keywords,
    upsert_listing,
    upsert_job_details,
    get_all_jobs,
    UserNegativeKeyword,
    NegativeKeyword
)
from web.app import app
from web.utils import filter_jobs


@pytest.fixture
def client():
    app.config['TESTING'] = True
    app.config['WTF_CSRF_ENABLED'] = False
    with app.test_client() as client:
        with app.app_context():
            db_init()
        yield client


def test_negative_keyword_db_ops():
    db_init()
    username = "test_neg_user"
    create_or_update_user(username, "password")

    # Test upsert
    kid = upsert_negative_keyword("scam")
    assert kid > 0
    kid2 = upsert_negative_keyword("scam")
    assert kid == kid2

    # Test set/get
    set_user_negative_keywords(username, ["scam", "unpaid"])
    nks = get_user_negative_keywords(username)
    assert len(nks) == 2
    assert "scam" in nks
    assert "unpaid" in nks

    # Test update
    set_user_negative_keywords(username, ["scam"])
    nks = get_user_negative_keywords(username)
    assert len(nks) == 1
    assert "scam" in nks
    assert "unpaid" not in nks

    # Test clear
    set_user_negative_keywords(username, [])
    nks = get_user_negative_keywords(username)
    assert len(nks) == 0


def test_settings_endpoint(client):
    username = "test_settings_user"
    create_or_update_user(username, "password")

    # Login
    client.post('/login', data={'username': username, 'password': 'password'})

    # Post settings
    resp = client.post('/settings', json={
        'regions': [],
        'keywords': [],
        'negative_keywords': ['spam', 'junk']
    })
    assert resp.status_code == 200

    # Verify DB
    nks = get_user_negative_keywords(username)
    assert "spam" in nks
    assert "junk" in nks


def test_job_filtering_with_negative_keywords():
    # Setup jobs
    jobs = [
        {"title": "Great Job", "description": "Good pay"},
        {"title": "Bad Job", "description": "This is a scam"},
        {"title": "Okay Job", "description": "Average pay"},
    ]

    # Filter
    filtered = filter_jobs(jobs, negative_keywords=["scam"])
    assert len(filtered) == 2
    assert "Bad Job" not in [j['title'] for j in filtered]

    filtered = filter_jobs(jobs, negative_keywords=["pay"])
    assert len(filtered) == 1
    assert "Bad Job" in [j['title']
                         for j in filtered]  # "scam" job doesn't have "pay"


def test_jobs_endpoint_filtering(client):
    username = "test_filter_user"
    create_or_update_user(username, "password")

    # Setup DB with jobs
    upsert_listing(
        url="http://example.com/1",
        region="sfbay",
        keyword="python",
        title="Good Python Job",
        pay="$100k",
        location="SF",
        timestamp="now"
    )
    upsert_job_details({
        "url": "http://example.com/1",
        "id": "1",
        "title": "Good Python Job",
        "description": "This is a legit job."
    })
    upsert_listing(
        url="http://example.com/2",
        region="sfbay",
        keyword="python",
        title="Bad Python Job",
        pay="$100k",
        location="SF",
        timestamp="now"
    )
    upsert_job_details({
        "url": "http://example.com/2",
        "id": "2",
        "title": "Bad Python Job",
        "description": "This is a scam job."
    })

    # Login
    client.post('/login', data={'username': username, 'password': 'password'})

    # Set negative keywords
    set_user_negative_keywords(username, ["scam"])

    # Fetch jobs
    resp = client.get('/jobs')
    data = resp.get_json()
    titles = [j['title'] for j in data]
    assert "Good Python Job" in titles
    assert "Bad Python Job" not in titles

(web.utils helper tests)

@@ -16,3 +16,23 @@ def test_http_settings_helpers():
     assert isinstance(utils.get_backoff_factor(), int)
     assert isinstance(utils.get_min_delay(), int)
     assert isinstance(utils.get_max_delay(), int)
+
+
+def test_negative_keywords_helper():
+    keywords = utils.get_negative_keywords()
+    assert isinstance(keywords, list)
+    for kw in keywords:
+        assert isinstance(kw, str)
+        assert kw == kw.lower()
+
+
+def test_email_settings_helper():
+    settings = utils.get_email_settings()
+    assert isinstance(settings, dict)
+    assert 'enabled' in settings
+    assert 'from_address' in settings
+    smtp = settings.get('smtp')
+    assert isinstance(smtp, dict)
+    assert 'host' in smtp
+    assert isinstance(smtp.get('port'), int)
+    assert isinstance(settings.get('recipients'), list)

web/app.py

@@ -2,6 +2,7 @@ import os
from flask import Flask, request, jsonify, render_template, redirect, url_for, session, flash, Response from flask import Flask, request, jsonify, render_template, redirect, url_for, session, flash, Response
from flask_wtf import CSRFProtect from flask_wtf import CSRFProtect
from typing import Dict, List from typing import Dict, List
from datetime import datetime, timezone
from web.craigslist import scraper from web.craigslist import scraper
from web.db import ( from web.db import (
@@ -17,8 +18,10 @@ from web.db import (
get_user_by_id, get_user_by_id,
get_user_regions, get_user_regions,
get_user_keywords, get_user_keywords,
get_user_negative_keywords,
set_user_regions, set_user_regions,
set_user_keywords, set_user_keywords,
set_user_negative_keywords,
get_all_regions, get_all_regions,
get_all_keywords, get_all_keywords,
stats_overview, stats_overview,
@@ -29,14 +32,24 @@ from web.db import (
rename_region, rename_region,
rename_keyword, rename_keyword,
change_region_color, change_region_color,
change_keyword_color change_keyword_color,
subscribe_email,
unsubscribe_email,
list_email_subscriptions,
list_email_templates,
create_email_template,
update_email_template,
delete_email_template,
get_email_template,
) )
from web.utils import ( from web.utils import (
initialize_users_from_settings, initialize_users_from_settings,
filter_jobs, filter_jobs,
get_job_by_id, get_job_by_id,
now_iso,
) )
from web.db import get_all_regions, get_all_keywords from web.db import get_all_regions, get_all_keywords
from web.email_templates import render_job_alert_email
app = Flask(__name__) app = Flask(__name__)
app.secret_key = os.environ.get("FLASK_SECRET", "dev-secret-change-me") app.secret_key = os.environ.get("FLASK_SECRET", "dev-secret-change-me")
@@ -107,24 +120,30 @@ def index():
# Apply user preference filters if no explicit filters provided # Apply user preference filters if no explicit filters provided
selected_region = request.args.get("region") selected_region = request.args.get("region")
selected_keyword = request.args.get("keyword") selected_keyword = request.args.get("keyword")
if not selected_region and session.get('username'): user_negative_keywords = []
if session.get('username'):
try: try:
prefs = get_user_regions(session['username']) username = session['username']
if prefs: if not selected_region:
# If user has region prefs, filter to them by default prefs = get_user_regions(username)
all_jobs = [j for j in all_jobs if j.get( if prefs:
'region') in set(prefs)] # If user has region prefs, filter to them by default
all_jobs = [j for j in all_jobs if j.get(
'region') in set(prefs)]
if not selected_keyword:
prefs = get_user_keywords(username)
if prefs:
all_jobs = [j for j in all_jobs if j.get(
'keyword') in set(prefs)]
# Always fetch negative keywords for logged-in users
user_negative_keywords = get_user_negative_keywords(username)
except Exception: except Exception:
pass pass
if not selected_keyword and session.get('username'):
try: filtered_jobs = filter_jobs(
prefs = get_user_keywords(session['username']) all_jobs, selected_region, selected_keyword, negative_keywords=user_negative_keywords)
if prefs:
all_jobs = [j for j in all_jobs if j.get(
'keyword') in set(prefs)]
except Exception:
pass
filtered_jobs = filter_jobs(all_jobs, selected_region, selected_keyword)
return render_template( return render_template(
"index.html", "index.html",
@@ -178,23 +197,26 @@ def jobs():
    # Respect user preferences when no explicit filters provided
    region = request.args.get("region")
    keyword = request.args.get("keyword")
-    if not region and session.get('username'):
-        try:
-            prefs = get_user_regions(session['username'])
-            if prefs:
-                all_jobs = [j for j in all_jobs if j.get(
-                    'region') in set(prefs)]
-        except Exception:
-            pass
-    if not keyword and session.get('username'):
-        try:
-            prefs = get_user_keywords(session['username'])
-            if prefs:
-                all_jobs = [j for j in all_jobs if j.get(
-                    'keyword') in set(prefs)]
-        except Exception:
-            pass
-    return jsonify(filter_jobs(all_jobs, region, keyword))
+    user_negative_keywords = []
+    if session.get('username'):
+        try:
+            username = session['username']
+            if not region:
+                prefs = get_user_regions(username)
+                if prefs:
+                    all_jobs = [j for j in all_jobs if j.get(
+                        'region') in set(prefs)]
+            if not keyword:
+                prefs = get_user_keywords(username)
+                if prefs:
+                    all_jobs = [j for j in all_jobs if j.get(
+                        'keyword') in set(prefs)]
+            user_negative_keywords = get_user_negative_keywords(username)
+        except Exception:
+            pass
+    return jsonify(filter_jobs(all_jobs, region, keyword, negative_keywords=user_negative_keywords))
@app.route('/job_details', methods=['GET'])
@@ -204,9 +226,9 @@ def job_details():
    if session.get('username'):
        try:
            r = set(get_user_regions(session['username']))
+            k = set(get_user_keywords(session['username']))
            if r:
                jobs = [j for j in jobs if j.get('region') in r]
-            k = set(get_user_keywords(session['username']))
            if k:
                jobs = [j for j in jobs if j.get('keyword') in k]
        except Exception:
@@ -356,6 +378,130 @@ def admin_user_delete(user_id):
    return redirect(url_for('admin_users'))
@app.route('/admin/emails', methods=['GET', 'POST'])
def admin_emails():
    if not require_admin():
        return redirect(url_for('login'))
    if request.method == 'POST':
        action = (request.form.get('action') or '').strip().lower()
        email = (request.form.get('email') or '').strip()
        try:
            if action == 'subscribe':
                subscribe_email(email)
                flash('Subscription saved')
            elif action == 'unsubscribe':
                if unsubscribe_email(email):
                    flash('Subscription deactivated')
                else:
                    flash('No matching subscription found')
            elif action == 'reactivate':
                subscribe_email(email)
                flash('Subscription reactivated')
            else:
                flash('Unknown action')
        except ValueError as exc:
            flash(f'Error: {exc}')
        except Exception as exc:
            flash(f'Error: {exc}')
        return redirect(url_for('admin_emails'))

    subscriptions = list_email_subscriptions()

    class Sub(dict):
        __getattr__ = dict.get

    subscription_rows = [Sub(s) for s in subscriptions]
    active_count = sum(1 for s in subscription_rows if s.get('is_active'))
    return render_template(
        'admin/email.html',
        title='Email Subscriptions',
        subscriptions=subscription_rows,
        total_active=active_count,
        total=len(subscription_rows),
    )


@app.route('/admin/email-templates', methods=['GET', 'POST'])
def admin_email_templates():
    if not require_admin():
        return redirect(url_for('login'))
    if request.method == 'POST':
        action = (request.form.get('action') or '').strip().lower()
        template_id = request.form.get('template_id')
        name = request.form.get('name') or ''
        slug = request.form.get('slug') or ''
        subject = request.form.get('subject') or ''
        body = request.form.get('body') or ''
        is_active = request.form.get('is_active') == 'on'
        try:
            if action == 'create':
                create_email_template(
                    name=name, slug=slug, subject=subject, body=body, is_active=is_active)
                flash('Template created')
            elif action == 'update':
                update_email_template(
                    int(template_id or 0),
                    name=name,
                    slug=slug or None,
                    subject=subject,
                    body=body,
                    is_active=is_active,
                )
                flash('Template updated')
            elif action == 'delete':
                if delete_email_template(int(template_id or 0)):
                    flash('Template deleted')
                else:
                    flash('Template not found')
            else:
                flash('Unknown action')
        except ValueError as exc:
            flash(f'Error: {exc}')
        except Exception as exc:
            flash(f'Error: {exc}')
        return redirect(url_for('admin_email_templates'))

    templates = list_email_templates(include_inactive=True)
    edit_id = request.args.get('template_id', type=int)
    editing = get_email_template(edit_id) if edit_id else None

    preview_payload = None
    preview_template = None
    preview_id = request.args.get('preview_id', type=int)
    if preview_id:
        preview_template = get_email_template(preview_id)
        if preview_template:
            sample_jobs = [
                {
                    'title': 'Senior Python Engineer',
                    'company': 'ACME Corp',
                    'location': 'Remote',
                    'url': 'https://example.com/jobs/1',
                },
                {
                    'title': 'Data Engineer',
                    'company': 'Globex',
                    'location': 'New York, NY',
                    'url': 'https://example.com/jobs/2',
                },
            ]
            preview_payload = render_job_alert_email(
                sample_jobs,
                region='preview-region',
                keyword='preview-keyword',
                template_override=preview_template,
            )
    return render_template(
        'admin/email_templates.html',
        title='Email Templates',
        templates=templates,
        editing=editing,
        preview=preview_payload,
        preview_template=preview_template,
    )
# ---------------- User settings (regions/keywords) -------------------------

@app.route('/settings', methods=['GET', 'POST'])
@@ -367,6 +513,8 @@ def user_settings():
    # Accept JSON or form posts. Normalize singular/plural names.
    sel_regions: list[str] = []
    sel_keywords: list[str] = []
+    sel_negative_keywords: list[str] = []
    if request.is_json:
        data = request.get_json(silent=True) or {}
        sel_regions = [
@@ -375,16 +523,25 @@ def user_settings():
        sel_keywords = [
            (v or '').strip() for v in (data.get('keywords') or []) if v and (v or '').strip()
        ]
+        sel_negative_keywords = [
+            (v or '').strip() for v in (data.get('negative_keywords') or []) if v and (v or '').strip()
+        ]
    else:
        # HTML form fallback: support names 'regions' or 'region', 'keywords' or 'keyword'
        r_vals = request.form.getlist(
            'regions') + request.form.getlist('region')
        k_vals = request.form.getlist(
            'keywords') + request.form.getlist('keyword')
+        nk_vals = request.form.getlist(
+            'negative_keywords') + request.form.getlist('negative_keyword')
        sel_regions = [(v or '').strip()
                       for v in r_vals if v and (v or '').strip()]
        sel_keywords = [(v or '').strip()
                        for v in k_vals if v and (v or '').strip()]
+        sel_negative_keywords = [(v or '').strip()
+                                 for v in nk_vals if v and (v or '').strip()]

    # Upsert any new values into master lists
    for r in sel_regions:
        try:
@@ -396,9 +553,14 @@ def user_settings():
            upsert_keyword(k)
        except Exception:
            pass
+    # No separate upsert loop is needed for negative keywords:
+    # set_user_negative_keywords() upserts each name itself via upsert_negative_keyword().
    try:
        set_user_regions(username, sel_regions)
        set_user_keywords(username, sel_keywords)
+        set_user_negative_keywords(username, sel_negative_keywords)
        # For JSON callers, return 200 without redirect
        if request.is_json:
            return jsonify({"status": "ok"})
@@ -413,6 +575,8 @@ def user_settings():
    all_keywords = get_all_keywords()
    user_regions = get_user_regions(username)
    user_keywords = get_user_keywords(username)
+    user_negative_keywords = get_user_negative_keywords(username)
    return render_template(
        'user/settings.html',
        title='Your Preferences',
@@ -420,6 +584,7 @@ def user_settings():
        all_keywords=all_keywords,
        user_regions=user_regions,
        user_keywords=user_keywords,
+        user_negative_keywords=user_negative_keywords,
    )
@@ -516,6 +681,17 @@ def admin_stats():
    return render_template('admin/stats.html', title='Statistics', stats=stats, jobs=jobs, regions=get_all_regions(), keywords=get_all_keywords())
@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint for monitoring application status."""
    return jsonify({
        "status": "healthy",
        "timestamp": now_iso(),
        "service": "jobs-scraper",
        "version": "1.0.0"
    }), 200
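
A quick way to exercise this endpoint from a monitoring script (a sketch; the base URL is an assumption, not part of the change):

import json
import urllib.request

def check_health(base_url="http://localhost:5000"):
    # True when the service responds 200 and reports itself healthy
    with urllib.request.urlopen(f"{base_url}/health", timeout=5) as resp:
        payload = json.loads(resp.read().decode("utf-8"))
        return resp.status == 200 and payload.get("status") == "healthy"
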
def init():
    """Main function to run the Flask app."""
    # Ensure DB is initialized


@@ -12,23 +12,73 @@ from web.db import (
    insert_log,
    get_last_fetch_time,
)
+import schedule
+import time

# Import utility functions
from web.utils import (
    get_base_url,
    make_request_with_retry,
-    now_iso,
+    get_email_settings,
)
from web.db import get_all_regions, get_all_keywords, seed_regions_keywords_from_listings
+from web.email_templates import render_job_alert_email
+from web.email_service import send_email
def _negative_match_details(job_data: dict) -> tuple[str, str] | None:
    """Return (keyword, field) when job_data indicates a negative match."""
    if not job_data or not job_data.get("is_negative_match"):
        return None
    keyword = (job_data.get("negative_keyword_match") or "").strip()
    field = (job_data.get("negative_match_field")
             or "unknown").strip() or "unknown"
    if not keyword:
        keyword = "unknown keyword"
    return keyword, field


def _send_new_job_alert(new_jobs: list[dict]) -> tuple[bool, str]:
    """Send an email alert for newly discovered jobs.

    Returns (sent, message) where message explains why mail was skipped.
    """
    settings = get_email_settings()
    if not settings.get("enabled"):
        return False, "email alerts disabled"
    recipients = settings.get("recipients", []) or []
    if not recipients:
        return False, "no recipients configured"
    payload = render_job_alert_email(new_jobs)
    send_email(
        subject=payload.get("subject", "New jobs available"),
        body=payload.get("body", ""),
        to=recipients,
        settings=settings,
    )
    return True, "sent"
def fetch_listings():
-    """Fetch job listings from all regions and keywords."""
+    """Fetch job listings from all regions and keywords.
+
+    Yields progress messages and returns a dict with:
+    - discovered: total number of unique job URLs discovered
+    - new: total number of new jobs added to the database
+    - by_search: list of dicts, each containing:
+        - region: region name
+        - keyword: keyword name
+        - count: number of jobs fetched for this search
+    """
    # We'll collect URLs discovered in this run and then remove any DB listings
    # not present in this set (treat DB as reflecting current search results).
    existing_db_urls = set(row['url'] for row in db_get_all_job_urls())
    discovered_urls = set()
    new_rows = []
+    new_jobs = []
+    search_results = []  # Track count per search

    # Ensure regions/keywords master lists exist
    try:
@@ -57,13 +107,14 @@ def fetch_listings():
        # Build a canonical search identifier for this region+keyword combination.
        url = get_base_url().format(region=region, keyword=keyword_name.replace(" ", "+"))
        search_page_id = f"search:{region_name}:{keyword_name}"
+        search_count = 0  # Count jobs for this search
        try:
            last = get_last_fetch_time(url)
            if last is not None:
-                # skip if fetched within the last 24 hours
+                # skip if fetched within the last hour
                age = datetime.now(
                    timezone.utc) - (last if last.tzinfo is not None else last.replace(tzinfo=timezone.utc))
-                if age.total_seconds() < 24 * 3600:
+                if age.total_seconds() < 1 * 3600:
                    yield f"Skipping {region_name} + {keyword_name} (fetched {age.seconds//3600}h ago)...\n"
                    processed += 1
                    continue
@@ -81,8 +132,18 @@ def fetch_listings():
        for row in process_region_keyword(region_name, keyword_name, discovered_urls):
            timestamp, region, keyword, title, pay, location, url = row
            discovered_urls.add(url)
+            search_count += 1
            if url not in existing_db_urls:
                new_rows.append(row)
+                new_jobs.append({
+                    "timestamp": timestamp,
+                    "region": region,
+                    "keyword": keyword,
+                    "title": title,
+                    "pay": pay,
+                    "location": location,
+                    "url": url,
+                })
            # Upsert or update listing to reflect current search result
            upsert_listing(
                url=url,
@@ -95,9 +156,20 @@ def fetch_listings():
                fetched_from=search_page_id,
                fetched_at=datetime.now(timezone.utc),
            )
+        # Record per-search count
+        search_results.append({
+            "region": region_name,
+            "keyword": keyword_name,
+            "count": search_count
+        })

    yield f"Listing fetch complete: {len(discovered_urls)} discovered, {len(new_rows)} new,\n"
-    return {"discovered": len(discovered_urls), "new": len(new_rows)}
+    return {
+        "discovered": len(discovered_urls),
+        "new": len(new_rows),
+        "by_search": search_results,
+        "new_jobs": new_jobs,
+    }
def process_job_url(job_url: str, region: str = "", keyword: str = ""):
@@ -123,10 +195,17 @@ def process_job_url(job_url: str, region: str = "", keyword: str = ""):
    yield f"Scraping job data from {job_url}\n"
    job_data = scrape_job_page(content, job_url)
    if job_data:
+        negative_info = _negative_match_details(job_data)
+        if negative_info:
+            keyword, field = negative_info
+            yield (
+                f"Skipping job {job_id} due to negative keyword "
+                f"'{keyword}' in {field}\n"
+            )
+            remove_job(job_url)
+            return None
        yield f"Upserting job details for {job_id}\n"
        upsert_job_details(job_data, region=region, keyword=keyword)
-        upsert_user_interaction(
-            job_id, seen_at=datetime.now(timezone.utc).isoformat())
        yield f"Successfully processed job {job_id}: {job_data.get('title', 'Unknown')}\n"
        return job_data
    else:
@@ -145,8 +224,29 @@ def scraper():
    # First, fetch current listings from search pages and make DB reflect them.
    yield "Fetching listings...\n"
-    for message in fetch_listings():
-        yield message
+    listing_summary: dict | None = None
+    fetch_iter = fetch_listings()
+    try:
+        while True:
+            message = next(fetch_iter)
+            yield message
+    except StopIteration as stop:
+        listing_summary = stop.value if isinstance(stop.value, dict) else {}
+
+    new_jobs = []
+    if listing_summary:
+        new_jobs = listing_summary.get("new_jobs", []) or []
+    if new_jobs:
+        yield f"Preparing email alert for {len(new_jobs)} new jobs...\n"
+        try:
+            sent, info = _send_new_job_alert(new_jobs)
+            if sent:
+                yield "Job alert email sent.\n"
+            else:
+                yield f"Skipping email alert: {info}\n"
+        except Exception as exc:
+            yield f"Failed to send job alert email: {exc}\n"
    # Finally, fetch and refresh individual job pages for current listings
    job_urls = db_get_all_job_urls()
@@ -165,5 +265,42 @@ def scraper():
    yield "\nScraping completed successfully!\n"
def scrape_jobs_with_retry(max_retries=3):
    """Run the scraping process with retry logic for failures."""
    for attempt in range(max_retries):
        try:
            # scraper() is a generator: drain it so the scrape actually runs
            for _ in scraper():
                pass
            return True
        except Exception:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt * 10)  # Exponential backoff
    return False


def start_scheduler():
    """Start the scheduler to run scraping every hour."""
    # Clear any existing jobs
    schedule.clear()
    # Schedule scraping every hour
    schedule.every().hour.do(scrape_jobs_with_retry)
    # Run the scheduler in a loop
    while True:
        schedule.run_pending()
        time.sleep(60)  # Check every minute


def run_scheduled_scraping():
    """Run the scheduled scraping process."""
    try:
        scrape_jobs_with_retry()
    except Exception:
        pass


# Initialize scheduler when module is imported
schedule.every().hour.do(run_scheduled_scraping)
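
For reference, the 2 ** attempt * 10 backoff sleeps 10s after the first failed attempt and 20s after the second; no sleep follows the final attempt:

max_retries = 3
delays = [2 ** attempt * 10 for attempt in range(max_retries - 1)]
assert delays == [10, 20]
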
if __name__ == "__main__":
    # Drain the generator so running the module actually performs the scrape
    for message in scraper():
        print(message, end="")

web/db.py

@@ -5,16 +5,18 @@ from __future__ import annotations
Tables:
- users(user_id PK, username UNIQUE, created_at)
- job_listings(job_id PK, url UNIQUE, region, keyword, title, pay, location, timestamp)
-- job_descriptions(job_id PK FK -> job_listings, title, company, location, description, posted_time, url)
+- job_descriptions(job_id PK FK -> job_listings, title, company, location, description, posted_time, url, reply_url)
- user_interactions(job_id PK FK -> job_listings, user_id FK -> users, seen_at, url_visited, is_user_favorite)
- regions(region_id PK, name UNIQUE)
- keywords(keyword_id PK, name UNIQUE)
- user_regions(user_id FK -> users, region_id FK -> regions, composite PK)
- user_keywords(user_id FK -> users, keyword_id FK -> keywords, composite PK)
+- logs(id PK, page_url, region, keyword, fetched_at)
"""
from datetime import datetime, UTC
from typing import Optional, Dict, Any, List
+import re
from web.utils import (
    get_color_from_string,
    url_to_job_id,
@@ -95,10 +97,279 @@ class JobDescription(Base):
    description = Column(Text)
    posted_time = Column(String(TIME_LEN))
    url = Column(String(URL_LEN))
+    reply_url = Column(String(URL_LEN))
+    contact_email = Column(String(SHORT_LEN))
+    contact_phone = Column(String(SHORT_LEN))
+    contact_name = Column(String(SHORT_LEN))

    listing = relationship("JobListing", back_populates="description")
def _normalize_email(value: Optional[str]) -> str:
    if not value or not isinstance(value, str):
        return ""
    return value.strip().lower()


def subscribe_email(email: str) -> bool:
    """Add or reactivate an email subscription."""
    address = _normalize_email(email)
    if not address:
        raise ValueError("email address required")
    with _ensure_session() as session:
        existing = session.execute(
            text(
                "SELECT subscription_id, is_active FROM email_subscriptions WHERE email = :e"
            ),
            {"e": address},
        ).fetchone()
        now = datetime.now(UTC)
        if existing:
            session.execute(
                text(
                    "UPDATE email_subscriptions SET is_active = 1, updated_at = :u WHERE subscription_id = :sid"
                ),
                {"u": now, "sid": existing[0]},
            )
        else:
            session.execute(
                text(
                    "INSERT INTO email_subscriptions(email, is_active, created_at, updated_at) "
                    "VALUES(:e, 1, :u, :u)"
                ),
                {"e": address, "u": now},
            )
        session.commit()
    return True


def unsubscribe_email(email: str) -> bool:
    """Deactivate an email subscription."""
    address = _normalize_email(email)
    if not address:
        raise ValueError("email address required")
    with _ensure_session() as session:
        now = datetime.now(UTC)
        result = session.execute(
            text(
                "UPDATE email_subscriptions SET is_active = 0, updated_at = :u WHERE email = :e"
            ),
            {"u": now, "e": address},
        )
        session.commit()
        rowcount = getattr(result, "rowcount", None)
        if rowcount is None:
            return False
        return rowcount > 0


def list_email_subscriptions(*, active_only: bool = False) -> List[Dict[str, Any]]:
    """Return subscription rows as dicts."""
    query = "SELECT subscription_id, email, is_active, created_at, updated_at FROM email_subscriptions"
    params: Dict[str, Any] = {}
    if active_only:
        query += " WHERE is_active = 1"
    query += " ORDER BY email"
    with _ensure_session() as session:
        rows = session.execute(text(query), params).fetchall()
    result: List[Dict[str, Any]] = []
    for row in rows:
        result.append(
            {
                "subscription_id": row[0],
                "email": row[1],
                "is_active": bool(row[2]),
                "created_at": row[3],
                "updated_at": row[4],
            }
        )
    return result


def get_active_email_recipients() -> List[str]:
    """Return list of active subscription email addresses."""
    return [s["email"] for s in list_email_subscriptions(active_only=True)]


def _normalize_slug(value: Optional[str]) -> str:
    if not value:
        return ""
    slug = re.sub(r"[^a-zA-Z0-9-]+", "-", value.strip().lower())
    slug = re.sub(r"-+", "-", slug).strip("-")
    return slug
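
Expected behaviour of _normalize_slug on a few illustrative inputs (assumed examples, not from a test suite):

assert _normalize_slug("Job Alert") == "job-alert"
assert _normalize_slug("  Weekly -- Digest! ") == "weekly-digest"
assert _normalize_slug(None) == ""
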
def _template_to_dict(template: EmailTemplate) -> Dict[str, Any]:
    created = getattr(template, "created_at", None)
    updated = getattr(template, "updated_at", None)
    return {
        "template_id": template.template_id,
        "slug": template.slug,
        "name": template.name,
        "subject": template.subject,
        "body": template.body,
        "is_active": bool(template.is_active),
        "created_at": created.isoformat() if isinstance(created, datetime) else created,
        "updated_at": updated.isoformat() if isinstance(updated, datetime) else updated,
    }


def list_email_templates(*, include_inactive: bool = True) -> List[Dict[str, Any]]:
    with _ensure_session() as session:
        query = session.query(EmailTemplate)
        if not include_inactive:
            query = query.filter(EmailTemplate.is_active.is_(True))
        items = query.order_by(EmailTemplate.name.asc()).all()
        return [_template_to_dict(obj) for obj in items]


def get_email_template(template_id: int) -> Optional[Dict[str, Any]]:
    if not template_id:
        return None
    with _ensure_session() as session:
        obj = session.get(EmailTemplate, int(template_id))
        return _template_to_dict(obj) if obj else None


def get_email_template_by_slug(slug: str) -> Optional[Dict[str, Any]]:
    normalized = _normalize_slug(slug)
    if not normalized:
        return None
    with _ensure_session() as session:
        obj = session.query(EmailTemplate).filter(
            EmailTemplate.slug == normalized).one_or_none()
        return _template_to_dict(obj) if obj else None


def create_email_template(
    *,
    name: str,
    subject: str,
    body: str,
    slug: Optional[str] = None,
    is_active: bool = True,
) -> Dict[str, Any]:
    name_clean = (name or "").strip()
    if not name_clean:
        raise ValueError("Template name is required")
    subject_clean = (subject or "").strip()
    if not subject_clean:
        raise ValueError("Template subject is required")
    body_clean = (body or "").strip()
    if not body_clean:
        raise ValueError("Template body is required")
    slug_clean = _normalize_slug(slug or name_clean)
    if not slug_clean:
        raise ValueError("Template slug is required")
    with _ensure_session() as session:
        existing = session.query(EmailTemplate).filter(
            EmailTemplate.slug == slug_clean).one_or_none()
        if existing:
            raise ValueError("A template with this slug already exists")
        template = EmailTemplate(
            name=name_clean,
            slug=slug_clean,
            subject=subject_clean,
            body=body_clean,
            is_active=bool(is_active),
        )
        session.add(template)
        session.commit()
        session.refresh(template)
        return _template_to_dict(template)


def update_email_template(
    template_id: int,
    *,
    name: Optional[str] = None,
    subject: Optional[str] = None,
    body: Optional[str] = None,
    slug: Optional[str] = None,
    is_active: Optional[bool] = None,
) -> Dict[str, Any]:
    if not template_id:
        raise ValueError("template_id is required")
    with _ensure_session() as session:
        template = session.get(EmailTemplate, int(template_id))
        if template is None:
            raise ValueError("Template not found")
        if name is not None:
            name_clean = name.strip()
            if not name_clean:
                raise ValueError("Template name is required")
            setattr(template, "name", name_clean)
        if subject is not None:
            subject_clean = subject.strip()
            if not subject_clean:
                raise ValueError("Template subject is required")
            setattr(template, "subject", subject_clean)
        if body is not None:
            body_clean = body.strip()
            if not body_clean:
                raise ValueError("Template body is required")
            setattr(template, "body", body_clean)
        if slug is not None:
            slug_clean = _normalize_slug(slug)
            if not slug_clean:
                raise ValueError("Template slug is required")
            existing = (
                session.query(EmailTemplate)
                .filter(EmailTemplate.slug == slug_clean, EmailTemplate.template_id != template.template_id)
                .one_or_none()
            )
            if existing:
                raise ValueError("A template with this slug already exists")
            setattr(template, "slug", slug_clean)
        if is_active is not None:
            setattr(template, "is_active", bool(is_active))
        template.touch()
        session.commit()
        session.refresh(template)
        return _template_to_dict(template)


def delete_email_template(template_id: int) -> bool:
    if not template_id:
        return False
    with _ensure_session() as session:
        template = session.get(EmailTemplate, int(template_id))
        if template is None:
            return False
        session.delete(template)
        session.commit()
    return True


def ensure_default_email_template() -> None:
    try:
        from web.email_templates import DEFAULT_JOB_ALERT_SUBJECT, DEFAULT_JOB_ALERT_BODY
    except Exception:
        DEFAULT_JOB_ALERT_SUBJECT = "{count_label}{scope}"
        DEFAULT_JOB_ALERT_BODY = (
            "Hi,\n\n{intro_line}\n{jobs_message}\n\nGenerated at {timestamp} UTC.\n"
            "You are receiving this message because job alerts are enabled.\n"
        )
    try:
        with _ensure_session() as session:
            existing = session.query(EmailTemplate).filter(
                EmailTemplate.slug == "job-alert").one_or_none()
            if existing is None:
                template = EmailTemplate(
                    name="Job Alert",
                    slug="job-alert",
                    subject=DEFAULT_JOB_ALERT_SUBJECT,
                    body=DEFAULT_JOB_ALERT_BODY,
                    is_active=True,
                )
                session.add(template)
                session.commit()
    except Exception:
        pass
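
A sketch of the intended CRUD flow, with illustrative values only:

tpl = create_email_template(
    name="Weekly Digest",
    subject="{count_label}{scope}",
    body="Hi,\n{jobs_message}\n",
)  # slug defaults to the normalized name, "weekly-digest"
update_email_template(tpl["template_id"], is_active=False)
create_email_template(name="Weekly Digest", subject="s", body="b")
# raises ValueError: A template with this slug already exists
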
class UserInteraction(Base):
    __tablename__ = "user_interactions"
    # composite uniqueness on (user_id, job_id)
@@ -145,6 +416,20 @@ class UserKeyword(Base):
"keywords.keyword_id", ondelete="CASCADE"), primary_key=True) "keywords.keyword_id", ondelete="CASCADE"), primary_key=True)
class NegativeKeyword(Base):
    __tablename__ = "negative_keywords"
    keyword_id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(SHORT_LEN), unique=True, nullable=False)


class UserNegativeKeyword(Base):
    __tablename__ = "user_negative_keywords"
    user_id = Column(Integer, ForeignKey(
        "users.user_id", ondelete="CASCADE"), primary_key=True)
    keyword_id = Column(Integer, ForeignKey(
        "negative_keywords.keyword_id", ondelete="CASCADE"), primary_key=True)
class Log(Base):
    __tablename__ = "logs"
    id = Column(Integer, primary_key=True, autoincrement=True)
@@ -154,6 +439,35 @@ class Log(Base):
    fetched_at = Column(DateTime)
class EmailSubscription(Base):
    __tablename__ = "email_subscriptions"
    subscription_id = Column(Integer, primary_key=True, autoincrement=True)
    email = Column(String(SHORT_LEN), unique=True, nullable=False)
    is_active = Column(Boolean, default=True, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, nullable=False)

    def touch(self):
        setattr(self, "updated_at", datetime.utcnow())


class EmailTemplate(Base):
    __tablename__ = "email_templates"
    template_id = Column(Integer, primary_key=True, autoincrement=True)
    slug = Column(String(SHORT_LEN), unique=True, nullable=False)
    name = Column(String(SHORT_LEN), nullable=False)
    subject = Column(Text, nullable=False)
    body = Column(Text, nullable=False)
    is_active = Column(Boolean, default=True, nullable=False)
    created_at = Column(
        DateTime, default=lambda: datetime.now(UTC), nullable=False)
    updated_at = Column(
        DateTime, default=lambda: datetime.now(UTC), nullable=False)

    def touch(self):
        setattr(self, "updated_at", datetime.now(UTC))
def _ensure_session() -> Session:
    global engine, SessionLocal
    if engine is None or SessionLocal is None:
@@ -201,6 +515,31 @@ def db_init():
            text("ALTER TABLE users ADD COLUMN IF NOT EXISTS last_login DATETIME NULL"))
    except Exception:
        pass
    try:
        conn.execute(text(
            "ALTER TABLE job_descriptions ADD COLUMN IF NOT EXISTS reply_url VARCHAR(512) NULL"))
    except Exception:
        pass
    try:
        conn.execute(text(
            "ALTER TABLE job_descriptions ADD COLUMN IF NOT EXISTS contact_email VARCHAR(255) NULL"))
    except Exception:
        pass
    try:
        conn.execute(text(
            "ALTER TABLE job_descriptions ADD COLUMN IF NOT EXISTS contact_phone VARCHAR(255) NULL"))
    except Exception:
        pass
    try:
        conn.execute(text(
            "ALTER TABLE job_descriptions ADD COLUMN IF NOT EXISTS contact_name VARCHAR(255) NULL"))
    except Exception:
        pass
    try:
        ensure_default_email_template()
    except Exception:
        pass
def upsert_user_interaction(job_id: str | int, *, user_id: Optional[int] = None, seen_at: Optional[str] = None, url_visited: Optional[str] = None, is_user_favorite: Optional[bool] = None):
@@ -278,6 +617,9 @@ def upsert_job_details(job_data: Dict[str, Any], region: str = "", keyword: str
    the function will skip updating to avoid unnecessary work.
    - On successful upsert, a log entry is recorded with `insert_log(url, ...)`.
    """
+    if not job_data or job_data.get("is_negative_match"):
+        return
    url = job_data.get("url")
    job_id = normalize_job_id(job_data.get("id"), url)
    if not job_id:
@@ -302,6 +644,10 @@ def upsert_job_details(job_data: Dict[str, Any], region: str = "", keyword: str
    location = job_data.get("location") or None
    description = job_data.get("description") or None
    posted_time = job_data.get("posted_time") or None
+    reply_url = job_data.get("reply_url") or None
+    contact_email = job_data.get("contact_email") or None
+    contact_phone = job_data.get("contact_phone") or None
+    contact_name = job_data.get("contact_name") or None

    job_id = str(job_id)
    with _ensure_session() as session:
@@ -315,6 +661,10 @@ def upsert_job_details(job_data: Dict[str, Any], region: str = "", keyword: str
            setattr(obj, "description", description)
            setattr(obj, "posted_time", posted_time)
            setattr(obj, "url", url)
+            setattr(obj, "reply_url", reply_url)
+            setattr(obj, "contact_email", contact_email)
+            setattr(obj, "contact_phone", contact_phone)
+            setattr(obj, "contact_name", contact_name)
        session.commit()
    # Record that we fetched/updated this job page
    try:
@@ -626,6 +976,27 @@ def upsert_keyword(name: str) -> int:
    return upsert_keyword(name)
def upsert_negative_keyword(name: str) -> int:
    """Get or create a negative keyword by name; return keyword_id."""
    name = (name or "").strip().lower()
    if not name:
        raise ValueError("Negative keyword cannot be empty")
    with _ensure_session() as session:
        row = session.execute(text("SELECT keyword_id FROM negative_keywords WHERE name = :n"), {
            "n": name}).fetchone()
        if row:
            return int(row[0])
        session.execute(
            text("INSERT INTO negative_keywords(name) VALUES (:n)"), {"n": name})
        session.commit()
    with _ensure_session() as session:
        row2 = session.execute(text("SELECT keyword_id FROM negative_keywords WHERE name = :n"), {
            "n": name}).fetchone()
        if row2:
            return int(row2[0])
    return upsert_negative_keyword(name)
def set_user_regions(username: str, region_names: List[str]) -> None:
    """Replace user's preferred regions with given names."""
    user_id = get_or_create_user(username)
@@ -684,6 +1055,34 @@ def set_user_keywords(username: str, keyword_names: List[str]) -> None:
        session.commit()
def set_user_negative_keywords(username: str, keyword_names: List[str]) -> None:
    """Replace user's negative keywords with given names."""
    user_id = get_or_create_user(username)
    names = sorted({(n or "").strip().lower()
                    for n in keyword_names if (n or "").strip()})
    keyword_ids: List[int] = [upsert_negative_keyword(n) for n in names]
    if not keyword_ids and not names:
        with _ensure_session() as session:
            session.execute(
                text("DELETE FROM user_negative_keywords WHERE user_id = :u"), {"u": user_id})
            session.commit()
        return
    desired = set(keyword_ids)
    with _ensure_session() as session:
        rows = session.execute(text("SELECT keyword_id FROM user_negative_keywords WHERE user_id = :u"), {
            "u": user_id}).fetchall()
        current = set(int(r[0]) for r in rows)
        to_add = desired - current
        to_remove = current - desired
        for kid in to_remove:
            session.execute(text("DELETE FROM user_negative_keywords WHERE user_id = :u AND keyword_id = :k"), {
                "u": user_id, "k": int(kid)})
        for kid in to_add:
            session.execute(text("INSERT INTO user_negative_keywords(user_id, keyword_id) VALUES(:u, :k)"), {
                "u": user_id, "k": int(kid)})
        session.commit()
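
The function is replace-style: the stored set converges on exactly the names passed in, and names are lowercased and deduplicated on the way in. Hypothetical usage:

set_user_negative_keywords("alice", ["MLM", "Commission Only"])
assert get_user_negative_keywords("alice") == ["commission only", "mlm"]
set_user_negative_keywords("alice", [])  # clears every row for the user
assert get_user_negative_keywords("alice") == []
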
def get_user_regions(username: str) -> List[Dict[str, str]]:
    """Return preferred region names for a user (empty if none)."""
    with _ensure_session() as session:
@@ -724,6 +1123,26 @@ def get_user_keywords(username: str) -> List[Dict[str, str]]:
return [{"name": r[0], "color": r[1]} for r in rows] return [{"name": r[0], "color": r[1]} for r in rows]
def get_user_negative_keywords(username: str) -> List[str]:
    """Return negative keyword names for a user (empty if none)."""
    with _ensure_session() as session:
        row = session.execute(text("SELECT user_id FROM users WHERE username = :u"), {
            "u": username}).fetchone()
        if not row:
            return []
        user_id = int(row[0])
        rows = session.execute(text(
            """
            SELECT k.name
            FROM negative_keywords k
            INNER JOIN user_negative_keywords uk ON uk.keyword_id = k.keyword_id
            WHERE uk.user_id = :u
            ORDER BY k.name ASC
            """
        ), {"u": user_id}).fetchall()
    return [r[0] for r in rows]
def get_all_regions() -> List[Dict[str, str]]:
    """Return all region names from regions table (sorted)."""
    with _ensure_session() as session:

web/email_service.py (new file)

@@ -0,0 +1,130 @@
"""Email sending utilities for the jobs scraper."""
from __future__ import annotations
from email.message import EmailMessage
from typing import Iterable, Sequence
import smtplib
from web.utils import get_email_settings
class EmailConfigurationError(RuntimeError):
"""Raised when email settings are missing or invalid."""
class EmailDeliveryError(RuntimeError):
"""Raised when an email fails to send."""
def _normalize_addresses(addresses: Sequence[str] | str | None) -> list[str]:
if not addresses:
return []
if isinstance(addresses, str):
items = [addresses]
else:
items = list(addresses)
cleaned: list[str] = []
seen: set[str] = set()
for raw in items:
if not isinstance(raw, str):
continue
addr = raw.strip()
if not addr:
continue
lower = addr.lower()
if lower in seen:
continue
seen.add(lower)
cleaned.append(addr)
return cleaned
def _ensure_recipients(*recipient_groups: Iterable[str]) -> list[str]:
merged: list[str] = []
seen: set[str] = set()
for group in recipient_groups:
for addr in group:
lower = addr.lower()
if lower in seen:
continue
seen.add(lower)
merged.append(addr)
if not merged:
raise EmailConfigurationError(
"At least one recipient address is required")
return merged
def send_email(
*,
subject: str,
body: str,
to: Sequence[str] | str,
cc: Sequence[str] | str | None = None,
bcc: Sequence[str] | str | None = None,
reply_to: Sequence[str] | str | None = None,
settings: dict | None = None,
) -> bool:
"""Send an email using configured SMTP settings.
Returns True when a message is sent, False when email is disabled.
Raises EmailConfigurationError for invalid config and EmailDeliveryError for SMTP failures.
"""
config = settings or get_email_settings()
if not config.get("enabled"):
return False
smtp_cfg = config.get("smtp", {})
host = (smtp_cfg.get("host") or "").strip()
if not host:
raise EmailConfigurationError("SMTP host is not configured")
port = int(smtp_cfg.get("port", 587) or 587)
timeout = int(smtp_cfg.get("timeout", 30) or 30)
use_ssl = bool(smtp_cfg.get("use_ssl", False))
use_tls = bool(smtp_cfg.get("use_tls", True))
from_address = (config.get("from_address")
or smtp_cfg.get("username") or "").strip()
if not from_address:
raise EmailConfigurationError("From address is not configured")
to_list = _normalize_addresses(to)
cc_list = _normalize_addresses(cc)
bcc_list = _normalize_addresses(bcc)
reply_to_list = _normalize_addresses(reply_to)
all_recipients = _ensure_recipients(to_list, cc_list, bcc_list)
message = EmailMessage()
message["Subject"] = subject
message["From"] = from_address
message["To"] = ", ".join(to_list)
if cc_list:
message["Cc"] = ", ".join(cc_list)
if reply_to_list:
message["Reply-To"] = ", ".join(reply_to_list)
message.set_content(body)
username = (smtp_cfg.get("username") or "").strip()
password = smtp_cfg.get("password") or ""
client_cls = smtplib.SMTP_SSL if use_ssl else smtplib.SMTP
try:
with client_cls(host=host, port=port, timeout=timeout) as client:
client.ehlo()
if use_tls and not use_ssl:
client.starttls()
client.ehlo()
if username:
client.login(username, password)
client.send_message(message, from_addr=from_address,
to_addrs=all_recipients)
except EmailConfigurationError:
raise
except Exception as exc: # pragma: no cover - network errors depend on env
raise EmailDeliveryError(str(exc)) from exc
return True
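
An illustrative call, assuming a settings dict shaped like the one get_email_settings() returns (all values here are placeholders):

settings = {
    "enabled": True,
    "from_address": "alerts@example.com",
    "smtp": {
        "host": "smtp.example.com",
        "port": 587,
        "use_tls": True,
        "username": "alerts@example.com",
        "password": "app-password",
    },
}
send_email(
    subject="2 new jobs",
    body="See the list below.",
    to=["me@example.com"],
    settings=settings,
)  # returns True, or False when config["enabled"] is falsy
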

web/email_templates.py (new file)

@@ -0,0 +1,106 @@
"""Email templates for job notifications."""
from __future__ import annotations
from datetime import datetime, UTC
from typing import Iterable, Mapping, Dict, Any
DEFAULT_DATETIME_FORMAT = "%Y-%m-%d %H:%M"
DEFAULT_JOB_ALERT_SUBJECT = "{count_label}{scope}"
DEFAULT_JOB_ALERT_BODY = (
"Hi,\n\n{intro_line}{jobs_section}\n\nGenerated at {timestamp} UTC.\n"
"You are receiving this message because job alerts are enabled.\n"
)
class _SafeDict(dict):
def __missing__(self, key: str) -> str:
return ""
def _format_template(template: str, context: Dict[str, Any]) -> str:
safe_context = _SafeDict(
{k: ("\n".join(str(v) for v in context[k]) if isinstance(
context[k], list) else context[k]) for k in context}
)
return template.format_map(safe_context)
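
The __missing__ hook is what keeps user-edited templates from raising KeyError on unknown placeholders; format_map simply substitutes an empty string:

"Hello {name}{not_a_field}".format_map(_SafeDict(name="Ada"))
# -> "Hello Ada"
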
def render_job_alert_email(
    jobs: Iterable[Mapping[str, object]],
    *,
    region: str | None = None,
    keyword: str | None = None,
    generated_at: datetime | None = None,
    template_override: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    """Render the subject/body for a job alert email.

    Returns a dict with subject/body strings and the context used to render them.
    """
    job_list = list(jobs)
    generated_at = generated_at or datetime.now(UTC)
    timestamp = generated_at.strftime(DEFAULT_DATETIME_FORMAT)

    scope_parts = []
    if region:
        scope_parts.append(f"region: {region}")
    if keyword:
        scope_parts.append(f"keyword: {keyword}")
    scope = " (" + ", ".join(scope_parts) + ")" if scope_parts else ""

    job_lines: list[str] = []
    for index, job in enumerate(job_list, start=1):
        title = str(job.get("title", "Untitled"))
        company = str(job.get("company", "Unknown company"))
        location = str(job.get("location", "N/A"))
        url = str(job.get("url", ""))
        line = f"{index}. {title} - {company} ({location})"
        job_lines.append(line)
        if url:
            job_lines.append(f" {url}")
    if job_lines:
        jobs_section = "\n" + "\n".join(job_lines)
    else:
        jobs_section = "\nNo jobs matched this alert."
    jobs_message = jobs_section.strip()

    context: Dict[str, Any] = {
        "count": len(job_list),
        "count_label": "No new jobs" if not job_list else f"{len(job_list)} new jobs",
        "scope": scope,
        "region": region or "",
        "keyword": keyword or "",
        "timestamp": timestamp,
        "generated_at": generated_at,
        "intro_line": "Here are the latest jobs discovered by the scraper:",
        "jobs_message": jobs_message,
        "jobs_section": jobs_section,
        "jobs_lines": job_lines,
        "has_jobs": bool(job_list),
    }

    template = template_override
    if template is None:
        try:
            from web.db import get_email_template_by_slug
            template = get_email_template_by_slug("job-alert")
        except Exception:
            template = None

    template_subject = (template or {}).get(
        "subject") or DEFAULT_JOB_ALERT_SUBJECT
    template_body = (template or {}).get("body") or DEFAULT_JOB_ALERT_BODY

    subject = _format_template(template_subject, context)
    body = _format_template(template_body, context)

    result = {
        "subject": subject,
        "body": body,
        "context": context,
        "template_slug": (template or {}).get("slug", "job-alert"),
    }
    return result
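
Illustrative usage with sample data, assuming the default template is in effect (the "1 new jobs" wording comes straight from the default count_label):

payload = render_job_alert_email(
    [{"title": "Python Dev", "company": "ACME",
      "location": "Remote", "url": "https://example.com/jobs/1"}],
    region="newyork",
    keyword="python",
)
payload["subject"]  # -> "1 new jobs (region: newyork, keyword: python)"
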


@@ -1,7 +1,82 @@
from datetime import datetime, UTC
from bs4 import BeautifulSoup
from typing import List, Dict, Set
-from web.utils import get_base_url, safe_get_text, safe_get_attr, make_request_with_retry
+from urllib.parse import urlparse, parse_qs
+import re
+from web.utils import (
+    get_base_url,
+    safe_get_text,
+    safe_get_attr,
+    make_request_with_retry,
+    get_negative_keywords,
+)
def extract_contact_info(reply_url) -> Dict[str, str]:
    """Extract contact information from reply URL.

    Parses mailto links, phone links, and contact form URLs to extract:
    - email: Email address (from mailto links)
    - phone: Phone number (from tel links or URL parameters)
    - contact_name: Contact person name (if available in URL parameters)

    Returns a dict with email, phone, and contact_name keys (values may be "N/A").
    """
    contact_info = {
        "email": "N/A",
        "phone": "N/A",
        "contact_name": "N/A"
    }

    # Handle None or empty cases
    if not reply_url or reply_url == "N/A":
        return contact_info

    reply_url = str(reply_url).strip()
    if not reply_url or reply_url == "N/A":
        return contact_info

    try:
        # Check for mailto links
        if reply_url.startswith("mailto:"):
            email_part = reply_url.replace("mailto:", "")
            # Extract email (may contain ?subject=...)
            email = email_part.split("?")[0]
            contact_info["email"] = email
            return contact_info

        # Check for tel links
        if reply_url.startswith("tel:"):
            phone = reply_url.replace("tel:", "")
            contact_info["phone"] = phone
            return contact_info

        # Parse as URL
        if reply_url.startswith("http"):
            parsed = urlparse(reply_url)
            params = parse_qs(parsed.query)

            # Try to extract email from parameters
            for key in ["email", "from_email", "sender_email", "contact_email"]:
                if key in params:
                    contact_info["email"] = params[key][0]
                    break

            # Try to extract phone from parameters
            for key in ["phone", "tel", "telephone"]:
                if key in params:
                    contact_info["phone"] = params[key][0]
                    break

            # Try to extract contact name from parameters
            for key in ["contact_name", "from_name", "name"]:
                if key in params:
                    contact_info["contact_name"] = params[key][0]
                    break
    except Exception:
        pass

    return contact_info
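
Expected results for the three URL shapes the helper understands (illustrative values):

extract_contact_info("mailto:hr@example.com?subject=Job")
# -> {"email": "hr@example.com", "phone": "N/A", "contact_name": "N/A"}
extract_contact_info("tel:+15550100")
# -> {"email": "N/A", "phone": "+15550100", "contact_name": "N/A"}
extract_contact_info(None)
# -> all three fields "N/A"
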
def scrape_listings_page(listing, region: str, keyword: str, seen_urls: Set[str]) -> List:
@@ -40,6 +115,16 @@ def scrape_job_page(content: str, url: str) -> Dict:
    """Scrape job details from a job listing page."""
    soup = BeautifulSoup(content, "html.parser")

+    # Extract reply button
+    reply_button = soup.find("button", class_="reply-button")
+    if reply_button:
+        reply_url = safe_get_attr(reply_button, "data-href")
+    else:
+        reply_url = "N/A"
+
+    # Extract contact information from reply URL
+    contact_info = extract_contact_info(reply_url)
+
    # Extract each field
    title = safe_get_text(soup.find("h1", class_="postingtitle"))
    company = safe_get_text(soup.find("h2", class_="company-name"))
@@ -80,6 +165,30 @@ def scrape_job_page(content: str, url: str) -> Dict:
        job_id = ""
        posted_time = ""

+    # Negative keyword detection
+    negative_keyword_match = None
+    negative_match_field = None
+    negative_keywords = get_negative_keywords()
+    if negative_keywords:
+        fields_to_check = {
+            "title": title or "",
+            "company": company or "",
+            "location": location or "",
+            "description": description or "",
+        }
+        for keyword in negative_keywords:
+            if not keyword:
+                continue
+            pattern = re.compile(
+                r"\b" + re.escape(keyword) + r"\b", re.IGNORECASE)
+            for field_name, field_value in fields_to_check.items():
+                if field_value and pattern.search(field_value):
+                    negative_keyword_match = keyword
+                    negative_match_field = field_name
+                    break
+            if negative_keyword_match:
+                break
    return {
        "url": url,
        "title": title,
@@ -87,7 +196,14 @@ def scrape_job_page(content: str, url: str) -> Dict:
        "location": location,
        "description": description,
        "id": job_id,
-        "posted_time": posted_time
+        "posted_time": posted_time,
+        "reply_url": reply_url,
+        "contact_email": contact_info["email"],
+        "contact_phone": contact_info["phone"],
+        "contact_name": contact_info["contact_name"],
+        "negative_keyword_match": negative_keyword_match,
+        "negative_match_field": negative_match_field,
+        "is_negative_match": bool(negative_keyword_match),
    }
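
Because the pattern wraps each keyword in \b word boundaries, only whole-word hits count:

import re
pattern = re.compile(r"\b" + re.escape("java") + r"\b", re.IGNORECASE)
assert pattern.search("Senior Java developer")
assert not pattern.search("JavaScript developer")
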
@@ -108,7 +224,7 @@ def scrape_job_data(content: str, region: str, keyword: str, seen_urls: Set[str]
def process_region_keyword(region: str, keyword: str, seen_urls: Set[str]) -> List[List]:
    """Process a single region and keyword."""
    url = get_base_url().format(region=region, keyword=keyword.replace(" ", "+"))
-    content = make_request_with_retry(url, 3)
+    content = make_request_with_retry(url, 1)
    if content is None:
        return []
    return scrape_job_data(content, region, keyword, seen_urls)


@@ -41,12 +41,16 @@ function scrape(event) {
  event.preventDefault(); // Prevent the default form submission
  updateScrapeInfo("Scraping in progress...", "blue");
  fetch("/scrape")
-    .then((response) => response.json())
+    // expect HTML response containing "Scraping completed successfully!"
+    .then((response) => response.text())
    .then((data) => {
-      if (data.status) {
-        updateScrapeInfo(data.status, "green");
+      if (data.includes("Scraping completed successfully!")) {
+        updateScrapeInfo("Scraping completed successfully!", "green");
      } else {
-        updateScrapeInfo("Scraping failed. Please try again.", "red");
+        updateScrapeInfo(
+          "Scraping failed or timed out. Please try again.",
+          "red"
+        );
      }
    })
    .catch((error) => console.error("Error:", error));


@@ -1,4 +1,22 @@
/* javascript form handling */
+document.addEventListener("DOMContentLoaded", function () {
+  const newNkInput = document.getElementById("new-negative-keyword");
+  if (newNkInput) {
+    newNkInput.addEventListener("input", function () {
+      const val = this.value.trim();
+      const existing = Array.from(
+        document.querySelectorAll('input[name="negative_keyword"]')
+      ).map((el) => el.value);
+      if (existing.includes(val)) {
+        this.setCustomValidity("Keyword already exists");
+        this.reportValidity();
+      } else {
+        this.setCustomValidity("");
+      }
+    });
+  }
+});
+
document
  .getElementById("user-settings-form")
  .addEventListener("submit", function (event) {
@@ -10,11 +28,15 @@ document
    // Collect selected regions and keywords
    const selectedRegions = [];
    const selectedKeywords = [];
+    const selectedNegativeKeywords = [];
    formData.forEach((value, key) => {
      if (key === "region") {
        selectedRegions.push(value);
      } else if (key === "keyword") {
        selectedKeywords.push(value);
+      } else if (key === "negative_keyword") {
+        selectedNegativeKeywords.push(value);
      }
    });
@@ -30,10 +52,21 @@ document
      selectedKeywords.push(newKeyword);
    }

+    // Add new negative keyword if provided
+    const newNegativeKeyword = formData.get("new-negative-keyword").trim();
+    if (newNegativeKeyword) {
+      if (selectedNegativeKeywords.includes(newNegativeKeyword)) {
+        alert("Negative keyword already exists!");
+        return;
+      }
+      selectedNegativeKeywords.push(newNegativeKeyword);
+    }
+
    // Prepare data to send
    const dataToSend = {
      regions: selectedRegions,
      keywords: selectedKeywords,
+      negative_keywords: selectedNegativeKeywords,
      csrf_token: formData.get("csrf_token"),
    };


@@ -0,0 +1,62 @@
{% extends 'base.html' %}
{% block content %}
<h2>Email Subscriptions</h2>
<section>
<h3>Add Subscription</h3>
<form method="post">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}" />
<input type="hidden" name="action" value="subscribe" />
<label for="email">Email address</label>
<input
type="email"
id="email"
name="email"
placeholder="alerts@example.com"
required
/>
<button type="submit">Subscribe</button>
</form>
</section>
<section>
<h3>Current Recipients</h3>
{% if not subscriptions %}
<p>No subscriptions yet. Add one above to start sending alerts.</p>
<p>You can customize alert content from the <a href="{{ url_for('admin_email_templates') }}">Email Templates</a> page.</p>
{% else %}
<p>{{ total_active }} active of {{ total }} total.</p>
<table>
<thead>
<tr>
<th>Email</th>
<th>Status</th>
<th>Created</th>
<th>Updated</th>
<th>Action</th>
</tr>
</thead>
<tbody>
{% for sub in subscriptions %}
<tr>
<td>{{ sub.email }}</td>
<td>{{ 'Active' if sub.is_active else 'Inactive' }}</td>
<td>{{ sub.created_at }}</td>
<td>{{ sub.updated_at }}</td>
<td>
<form method="post" style="display: inline-flex; gap: 0.5rem">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}" />
<input type="hidden" name="email" value="{{ sub.email }}" />
{% if sub.is_active %}
<input type="hidden" name="action" value="unsubscribe" />
<button type="submit">Deactivate</button>
{% else %}
<input type="hidden" name="action" value="reactivate" />
<button type="submit">Reactivate</button>
{% endif %}
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endif %}
</section>
{% endblock %}


@@ -0,0 +1,102 @@
{% extends 'base.html' %}
{% block content %}
<h2>Email Templates</h2>
<section>
<h3>Available Templates</h3>
{% if not templates %}
<p>No templates found. Create one below to get started.</p>
{% else %}
<table>
<thead>
<tr>
<th>Name</th>
<th>Slug</th>
<th>Status</th>
<th>Updated</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
{% for template in templates %}
<tr>
<td>{{ template.name }}</td>
<td>{{ template.slug }}</td>
<td>{{ 'Active' if template.is_active else 'Inactive' }}</td>
<td>{{ template.updated_at or template.created_at or '' }}</td>
<td style="display: flex; gap: 0.5rem;">
<a class="button" href="{{ url_for('admin_email_templates', template_id=template.template_id) }}">Edit</a>
<a class="button" href="{{ url_for('admin_email_templates', preview_id=template.template_id) }}">Preview</a>
<form method="post" onsubmit="return confirm('Delete template {{ template.name }}?');">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}" />
<input type="hidden" name="action" value="delete" />
<input type="hidden" name="template_id" value="{{ template.template_id }}" />
<button type="submit">Delete</button>
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endif %}
</section>
<section>
<h3>{{ 'Edit Template' if editing else 'Create Template' }}</h3>
<form method="post">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}" />
<input type="hidden" name="action" value="{{ 'update' if editing else 'create' }}" />
{% if editing %}
<input type="hidden" name="template_id" value="{{ editing.template_id }}" />
{% endif %}
<div>
<label for="name">Name</label>
<input type="text" id="name" name="name" value="{{ editing.name if editing else '' }}" required />
</div>
<div>
<label for="slug">Slug</label>
<input type="text" id="slug" name="slug" placeholder="job-alert" value="{{ editing.slug if editing else '' }}" />
<small>Leave blank to reuse the name. Slug must be URL friendly (letters, numbers, dashes).</small>
</div>
<div>
<label for="subject">Subject Template</label>
<input type="text" id="subject" name="subject" value="{{ editing.subject if editing else '' }}" required />
</div>
<div>
<label for="body">Body Template</label>
<textarea id="body" name="body" rows="12" required>{{ editing.body if editing else '' }}</textarea>
</div>
<div>
<label>
<input type="checkbox" name="is_active" {% if editing is none or editing.is_active %}checked{% endif %} />
Active
</label>
</div>
<button type="submit">{{ 'Update Template' if editing else 'Create Template' }}</button>
{% if editing %}
<a class="button" href="{{ url_for('admin_email_templates') }}">Cancel</a>
{% endif %}
</form>
<aside>
<h4>Available placeholders</h4>
<ul>
<li><code>{count}</code> number of jobs in the alert</li>
<li><code>{count_label}</code> "No new jobs" or "X new jobs"</li>
<li><code>{scope}</code> formatted region/keyword context</li>
<li><code>{region}</code>, <code>{keyword}</code></li>
<li><code>{timestamp}</code> formatted timestamp</li>
<li><code>{jobs_section}</code> newline-prefixed block of job entries</li>
<li><code>{jobs_message}</code> jobs block without leading newline</li>
</ul>
</aside>
</section>
{% if preview %}
<section>
<h3>Preview: {{ preview_template.name if preview_template else 'Job Alert' }}</h3>
<article>
<h4>Subject</h4>
<pre>{{ preview.subject }}</pre>
<h4>Body</h4>
<pre>{{ preview.body }}</pre>
</article>
</section>
{% endif %}
{% endblock %}


@@ -16,17 +16,21 @@
  <header>
    <h1><a href="/">{{ title or 'Admin' }}</a></h1>
    <nav>
-      {% if username %}<span>Hi, {{ username }}</span> | {% endif %}
-      <a href="{{ url_for('index') }}">Home</a> |
-      <a href="{{ url_for('user_settings') }}">Preferences</a>
-      {% if current_user and current_user.is_admin %} |
-      <a href="{{ url_for('scrape_page') }}">Scrape Jobs</a> |
-      <a href="{{ url_for('admin_taxonomy') }}">Taxonomy</a> |
-      <a href="{{ url_for('admin_stats') }}">Statistics</a> |
-      <a href="{{ url_for('admin_users') }}">Users</a> {% endif %} {% if
-      session.get('username') %} |
-      <a href="{{ url_for('logout') }}">Logout</a> {% else %} |
-      <a href="{{ url_for('login') }}">Login</a>{% endif %}
+      <div id="navigation">
+        {% if username %}<span>Hi, {{ username }}</span> | {% endif %}
+        <a href="{{ url_for('index') }}">Home</a> |
+        <a href="{{ url_for('user_settings') }}">Preferences</a>
+        {% if current_user and current_user.is_admin %} |
+        <a href="{{ url_for('scrape_page') }}">Scrape Jobs</a> |
+        <a href="{{ url_for('admin_taxonomy') }}">Taxonomy</a> |
+        <a href="{{ url_for('admin_stats') }}">Statistics</a> |
+        <a href="{{ url_for('admin_emails') }}">Email Alerts</a> |
+        <a href="{{ url_for('admin_email_templates') }}">Email Templates</a> |
+        <a href="{{ url_for('admin_users') }}">Users</a> {% endif %} {% if
+        session.get('username') %} |
+        <a href="{{ url_for('logout') }}">Logout</a> {% else %} |
+        <a href="{{ url_for('login') }}">Login</a>{% endif %}
+      </div>
    </nav>
    {% with messages = get_flashed_messages() %} {% if messages %}
    <ul>


@@ -77,6 +77,29 @@ block content %}
       <p>No keywords available. Ask an admin to add some.</p>
       {% endif %}
     </fieldset>
+    <fieldset>
+      <legend>Negative Keywords</legend>
+      <p>
+        <small>Add new Negative Keyword:</small>
+        <input
+          type="text"
+          name="new-negative-keyword"
+          id="new-negative-keyword"
+          value=""
+          placeholder="Type a keyword and save to add"
+          size="30"
+        />
+      </p>
+      {% if user_negative_keywords %} {% for nk in user_negative_keywords %}
+      <label style="display: block">
+        <input type="checkbox" name="negative_keyword" value="{{ nk }}" checked />
+        {{ nk }}
+      </label>
+      {% endfor %} {% else %}
+      <p>No negative keywords set.</p>
+      {% endif %}
+      <p><small>Uncheck to remove.</small></p>
+    </fieldset>
     <button type="submit">Save</button>
   </form>
 {% endblock %} {% block footer_scripts %}
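
On the server side, a handler for this fieldset might read the checked boxes as the keywords to keep and the free-text field as one new addition. A minimal Flask sketch (the helper name and normalization rules are assumptions; persistence is not shown):

# Sketch only: assumes Flask request handling. How the resulting list is
# stored per user is up to the app and not shown here.
from flask import request

def extract_negative_keywords() -> list[str]:
    # Checked boxes carry the keywords the user wants to keep; unchecked
    # ones are dropped, which implements "uncheck to remove".
    kept = [k.strip().lower() for k in request.form.getlist("negative_keyword")]
    # The free-text input contributes at most one new keyword.
    new_kw = (request.form.get("new-negative-keyword") or "").strip().lower()
    if new_kw:
        kept.append(new_kw)
    # De-duplicate while preserving order, dropping empties.
    seen: set[str] = set()
    return [k for k in kept if k and not (k in seen or seen.add(k))]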


@@ -125,6 +125,66 @@ def get_base_url() -> str:
     return get_config().get('scraper', {}).get('base_url', "https://{region}.craigslist.org/search/jjj?query={keyword}&sort=rel")
 
 
+def get_negative_keywords() -> List[str]:
+    """Return normalized list of negative keywords from config."""
+    raw = get_config().get('scraper', {}).get('negative_keywords', [])
+    if not isinstance(raw, list):
+        return []
+    cleaned: List[str] = []
+    for item in raw:
+        if not isinstance(item, str):
+            continue
+        val = item.strip()
+        if not val:
+            continue
+        cleaned.append(val.lower())
+    return cleaned
+
+
+def get_email_settings() -> Dict[str, Any]:
+    """Return normalized email settings from config."""
+    cfg = get_config().get('email', {})
+    if not isinstance(cfg, dict):
+        cfg = {}
+    raw_smtp = cfg.get('smtp', {}) if isinstance(cfg.get('smtp'), dict) else {}
+    raw_recipients = cfg.get('recipients', [])
+
+    def _to_int(value, default):
+        try:
+            return int(value)
+        except (TypeError, ValueError):
+            return default
+
+    recipients: List[str] = []
+    if isinstance(raw_recipients, list):
+        for item in raw_recipients:
+            if isinstance(item, str):
+                addr = item.strip()
+                if addr:
+                    recipients.append(addr)
+    smtp = {
+        'host': (raw_smtp.get('host') or '').strip(),
+        'port': _to_int(raw_smtp.get('port', 587), 587),
+        'username': (raw_smtp.get('username') or '').strip(),
+        'password': raw_smtp.get('password') or '',
+        'use_tls': bool(raw_smtp.get('use_tls', True)),
+        'use_ssl': bool(raw_smtp.get('use_ssl', False)),
+        'timeout': _to_int(raw_smtp.get('timeout', 30), 30),
+    }
+    if smtp['port'] <= 0:
+        smtp['port'] = 587
+    if smtp['timeout'] <= 0:
+        smtp['timeout'] = 30
+    return {
+        'enabled': bool(cfg.get('enabled', False)),
+        'from_address': (cfg.get('from_address') or '').strip(),
+        'smtp': smtp,
+        'recipients': recipients,
+    }
+
+
 def now_iso() -> str:
     """Get the current time in ISO format."""
     return datetime.now(UTC).isoformat()
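
Taken together, get_email_settings tolerates a sparse or partially invalid config. An illustrative input (the addresses and host below are made up, not project defaults):

# If get_config() returned this dict, get_email_settings() would produce:
# enabled=True, from_address="alerts@example.com", smtp.port=587 (0 is
# invalid, so the default applies), and recipients=["me@example.com"]
# (blank and non-string entries are dropped).
config = {
    "email": {
        "enabled": True,
        "from_address": "alerts@example.com",
        "smtp": {"host": "smtp.example.com", "port": 0, "use_tls": True},
        "recipients": ["me@example.com", "   ", 42],
    }
}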
@@ -203,13 +263,39 @@ def filter_jobs(
     jobs: List[Dict[str, Any]],
     region: Optional[str] = None,
     keyword: Optional[str] = None,
+    negative_keywords: Optional[List[str]] = None,
 ) -> List[Dict[str, Any]]:
-    """Filter jobs by optional region and keyword."""
+    """Filter jobs by optional region, keyword, and negative keywords."""
     filtered = jobs
     if region:
         filtered = [j for j in filtered if j.get("region") == region]
     if keyword:
         filtered = [j for j in filtered if j.get("keyword") == keyword]
+    if negative_keywords:
+        # The scraper uses a plain substring check, so do the same here for
+        # consistency. Fields checked: title, company, location, description.
+        # Note: the description may contain HTML and can be long.
+        nks = [nk.lower() for nk in negative_keywords if nk]
+
+        def is_clean(job):
+            # Join the searchable fields into one lowercase blob.
+            text_blob = " ".join([
+                str(job.get("title") or ""),
+                str(job.get("company") or ""),
+                str(job.get("location") or ""),
+                str(job.get("description") or ""),
+            ]).lower()
+            for nk in nks:
+                if nk in text_blob:
+                    return False
+            return True
+
+        filtered = [j for j in filtered if is_clean(j)]
     return filtered
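
A usage sketch tying the two additions together (fetch_all_jobs stands in for the app's real data access; the region and keyword values are illustrative):

# Sketch only: apply the config-level negative keywords when listing jobs.
jobs = fetch_all_jobs()  # hypothetical data-access helper
visible = filter_jobs(
    jobs,
    region="newyork",
    keyword="python",
    negative_keywords=get_negative_keywords(),
)
print(f"{len(visible)} of {len(jobs)} jobs remain after filtering")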