jobs/web/utils.py

"""
Utility functions for the Craigslist scraper.
"""
from typing import Any, Optional, List, Dict
from datetime import datetime, UTC
import json
import os
import random
import re
import time
import zlib  # stable string hashing for get_color_from_string()

import requests
def get_config_file() -> str:
"""Return the path to the main config file."""
return os.path.abspath(os.path.join(
os.path.dirname(__file__), '..', 'config', 'settings.json'))
def get_config() -> dict:
    """Return the loaded configuration dict.

    The file is re-read on every call (caching was intentionally removed);
    a missing or unreadable settings.json yields an empty dict.
    """
    try:
        with open(get_config_file(), 'r', encoding='utf-8') as _f:
            return json.load(_f)
    except (OSError, json.JSONDecodeError):
        return {}
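
# Illustrative settings.json shape, inferred from the accessors in this
# module (key names come from this file; the values are placeholders):
#     {
#       "users": [{"username": "admin", "password": "...", "is_admin": true}],
#       "database": {"mysql": {"host": "127.0.0.1", "port": 3306, "user": "...",
#                              "password": "...", "database": "jobs"}},
#       "http": {"user_agent": "...", "request_timeout": 10, "max_retries": 3,
#                "backoff_factor": 2, "min_delay": 1, "max_delay": 5},
#       "paths": {"logs_dir": "logs"},
#       "scraper": {"base_url": "https://{region}.craigslist.org/search/jjj?query={keyword}&sort=rel"}
#     }
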
def get_users_from_settings() -> List[Dict]:
"""Return user entries from settings.json (array of dicts)."""
users = get_config().get('users', [])
if not isinstance(users, list):
return []
out: List[Dict] = []
for u in users:
if not isinstance(u, dict):
continue
username = (u.get('username') or '').strip()
if not username:
continue
out.append({
'username': username,
'is_admin': bool(u.get('is_admin', False)),
'password': u.get('password') or ''
})
return out
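
# Example (illustrative, given a settings.json like the one sketched above):
#     >>> get_users_from_settings()
#     [{'username': 'admin', 'is_admin': True, 'password': '...'}]
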
def initialize_users_from_settings() -> int:
"""Ensure users from settings.json exist in DB; set admin/active and passwords.
Returns number of users processed.
"""
from web.db import create_or_update_user # local import to avoid cycles
users = get_users_from_settings()
count = 0
for u in users:
pw = u.get('password') or None
create_or_update_user(u['username'], password=pw, is_admin=bool(
u.get('is_admin', False)), is_active=True)
count += 1
return count
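
# Example (illustrative; typically invoked once at application startup so
# that settings.json users exist before the first login attempt):
#     >>> initialize_users_from_settings()
#     1
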
def verify_credentials(username: str, password: str) -> bool:
"""Proxy to db.verify_user_credentials"""
from web.db import verify_user_credentials
return verify_user_credentials(username, password)
# --- Database configuration helpers ---
def get_mysql_config() -> dict:
"""Return MySQL/MariaDB connection settings."""
db = get_config().get('database', {}).get('mysql', {})
return {
'host': db.get('host', '127.0.0.1'),
'user': db.get('user', 'root'),
'password': db.get('password', ''),
'database': db.get('database', 'jobs'),
'port': db.get('port', 3306),
}
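
# A minimal usage sketch; it assumes mysql-connector-python as the driver,
# which this module does not itself import or mandate:
#     >>> import mysql.connector
#     >>> conn = mysql.connector.connect(**get_mysql_config())
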
def get_http_setting(key: str, default=None):
return get_config().get('http', {}).get(key, default)
def get_paths() -> dict:
return get_config().get('paths', {})
def get_logs_dir() -> str:
return get_paths().get('logs_dir', 'logs')
# The fallbacks below are conservative defaults, assumed for the case where
# settings.json omits part of the "http" section; previously these getters
# returned None and let callers crash.
def get_user_agent() -> str:
    return get_http_setting('user_agent', 'Mozilla/5.0')

def get_request_timeout() -> int:
    return get_http_setting('request_timeout', 10)

def get_max_retries() -> int:
    return get_http_setting('max_retries', 3)

def get_backoff_factor() -> float:
    return get_http_setting('backoff_factor', 2)

def get_min_delay() -> float:
    return get_http_setting('min_delay', 1)

def get_max_delay() -> float:
    return get_http_setting('max_delay', 5)
def get_base_url() -> str:
return get_config().get('scraper', {}).get('base_url', "https://{region}.craigslist.org/search/jjj?query={keyword}&sort=rel")
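
# Example (uses the default template above):
#     >>> get_base_url().format(region="sfbay", keyword="python")
#     'https://sfbay.craigslist.org/search/jjj?query=python&sort=rel'
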
def now_iso() -> str:
"""Get the current time in ISO format."""
return datetime.now(UTC).isoformat()
def get_filename_from_url(url: str) -> str:
"""Convert URL to a safe filename."""
return url.replace("https://", "").replace("/", "_").replace("?", "_").replace("&", "_")
def url_to_job_id(url: str) -> int:
    """Extract the job id from a Craigslist URL (last path segment without
    .html); returns 0 when that segment is not numeric."""
last = url.rstrip("/").split("/")[-1].replace(".html", "")
if last.isdigit():
return int(last)
return 0
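
# Example (illustrative Craigslist URL shape):
#     >>> url_to_job_id("https://sfbay.craigslist.org/sfc/sof/d/example/7712345678.html")
#     7712345678
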
def normalize_job_id(raw_id: Optional[str], url: Optional[str]) -> Optional[int]:
"""Normalize job id coming from details page (e.g., 'post id: 1234567890').
Fallback to URL-derived id when needed.
"""
if raw_id:
m = re.search(r"(\d{5,})", raw_id)
if m:
return int(m.group(1))
if url:
return url_to_job_id(url)
return None
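
# Examples (illustrative values):
#     >>> normalize_job_id("post id: 7712345678", None)
#     7712345678
#     >>> normalize_job_id(None, "https://sfbay.craigslist.org/d/x/7712345678.html")
#     7712345678
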
def get_url_from_filename(name: str) -> str:
"""Generate a URL guess based on the name."""
# Best-effort URL guess from filename convention (underscores to slashes)
base = os.path.splitext(name)[0]
url_guess = f"https://{base.replace('_', '/')}"
return url_guess
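
# Note: the round trip with get_filename_from_url() is lossy because '/',
# '?' and '&' all map to '_'. Illustrative:
#     >>> get_filename_from_url("https://sfbay.craigslist.org/d/7712345678.html")
#     'sfbay.craigslist.org_d_7712345678.html'
#     >>> get_url_from_filename("sfbay.craigslist.org_d_7712345678.html")
#     'https://sfbay.craigslist.org/d/7712345678'
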
def safe_get_text(element, default="N/A"):
"""Safely extract text from BeautifulSoup element."""
return element.get_text(strip=True) if element else default
def safe_get_attr(element, attr, default="N/A"):
"""Safely extract attribute from BeautifulSoup element."""
return element.get(attr, default) if element else default
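
# Example (assumes bs4 is installed; these helpers expect BeautifulSoup
# elements, or None when a lookup found nothing):
#     >>> from bs4 import BeautifulSoup
#     >>> soup = BeautifulSoup('<a href="/x">Job</a>', 'html.parser')
#     >>> safe_get_text(soup.a), safe_get_attr(soup.a, 'href')
#     ('Job', '/x')
#     >>> safe_get_text(soup.find('span'))
#     'N/A'
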
def get_random_delay(min_delay: Optional[float] = None, max_delay: Optional[float] = None) -> float:
    """Get a random delay between min_delay and max_delay seconds."""
    # Resolve config defaults at call time, not once at import time.
    min_delay = get_min_delay() if min_delay is None else min_delay
    max_delay = get_max_delay() if max_delay is None else max_delay
    return random.uniform(min_delay, max_delay)
def get_color_from_string(s: str) -> str:
    """Generate a stable color code from a string."""
    # CRC32 instead of built-in hash(): str hashing is salted per process
    # (PYTHONHASHSEED), so hash() would yield a different color every run.
    hash_code = zlib.crc32(s.encode('utf-8'))
    # Extract RGB components
    r = (hash_code & 0xFF0000) >> 16
    g = (hash_code & 0x00FF00) >> 8
    b = hash_code & 0x0000FF
    # Clamp each component to 128-255 so the color stays light
    r = max(128, min(255, r))
    g = max(128, min(255, g))
    b = max(128, min(255, b))
    # Recombine the components into a single #RRGGBB value
    return f"#{(r << 16) | (g << 8) | b:06X}"
# ---- App helpers moved from app.py for reuse and readability -------------
def filter_jobs(
jobs: List[Dict[str, Any]],
region: Optional[str] = None,
keyword: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Filter jobs by optional region and keyword."""
filtered = jobs
if region:
filtered = [j for j in filtered if j.get("region") == region]
if keyword:
filtered = [j for j in filtered if j.get("keyword") == keyword]
return filtered
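
# Example:
#     >>> jobs = [{"region": "sfbay", "keyword": "python"},
#     ...         {"region": "newyork", "keyword": "python"}]
#     >>> filter_jobs(jobs, region="sfbay")
#     [{'region': 'sfbay', 'keyword': 'python'}]
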
def get_job_by_id(job_id) -> Dict[str, Any]:
    """Fetch job details by job ID from the database.

    Matches either the DB row id or the scraped job_id; returns an empty
    dict when no job matches or the database is unavailable.
    """
    try:
        from web.db import get_all_jobs  # lazy import to avoid cycles
        for j in get_all_jobs():
            if str(j.get("id")) == str(job_id) or str(j.get("job_id")) == str(job_id):
                return j
    except Exception:
        pass
    return {}
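
# Example (illustrative; matches on either key, row id or scraped job_id):
#     >>> get_job_by_id(7712345678)
#     {}   # empty dict when nothing matches
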
def make_request_with_retry(url: str, max_retries: Optional[int] = None) -> Optional[str]:
    """Make an HTTP GET request with retries and exponential backoff."""
    # Resolve the retry budget at call time (a default in the signature
    # would freeze the config value at import time).
    if max_retries is None:
        max_retries = get_max_retries()
    headers = {'User-Agent': get_user_agent()}
    for attempt in range(max_retries):
        try:
            delay = get_random_delay() * (get_backoff_factor() ** attempt)
            if attempt > 0:
                time.sleep(delay)
            resp = requests.get(url, headers=headers,
                                timeout=get_request_timeout())
            if resp.status_code in (403, 404, 410):
                return None  # forbidden, missing, or gone: retrying won't help
            if resp.status_code == 429:
                time.sleep(delay * 3)  # longer delay for rate limiting
                continue
            if resp.status_code >= 400:
                if attempt == max_retries - 1:
                    return None
                continue
            return resp.text
        except requests.exceptions.RequestException:
            # Covers Timeout and ConnectionError too (both are subclasses);
            # fall through to the retry sleep below.
            pass
        if attempt < max_retries - 1:
            time.sleep(get_random_delay() * (get_backoff_factor() ** attempt))
    return None
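
# A minimal end-to-end sketch; running the module directly performs one live
# request using whatever http settings exist (region/keyword are illustrative):
if __name__ == "__main__":
    html = make_request_with_retry(
        get_base_url().format(region="sfbay", keyword="python"))
    print("fetched" if html else "no response", len(html or ""), "chars")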