jobs/web/utils.py
zwitschi 2185a07ff0
feat: Implement email sending utilities and templates for job notifications
- Added email_service.py for sending emails with SMTP configuration.
- Introduced email_templates.py to render job alert email subjects and bodies.
- Enhanced scraper.py to extract contact information from job listings.
- Updated settings.js to handle negative keyword input validation.
- Created email.html and email_templates.html for managing email subscriptions and templates in the admin interface.
- Modified base.html to include links for email alerts and templates.
- Expanded user settings.html to allow management of negative keywords.
- Updated utils.py to include functions for retrieving negative keywords and email settings.
- Enhanced job filtering logic to exclude jobs containing negative keywords.
2025-11-28 18:15:08 +01:00


"""
Utility functions for the Craigslist scraper.
"""
from typing import Any, Optional, List, Dict
from datetime import datetime, UTC
import json
import os
import random
import re
import requests
import time
def get_config_file() -> str:
"""Return the path to the main config file."""
return os.path.abspath(os.path.join(
os.path.dirname(__file__), '..', 'config', 'settings.json'))
def get_config() -> dict:
"""Return the loaded configuration dict."""
CONFIG = {}
try:
with open(get_config_file(), 'r', encoding='utf-8') as _f:
CONFIG = json.load(_f)
except Exception:
CONFIG = {}
return CONFIG
def get_users_from_settings() -> List[Dict]:
"""Return user entries from settings.json (array of dicts)."""
users = get_config().get('users', [])
if not isinstance(users, list):
return []
out: List[Dict] = []
for u in users:
if not isinstance(u, dict):
continue
username = (u.get('username') or '').strip()
if not username:
continue
out.append({
'username': username,
'is_admin': bool(u.get('is_admin', False)),
'password': u.get('password') or ''
})
return out
def initialize_users_from_settings() -> int:
"""Ensure users from settings.json exist in DB; set admin/active and passwords.
Returns number of users processed.
"""
from web.db import create_or_update_user # local import to avoid cycles
users = get_users_from_settings()
count = 0
for u in users:
pw = u.get('password') or None
create_or_update_user(u['username'], password=pw, is_admin=bool(
u.get('is_admin', False)), is_active=True)
count += 1
return count
def verify_credentials(username: str, password: str) -> bool:
"""Proxy to db.verify_user_credentials"""
from web.db import verify_user_credentials
return verify_user_credentials(username, password)
# --- Database configuration helpers ---
def get_mysql_config() -> dict:
"""Return MySQL/MariaDB connection settings."""
db = get_config().get('database', {}).get('mysql', {})
return {
'host': db.get('host', '127.0.0.1'),
'user': db.get('user', 'root'),
'password': db.get('password', ''),
'database': db.get('database', 'jobs'),
'port': db.get('port', 3306),
}
def get_http_setting(key: str, default=None):
return get_config().get('http', {}).get(key, default)
def get_paths() -> dict:
return get_config().get('paths', {})
def get_logs_dir() -> str:
return get_paths().get('logs_dir', 'logs')
def get_user_agent() -> str:
return get_http_setting('user_agent')
def get_request_timeout() -> int:
return get_http_setting('request_timeout')
def get_max_retries() -> int:
return get_http_setting('max_retries')
def get_backoff_factor() -> int:
return get_http_setting('backoff_factor')
def get_min_delay() -> int:
return get_http_setting('min_delay')
def get_max_delay() -> int:
return get_http_setting('max_delay')
def get_base_url() -> str:
return get_config().get('scraper', {}).get('base_url', "https://{region}.craigslist.org/search/jjj?query={keyword}&sort=rel")


def get_negative_keywords() -> List[str]:
    """Return normalized list of negative keywords from config."""
    raw = get_config().get('scraper', {}).get('negative_keywords', [])
    if not isinstance(raw, list):
        return []
    cleaned: List[str] = []
    for item in raw:
        if not isinstance(item, str):
            continue
        val = item.strip()
        if not val:
            continue
        cleaned.append(val.lower())
    return cleaned
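

# Illustrative only: get_negative_keywords() above reads a settings.json
# section shaped roughly like the following (placeholder values; key names
# taken from the lookups in this file):
#
#   "scraper": {
#       "base_url": "https://{region}.craigslist.org/search/jjj?query={keyword}&sort=rel",
#       "negative_keywords": ["Senior", " unpaid ", "commission only"]
#   }
#
# With that input the function returns ["senior", "unpaid", "commission only"],
# i.e. stripped and lower-cased.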


def get_email_settings() -> Dict[str, Any]:
    """Return normalized email settings from config."""
    cfg = get_config().get('email', {})
    if not isinstance(cfg, dict):
        cfg = {}
    raw_smtp = cfg.get('smtp', {}) if isinstance(cfg.get('smtp'), dict) else {}
    raw_recipients = cfg.get('recipients', [])

    def _to_int(value, default):
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    recipients: List[str] = []
    if isinstance(raw_recipients, list):
        for item in raw_recipients:
            if isinstance(item, str):
                addr = item.strip()
                if addr:
                    recipients.append(addr)
    smtp = {
        'host': (raw_smtp.get('host') or '').strip(),
        'port': _to_int(raw_smtp.get('port', 587), 587),
        'username': (raw_smtp.get('username') or '').strip(),
        'password': raw_smtp.get('password') or '',
        'use_tls': bool(raw_smtp.get('use_tls', True)),
        'use_ssl': bool(raw_smtp.get('use_ssl', False)),
        'timeout': _to_int(raw_smtp.get('timeout', 30), 30),
    }
    if smtp['port'] <= 0:
        smtp['port'] = 587
    if smtp['timeout'] <= 0:
        smtp['timeout'] = 30
    return {
        'enabled': bool(cfg.get('enabled', False)),
        'from_address': (cfg.get('from_address') or '').strip(),
        'smtp': smtp,
        'recipients': recipients,
    }
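

# Illustrative only: an assumed "email" section of settings.json that
# get_email_settings() would normalize (values are placeholders; key names
# come from the lookups above):
#
#   "email": {
#       "enabled": true,
#       "from_address": "alerts@example.com",
#       "recipients": ["me@example.com"],
#       "smtp": {
#           "host": "smtp.example.com",
#           "port": 587,
#           "username": "alerts@example.com",
#           "password": "change-me",
#           "use_tls": true,
#           "use_ssl": false,
#           "timeout": 30
#       }
#   }
#
# Invalid or non-positive port/timeout values fall back to 587 and 30.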


def now_iso() -> str:
    """Get the current time in ISO format."""
    return datetime.now(UTC).isoformat()


def get_filename_from_url(url: str) -> str:
    """Convert URL to a safe filename."""
    return url.replace("https://", "").replace("/", "_").replace("?", "_").replace("&", "_")


def url_to_job_id(url: str) -> int:
    """Extract the job id from a Craigslist URL (last path segment without .html)."""
    last = url.rstrip("/").split("/")[-1].replace(".html", "")
    if last.isdigit():
        return int(last)
    return 0


def normalize_job_id(raw_id: Optional[str], url: Optional[str]) -> Optional[int]:
    """Normalize job id coming from details page (e.g., 'post id: 1234567890').

    Fallback to URL-derived id when needed.
    """
    if raw_id:
        m = re.search(r"(\d{5,})", raw_id)
        if m:
            return int(m.group(1))
    if url:
        return url_to_job_id(url)
    return None
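

# Expected behaviour of normalize_job_id(), derived from the regex and the
# URL fallback above (the URL is a made-up example):
#
#   normalize_job_id("post id: 1234567890", None)  -> 1234567890
#   normalize_job_id(None, "https://sfbay.craigslist.org/sfc/sof/7712345678.html")
#       -> 7712345678
#   normalize_job_id(None, None)                   -> None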


def get_url_from_filename(name: str) -> str:
    """Generate a URL guess based on the name."""
    # Best-effort URL guess from filename convention (underscores to slashes)
    base = os.path.splitext(name)[0]
    url_guess = f"https://{base.replace('_', '/')}"
    return url_guess


def safe_get_text(element, default="N/A"):
    """Safely extract text from a BeautifulSoup element."""
    return element.get_text(strip=True) if element else default


def safe_get_attr(element, attr, default="N/A"):
    """Safely extract an attribute from a BeautifulSoup element."""
    return element.get(attr, default) if element else default


def get_random_delay(min_delay: Optional[int] = None, max_delay: Optional[int] = None) -> float:
    """Get a random delay between min_delay and max_delay seconds."""
    # Resolve defaults at call time; evaluating the config getters in the
    # signature would freeze their values (possibly None) at import time.
    min_delay = get_min_delay() if min_delay is None else min_delay
    max_delay = get_max_delay() if max_delay is None else max_delay
    return random.uniform(min_delay, max_delay)


def get_color_from_string(s: str) -> str:
    """Generate a color code from a string."""
    # Ensure the hash code is positive
    hash_code = abs(hash(s))
    # Extract RGB components
    r = (hash_code & 0xFF0000) >> 16
    g = (hash_code & 0x00FF00) >> 8
    b = hash_code & 0x0000FF
    # Ensure RGB components are within 128-255
    r = max(128, min(255, r))
    g = max(128, min(255, g))
    b = max(128, min(255, b))
    # Combine RGB components back into a single integer
    calculated = (r << 16) | (g << 8) | b
    return f"#{calculated:06X}"


# ---- App helpers moved from app.py for reuse and readability -------------
def filter_jobs(
    jobs: List[Dict[str, Any]],
    region: Optional[str] = None,
    keyword: Optional[str] = None,
    negative_keywords: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Filter jobs by optional region, keyword, and negative keywords."""
    filtered = jobs
    if region:
        filtered = [j for j in filtered if j.get("region") == region]
    if keyword:
        filtered = [j for j in filtered if j.get("keyword") == keyword]
    if negative_keywords:
        # Plain substring matching, mirroring the scraper, over the title,
        # company, location, and description fields (the description may be
        # long or contain HTML, but a substring check is cheap either way).
        nks = [nk.lower() for nk in negative_keywords if nk]

        def is_clean(job):
            text_blob = " ".join([
                str(job.get("title") or ""),
                str(job.get("company") or ""),
                str(job.get("location") or ""),
                str(job.get("description") or "")
            ]).lower()
            return not any(nk in text_blob for nk in nks)

        filtered = [j for j in filtered if is_clean(j)]
    return filtered
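

# Minimal usage sketch for filter_jobs() with hypothetical job dicts (not
# real data); the second job is dropped because its title contains a
# negative keyword:
#
#   jobs = [
#       {"title": "Junior Python Developer", "region": "sfbay", "keyword": "python"},
#       {"title": "Sales role, commission only", "region": "sfbay", "keyword": "python"},
#   ]
#   filter_jobs(jobs, region="sfbay", negative_keywords=["commission only"])
#   # -> [{"title": "Junior Python Developer", ...}]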


def get_job_by_id(job_id):
    """Fetch job details by job ID from the database."""
    try:
        from web.db import get_all_jobs  # lazy import to avoid cycles
        for j in get_all_jobs():
            if str(j.get("id")) == str(job_id) or str(j.get("job_id")) == str(job_id):
                return j
    except Exception:
        pass
    return {}


def make_request_with_retry(url: str, max_retries: Optional[int] = None) -> Optional[str]:
    """Make an HTTP request with retry logic and proper error handling."""
    # Resolve the retry count at call time rather than freezing it at import.
    if max_retries is None:
        max_retries = get_max_retries()
    headers = {'User-Agent': get_user_agent()}
    for attempt in range(max_retries):
        try:
            delay = get_random_delay() * (get_backoff_factor() ** attempt)
            if attempt > 0:
                time.sleep(delay)
            resp = requests.get(url, headers=headers,
                                timeout=get_request_timeout())
            if resp.status_code == 403:
                return None
            elif resp.status_code == 429:
                time.sleep(delay * 3)  # Longer delay for rate limiting
                continue
            elif resp.status_code == 404:
                return None
            elif resp.status_code == 410:
                return None
            elif resp.status_code >= 400:
                if attempt == max_retries - 1:
                    return None
                continue
            resp.raise_for_status()
            return resp.text
        except requests.exceptions.Timeout:
            pass
        except requests.exceptions.ConnectionError:
            pass
        except requests.exceptions.RequestException:
            pass
        if attempt < max_retries - 1:
            delay = get_random_delay() * (get_backoff_factor() ** attempt)
            time.sleep(delay)
    return None
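

# Rough usage sketch for make_request_with_retry(); assumes the "http" section
# of settings.json provides user_agent, request_timeout, max_retries,
# backoff_factor, min_delay and max_delay (the URL is a placeholder):
#
#   html = make_request_with_retry(
#       "https://sfbay.craigslist.org/search/jjj?query=python")
#   if html is None:
#       # blocked (403), gone (404/410), or retries exhausted
#       ...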