jobs/web/utils.py
georg.sinn-schirwitz 23a67d7fe1 initial project commit
2025-08-29 15:07:58 +02:00
"""
Utility functions for the Craigslist scraper.
"""
import json
import os
import random
import re
import time
from datetime import datetime, UTC
from typing import Any, Dict, List, Optional

import requests
def get_config_file() -> str:
"""Return the path to the main config file."""
return os.path.abspath(os.path.join(
os.path.dirname(__file__), '..', 'config', 'settings.json'))
def get_config() -> dict:
"""Return the loaded configuration dict."""
CONFIG = {}
try:
with open(get_config_file(), 'r', encoding='utf-8') as _f:
CONFIG = json.load(_f)
except Exception:
CONFIG = {}
return CONFIG
def get_users_from_settings() -> List[Dict]:
"""Return user entries from settings.json (array of dicts)."""
users = get_config().get('users', [])
if not isinstance(users, list):
return []
out: List[Dict] = []
for u in users:
if not isinstance(u, dict):
continue
username = (u.get('username') or '').strip()
if not username:
continue
out.append({
'username': username,
'is_admin': bool(u.get('is_admin', False)),
'password': u.get('password') or ''
})
return out
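# Expected shape of the 'users' array in settings.json (illustrative example;
# values are placeholders, not the project's actual configuration):
#   "users": [{"username": "alice", "password": "secret", "is_admin": true}]
# get_users_from_settings() would normalize that entry to
#   {"username": "alice", "is_admin": True, "password": "secret"}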
def initialize_users_from_settings() -> int:
"""Ensure users from settings.json exist in DB; set admin/active and passwords.
Returns number of users processed.
"""
from web.db import create_or_update_user # local import to avoid cycles
users = get_users_from_settings()
count = 0
for u in users:
pw = u.get('password') or None
        create_or_update_user(u['username'], password=pw,
                              is_admin=bool(u.get('is_admin', False)),
                              is_active=True)
count += 1
return count
def verify_credentials(username: str, password: str) -> bool:
"""Proxy to db.verify_user_credentials"""
from web.db import verify_user_credentials
return verify_user_credentials(username, password)
# --- Database configuration helpers ---
def get_mysql_config() -> dict:
"""Return MySQL/MariaDB connection settings."""
db = get_config().get('database', {}).get('mysql', {})
return {
'host': db.get('host', '127.0.0.1'),
'user': db.get('user', 'root'),
'password': db.get('password', ''),
'database': db.get('database', 'jobs'),
'port': db.get('port', 3306),
}
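# The returned dict maps directly onto common driver keyword arguments, e.g.
# (illustrative, assuming mysql-connector-python is the driver in use):
#   conn = mysql.connector.connect(**get_mysql_config())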
def get_http_setting(key: str, default=None):
return get_config().get('http', {}).get(key, default)
def get_paths() -> dict:
return get_config().get('paths', {})
def get_cache_dir() -> str:
return get_paths().get('cache_dir', 'cache')
def get_logs_dir() -> str:
return get_paths().get('logs_dir', 'logs')
def get_user_agent() -> str:
return get_http_setting('user_agent')
def get_request_timeout() -> int:
return get_http_setting('request_timeout')
def get_max_retries() -> int:
return get_http_setting('max_retries')
def get_backoff_factor() -> int:
return get_http_setting('backoff_factor')
def get_min_delay() -> int:
return get_http_setting('min_delay')
def get_max_delay() -> int:
return get_http_setting('max_delay')
def get_base_url() -> str:
return get_config().get('scraper', {}).get('base_url', "https://{region}.craigslist.org/search/jjj?query={keyword}&sort=rel")
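# Illustrative settings.json skeleton covering the keys read above (the values
# shown are placeholders or the code defaults, not the real configuration):
#   {
#     "users": [...],
#     "database": {"mysql": {"host": "127.0.0.1", "user": "root",
#                            "password": "", "database": "jobs", "port": 3306}},
#     "http": {"user_agent": "...", "request_timeout": 30, "max_retries": 3,
#              "backoff_factor": 2, "min_delay": 1, "max_delay": 3},
#     "paths": {"cache_dir": "cache", "logs_dir": "logs"},
#     "scraper": {"base_url": "https://{region}.craigslist.org/search/jjj?query={keyword}&sort=rel"}
#   }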
def ensure_cache_dir():
"""Ensure cache directory exists."""
os.makedirs(get_cache_dir(), exist_ok=True)
def now_iso() -> str:
"""Get the current time in ISO format."""
return datetime.now(UTC).isoformat()
def get_filename_from_url(url: str) -> str:
"""Convert URL to a safe filename."""
return url.replace("https://", "").replace("/", "_").replace("?", "_").replace("&", "_")
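# Illustrative example:
#   get_filename_from_url("https://sfbay.craigslist.org/search/jjj?query=python&sort=rel")
#   -> "sfbay.craigslist.org_search_jjj_query=python_sort=rel"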
def url_to_job_id(url: str) -> int:
"""Extract the job id from a Craigslist URL (last path segment without .html)."""
last = url.rstrip("/").split("/")[-1].replace(".html", "")
if last.isdigit():
return int(last)
return 0
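# Illustrative example (URL shape assumed from Craigslist posting pages):
#   url_to_job_id("https://sfbay.craigslist.org/sfc/sof/d/some-title/7712345678.html")
#   -> 7712345678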
def normalize_job_id(raw_id: Optional[str], url: Optional[str]) -> Optional[int]:
"""Normalize job id coming from details page (e.g., 'post id: 1234567890').
Fallback to URL-derived id when needed.
"""
if raw_id:
m = re.search(r"(\d{5,})", raw_id)
if m:
return int(m.group(1))
if url:
return url_to_job_id(url)
return None
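# Illustrative examples:
#   normalize_job_id("post id: 7712345678", None) -> 7712345678
#   normalize_job_id(None, "https://sfbay.craigslist.org/.../7712345678.html") -> 7712345678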
def get_url_from_filename(name: str) -> str:
"""Generate a URL guess based on the name."""
# Best-effort URL guess from filename convention (underscores to slashes)
base = os.path.splitext(name)[0]
url_guess = f"https://{base.replace('_', '/')}"
return url_guess
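# Illustrative example of the round trip (lossy: '?' and '&' were also mapped to
# '_' when the filename was created, so query strings are not recovered exactly):
#   get_url_from_filename("sfbay.craigslist.org_search_jjj.html")
#   -> "https://sfbay.craigslist.org/search/jjj"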
def get_cached_content(url: str) -> str:
"""Get cached content for URL."""
with open(get_cache_path(url), "r", encoding="utf-8") as f:
return f.read()
def safe_get_text(element, default="N/A"):
"""Safely extract text from BeautifulSoup element."""
return element.get_text(strip=True) if element else default
def safe_get_attr(element, attr, default="N/A"):
"""Safely extract attribute from BeautifulSoup element."""
return element.get(attr, default) if element else default
def get_random_delay(min_delay: Optional[float] = None, max_delay: Optional[float] = None) -> float:
    """Get a random delay between min_delay and max_delay seconds (defaults come from config)."""
    # Resolve defaults at call time; calling the config getters in the signature
    # would freeze their values (possibly None) once at import time.
    return random.uniform(min_delay if min_delay is not None else get_min_delay(),
                          max_delay if max_delay is not None else get_max_delay())
def get_cache_path(url: str) -> str:
"""Get cache file path for URL."""
return os.path.join(get_cache_dir(), f"{get_filename_from_url(url)}.html")
def cache_page(url: str, content: str):
"""Cache the page content with a timestamp."""
cache_path = get_cache_path(url)
os.makedirs(os.path.dirname(cache_path), exist_ok=True)
with open(cache_path, "w", encoding="utf-8") as f:
f.write(content)
# Update the file's modification time to the current time
os.utime(cache_path, None)
def is_cached(url: str) -> bool:
"""Check if the page is cached and not older than 24 hours."""
cache_path = get_cache_path(url)
if not os.path.isfile(cache_path):
return False
# Check the file's age if it's a search result page
if 'search' in url:
file_age = time.time() - os.path.getmtime(cache_path)
if file_age > 24 * 3600: # 24 hours in seconds
return False
return True
def is_cache_stale(last_modified: str, days: int = 1) -> bool:
"""Check if the cached page is stale (older than 24 hours)."""
if not last_modified:
return True
last_datetime = datetime.fromisoformat(last_modified)
file_age = time.time() - last_datetime.timestamp()
return file_age > days * 24 * 3600 # days in seconds
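# Illustrative examples:
#   is_cache_stale(now_iso())                   -> False (just written)
#   is_cache_stale("2020-01-01T00:00:00+00:00") -> True  (well past one day)
#   is_cache_stale("")                          -> True  (no timestamp recorded)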
def delete_cached_page(url: str):
cache_fp = get_cache_path(url)
if os.path.exists(cache_fp):
try:
os.remove(cache_fp)
except Exception:
pass
def get_color_from_string(s: str) -> str:
"""Generate a color code from a string."""
hash_code = hash(s)
# Ensure the hash code is positive
hash_code = hash_code if hash_code >= 0 else -hash_code
# Extract RGB components
    r = (hash_code & 0xFF0000) >> 16
    g = (hash_code & 0x00FF00) >> 8
    b = hash_code & 0x0000FF
# ensure RGB components are within 128-255
r = max(128, min(255, r))
g = max(128, min(255, g))
b = max(128, min(255, b))
# Combine RGB components back into a single integer
calculated = (r << 16) | (g << 8) | b
return f"#{calculated:06X}"
# ---- App helpers moved from app.py for reuse and readability -------------
def filter_jobs(
jobs: List[Dict[str, Any]],
    region: Optional[str] = None,
    keyword: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Filter jobs by optional region and keyword."""
filtered = jobs
if region:
filtered = [j for j in filtered if j.get("region") == region]
if keyword:
filtered = [j for j in filtered if j.get("keyword") == keyword]
return filtered
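# Illustrative usage:
#   filter_jobs(jobs, region="sfbay") keeps entries whose 'region' field equals
#   "sfbay"; passing keyword as well additionally filters on the 'keyword' field.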
def get_job_by_id(job_id):
"""Fetch job details by job ID from the database."""
try:
from web.db import get_all_jobs # lazy import to avoid cycles
for j in get_all_jobs():
if str(j.get("id")) == str(job_id) or str(j.get("job_id")) == str(job_id):
return j
except Exception:
pass
return {}
def make_request_with_retry(url: str, max_retries: Optional[int] = None) -> Optional[str]:
    """Make HTTP request with retry logic and proper error handling."""
    if max_retries is None:
        # Resolve at call time rather than freezing the config value at import.
        max_retries = get_max_retries()
# initial delay
delay = get_random_delay()
headers = {'User-Agent': get_user_agent()}
for attempt in range(max_retries):
try:
delay = get_random_delay() * (get_backoff_factor() ** attempt)
if attempt > 0:
time.sleep(delay)
resp = requests.get(url, headers=headers,
timeout=get_request_timeout())
if resp.status_code == 403:
return None
elif resp.status_code == 429:
time.sleep(delay * 3) # Longer delay for rate limiting
continue
elif resp.status_code == 404:
return None
elif resp.status_code == 410:
return None
elif resp.status_code >= 400:
if attempt == max_retries - 1:
return None
continue
resp.raise_for_status()
return resp.text
except requests.exceptions.Timeout:
pass
except requests.exceptions.ConnectionError:
pass
except requests.exceptions.RequestException:
pass
if attempt < max_retries - 1:
delay = get_random_delay() * (get_backoff_factor() ** attempt)
time.sleep(delay)
return None
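# Illustrative usage of the cache and request helpers above. The function below
# is a sketch, not part of the original module, and the name fetch_page is
# hypothetical; it simply chains the helpers in the order the scraper would.
def fetch_page(url: str) -> Optional[str]:
    """Return page HTML for url, consulting the local cache before the network."""
    ensure_cache_dir()
    if is_cached(url):
        return get_cached_content(url)
    html = make_request_with_retry(url)
    if html is not None:
        cache_page(url, html)
    return html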