""" Utility functions for the Craigslist scraper. """ from typing import Any, Optional, List, Dict from datetime import datetime, UTC import json import os import random import re import requests import time def get_config_file() -> str: """Return the path to the main config file.""" return os.path.abspath(os.path.join( os.path.dirname(__file__), '..', 'config', 'settings.json')) def get_config() -> dict: """Return the loaded configuration dict.""" CONFIG = {} try: with open(get_config_file(), 'r', encoding='utf-8') as _f: CONFIG = json.load(_f) except Exception: CONFIG = {} return CONFIG def get_users_from_settings() -> List[Dict]: """Return user entries from settings.json (array of dicts).""" users = get_config().get('users', []) if not isinstance(users, list): return [] out: List[Dict] = [] for u in users: if not isinstance(u, dict): continue username = (u.get('username') or '').strip() if not username: continue out.append({ 'username': username, 'is_admin': bool(u.get('is_admin', False)), 'password': u.get('password') or '' }) return out def initialize_users_from_settings() -> int: """Ensure users from settings.json exist in DB; set admin/active and passwords. Returns number of users processed. """ from web.db import create_or_update_user # local import to avoid cycles users = get_users_from_settings() count = 0 for u in users: pw = u.get('password') or None create_or_update_user(u['username'], password=pw, is_admin=bool( u.get('is_admin', False)), is_active=True) count += 1 return count def verify_credentials(username: str, password: str) -> bool: """Proxy to db.verify_user_credentials""" from web.db import verify_user_credentials return verify_user_credentials(username, password) # --- Database configuration helpers --- def get_mysql_config() -> dict: """Return MySQL/MariaDB connection settings.""" db = get_config().get('database', {}).get('mysql', {}) return { 'host': db.get('host', '127.0.0.1'), 'user': db.get('user', 'root'), 'password': db.get('password', ''), 'database': db.get('database', 'jobs'), 'port': db.get('port', 3306), } def get_http_setting(key: str, default=None): return get_config().get('http', {}).get(key, default) def get_paths() -> dict: return get_config().get('paths', {}) def get_logs_dir() -> str: return get_paths().get('logs_dir', 'logs') def get_user_agent() -> str: return get_http_setting('user_agent') def get_request_timeout() -> int: return get_http_setting('request_timeout') def get_max_retries() -> int: return get_http_setting('max_retries') def get_backoff_factor() -> int: return get_http_setting('backoff_factor') def get_min_delay() -> int: return get_http_setting('min_delay') def get_max_delay() -> int: return get_http_setting('max_delay') def get_base_url() -> str: return get_config().get('scraper', {}).get('base_url', "https://{region}.craigslist.org/search/jjj?query={keyword}&sort=rel") def get_negative_keywords() -> List[str]: """Return normalized list of negative keywords from config.""" raw = get_config().get('scraper', {}).get('negative_keywords', []) if not isinstance(raw, list): return [] cleaned: List[str] = [] for item in raw: if not isinstance(item, str): continue val = item.strip() if not val: continue cleaned.append(val.lower()) return cleaned def get_email_settings() -> Dict[str, Any]: """Return normalized email settings from config.""" cfg = get_config().get('email', {}) if not isinstance(cfg, dict): cfg = {} raw_smtp = cfg.get('smtp', {}) if isinstance(cfg.get('smtp'), dict) else {} raw_recipients = cfg.get('recipients', []) def 
def get_users_from_settings() -> List[Dict]:
    """Return user entries from settings.json (array of dicts)."""
    users = get_config().get('users', [])
    if not isinstance(users, list):
        return []
    out: List[Dict] = []
    for u in users:
        if not isinstance(u, dict):
            continue
        username = (u.get('username') or '').strip()
        if not username:
            continue
        out.append({
            'username': username,
            'is_admin': bool(u.get('is_admin', False)),
            'password': u.get('password') or '',
        })
    return out


def initialize_users_from_settings() -> int:
    """Ensure users from settings.json exist in the DB; set admin/active flags and passwords.

    Returns the number of users processed.
    """
    from web.db import create_or_update_user  # local import to avoid cycles
    users = get_users_from_settings()
    count = 0
    for u in users:
        pw = u.get('password') or None
        create_or_update_user(
            u['username'],
            password=pw,
            is_admin=bool(u.get('is_admin', False)),
            is_active=True,
        )
        count += 1
    return count


def verify_credentials(username: str, password: str) -> bool:
    """Proxy to db.verify_user_credentials."""
    from web.db import verify_user_credentials  # local import to avoid cycles
    return verify_user_credentials(username, password)


# --- Database configuration helpers ---

def get_mysql_config() -> dict:
    """Return MySQL/MariaDB connection settings."""
    db = get_config().get('database', {}).get('mysql', {})
    return {
        'host': db.get('host', '127.0.0.1'),
        'user': db.get('user', 'root'),
        'password': db.get('password', ''),
        'database': db.get('database', 'jobs'),
        'port': db.get('port', 3306),
    }


def get_http_setting(key: str, default=None):
    return get_config().get('http', {}).get(key, default)


def get_paths() -> dict:
    return get_config().get('paths', {})


def get_logs_dir() -> str:
    return get_paths().get('logs_dir', 'logs')


def get_user_agent() -> str:
    return get_http_setting('user_agent')


def get_request_timeout() -> int:
    return get_http_setting('request_timeout')


def get_max_retries() -> int:
    return get_http_setting('max_retries')


def get_backoff_factor() -> int:
    return get_http_setting('backoff_factor')


def get_min_delay() -> int:
    return get_http_setting('min_delay')


def get_max_delay() -> int:
    return get_http_setting('max_delay')


def get_base_url() -> str:
    return get_config().get('scraper', {}).get(
        'base_url',
        "https://{region}.craigslist.org/search/jjj?query={keyword}&sort=rel")


def get_negative_keywords() -> List[str]:
    """Return a normalized list of negative keywords from config."""
    raw = get_config().get('scraper', {}).get('negative_keywords', [])
    if not isinstance(raw, list):
        return []
    cleaned: List[str] = []
    for item in raw:
        if not isinstance(item, str):
            continue
        val = item.strip()
        if not val:
            continue
        cleaned.append(val.lower())
    return cleaned


def get_email_settings() -> Dict[str, Any]:
    """Return normalized email settings from config."""
    cfg = get_config().get('email', {})
    if not isinstance(cfg, dict):
        cfg = {}
    raw_smtp = cfg.get('smtp', {}) if isinstance(cfg.get('smtp'), dict) else {}
    raw_recipients = cfg.get('recipients', [])

    def _to_int(value, default):
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    recipients: List[str] = []
    if isinstance(raw_recipients, list):
        for item in raw_recipients:
            if isinstance(item, str):
                addr = item.strip()
                if addr:
                    recipients.append(addr)

    smtp = {
        'host': (raw_smtp.get('host') or '').strip(),
        'port': _to_int(raw_smtp.get('port', 587), 587),
        'username': (raw_smtp.get('username') or '').strip(),
        'password': raw_smtp.get('password') or '',
        'use_tls': bool(raw_smtp.get('use_tls', True)),
        'use_ssl': bool(raw_smtp.get('use_ssl', False)),
        'timeout': _to_int(raw_smtp.get('timeout', 30), 30),
    }
    if smtp['port'] <= 0:
        smtp['port'] = 587
    if smtp['timeout'] <= 0:
        smtp['timeout'] = 30

    return {
        'enabled': bool(cfg.get('enabled', False)),
        'from_address': (cfg.get('from_address') or '').strip(),
        'smtp': smtp,
        'recipients': recipients,
    }


def now_iso() -> str:
    """Return the current UTC time in ISO format."""
    return datetime.now(UTC).isoformat()


def get_filename_from_url(url: str) -> str:
    """Convert a URL to a safe filename."""
    return (url.replace("https://", "")
               .replace("/", "_")
               .replace("?", "_")
               .replace("&", "_"))


def url_to_job_id(url: str) -> int:
    """Extract the job id from a Craigslist URL (last path segment without .html)."""
    last = url.rstrip("/").split("/")[-1].replace(".html", "")
    if last.isdigit():
        return int(last)
    return 0


def normalize_job_id(raw_id: Optional[str], url: Optional[str]) -> Optional[int]:
    """Normalize a job id coming from the details page (e.g., 'post id: 1234567890').

    Falls back to the URL-derived id when needed.
    """
    if raw_id:
        m = re.search(r"(\d{5,})", raw_id)
        if m:
            return int(m.group(1))
    if url:
        return url_to_job_id(url)
    return None


def get_url_from_filename(name: str) -> str:
    """Generate a URL guess based on the filename."""
    # Best-effort URL guess from the filename convention (underscores to slashes).
    base = os.path.splitext(name)[0]
    return f"https://{base.replace('_', '/')}"


def safe_get_text(element, default="N/A"):
    """Safely extract text from a BeautifulSoup element."""
    return element.get_text(strip=True) if element else default


def safe_get_attr(element, attr, default="N/A"):
    """Safely extract an attribute from a BeautifulSoup element."""
    return element.get(attr, default) if element else default


def get_random_delay(min_delay: Optional[float] = None,
                     max_delay: Optional[float] = None) -> float:
    """Get a random delay between min_delay and max_delay seconds."""
    # Resolve defaults at call time; a default evaluated in the signature would
    # freeze whatever the config held (or None) at import time.
    if min_delay is None:
        min_delay = get_min_delay()
    if max_delay is None:
        max_delay = get_max_delay()
    return random.uniform(min_delay, max_delay)


def get_color_from_string(s: str) -> str:
    """Generate a hex color code from a string."""
    # Built-in hash(); for str this is salted per interpreter run.
    hash_code = abs(hash(s))
    # Extract RGB components from the low three bytes.
    r = (hash_code & 0xFF0000) >> 16
    g = (hash_code & 0x00FF00) >> 8
    b = hash_code & 0x0000FF
    # Clamp each component to 128-255 so the color stays light.
    r = max(128, min(255, r))
    g = max(128, min(255, g))
    b = max(128, min(255, b))
    # Recombine the components and format as #RRGGBB.
    return f"#{(r << 16) | (g << 8) | b:06X}"

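# Note (illustrative addition, not part of the original module): Python salts
# str hashing per interpreter run unless PYTHONHASHSEED is fixed, so
# get_color_from_string() maps the same string to different colors across
# restarts. If stable colors were ever needed, a digest-based variant such as
# this sketch could be used instead.
def _stable_color_from_string_sketch(s: str) -> str:
    """Hypothetical stable variant of get_color_from_string (not used anywhere)."""
    import hashlib  # local import keeps the sketch self-contained
    digest = hashlib.md5(s.encode("utf-8")).digest()
    # Take the first three digest bytes and clamp them to 128-255,
    # mirroring the clamping logic above.
    r, g, b = (max(128, min(255, byte)) for byte in digest[:3])
    return f"#{(r << 16) | (g << 8) | b:06X}"
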
# ---- App helpers moved from app.py for reuse and readability -------------

def filter_jobs(
    jobs: List[Dict[str, Any]],
    region: Optional[str] = None,
    keyword: Optional[str] = None,
    negative_keywords: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Filter jobs by optional region, keyword, and negative keywords."""
    filtered = jobs
    if region:
        filtered = [j for j in filtered if j.get("region") == region]
    if keyword:
        filtered = [j for j in filtered if j.get("keyword") == keyword]
    if negative_keywords:
        # Plain lowercase substring matching, the same approach the scraper
        # uses, applied to title, company, location, and description (which
        # may be long or contain HTML).
        nks = [nk.lower() for nk in negative_keywords if nk]

        def is_clean(job):
            text_blob = " ".join([
                str(job.get("title") or ""),
                str(job.get("company") or ""),
                str(job.get("location") or ""),
                str(job.get("description") or ""),
            ]).lower()
            return not any(nk in text_blob for nk in nks)

        filtered = [j for j in filtered if is_clean(j)]
    return filtered


def get_job_by_id(job_id):
    """Fetch job details by job ID from the database."""
    try:
        from web.db import get_all_jobs  # lazy import to avoid cycles
        for j in get_all_jobs():
            if str(j.get("id")) == str(job_id) or str(j.get("job_id")) == str(job_id):
                return j
    except Exception:
        pass
    return {}


def make_request_with_retry(url: str, max_retries: Optional[int] = None) -> Optional[str]:
    """Make an HTTP request with retry logic and proper error handling."""
    # Resolve the retry count at call time rather than in the signature, so a
    # missing or changed config is not baked in at import time.
    if max_retries is None:
        max_retries = get_max_retries()
    headers = {'User-Agent': get_user_agent()}
    for attempt in range(max_retries):
        try:
            # Random delay with exponential backoff; sleep before every retry.
            delay = get_random_delay() * (get_backoff_factor() ** attempt)
            if attempt > 0:
                time.sleep(delay)
            resp = requests.get(url, headers=headers, timeout=get_request_timeout())
            if resp.status_code == 403:
                return None  # blocked; retrying is unlikely to help
            elif resp.status_code == 429:
                time.sleep(delay * 3)  # longer delay for rate limiting
                continue
            elif resp.status_code in (404, 410):
                return None  # posting no longer exists
            elif resp.status_code >= 400:
                if attempt == max_retries - 1:
                    return None
                continue
            resp.raise_for_status()
            return resp.text
        except requests.exceptions.Timeout:
            pass
        except requests.exceptions.ConnectionError:
            pass
        except requests.exceptions.RequestException:
            pass
        if attempt < max_retries - 1:
            delay = get_random_delay() * (get_backoff_factor() ** attempt)
            time.sleep(delay)
    return None

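# Minimal manual smoke test (illustrative addition, not part of the original
# module). It only exercises the pure helpers, so it runs without a
# settings.json file, a database, or network access; the sample values below
# are made up.
if __name__ == "__main__":
    sample_url = "https://sfbay.craigslist.org/sfc/sof/d/example-job/1234567890.html"
    print(url_to_job_id(sample_url))                      # -> 1234567890
    print(normalize_job_id("post id: 1234567890", None))  # -> 1234567890
    print(get_filename_from_url(sample_url))              # slashes become underscores
    sample_jobs = [
        {"title": "Python Developer", "region": "sfbay", "keyword": "python"},
        {"title": "Unpaid internship", "region": "sfbay", "keyword": "python"},
    ]
    # Only the first sample job should survive the negative-keyword filter.
    print(filter_jobs(sample_jobs, region="sfbay", negative_keywords=["unpaid"]))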