""" Utility functions for the Craigslist scraper. """ from typing import Any, Optional, List, Dict from datetime import datetime, UTC import json import os import random import re import requests import time def get_config_file() -> str: """Return the path to the main config file.""" return os.path.abspath(os.path.join( os.path.dirname(__file__), '..', 'config', 'settings.json')) def get_config() -> dict: """Return the loaded configuration dict.""" CONFIG = {} try: with open(get_config_file(), 'r', encoding='utf-8') as _f: CONFIG = json.load(_f) except Exception: CONFIG = {} return CONFIG def get_users_from_settings() -> List[Dict]: """Return user entries from settings.json (array of dicts).""" users = get_config().get('users', []) if not isinstance(users, list): return [] out: List[Dict] = [] for u in users: if not isinstance(u, dict): continue username = (u.get('username') or '').strip() if not username: continue out.append({ 'username': username, 'is_admin': bool(u.get('is_admin', False)), 'password': u.get('password') or '' }) return out def initialize_users_from_settings() -> int: """Ensure users from settings.json exist in DB; set admin/active and passwords. Returns number of users processed. """ from web.db import create_or_update_user # local import to avoid cycles users = get_users_from_settings() count = 0 for u in users: pw = u.get('password') or None create_or_update_user(u['username'], password=pw, is_admin=bool( u.get('is_admin', False)), is_active=True) count += 1 return count def verify_credentials(username: str, password: str) -> bool: """Proxy to db.verify_user_credentials""" from web.db import verify_user_credentials return verify_user_credentials(username, password) # --- Database configuration helpers --- def get_mysql_config() -> dict: """Return MySQL/MariaDB connection settings.""" db = get_config().get('database', {}).get('mysql', {}) return { 'host': db.get('host', '127.0.0.1'), 'user': db.get('user', 'root'), 'password': db.get('password', ''), 'database': db.get('database', 'jobs'), 'port': db.get('port', 3306), } def get_http_setting(key: str, default=None): return get_config().get('http', {}).get(key, default) def get_paths() -> dict: return get_config().get('paths', {}) def get_cache_dir() -> str: return get_paths().get('cache_dir', 'cache') def get_logs_dir() -> str: return get_paths().get('logs_dir', 'logs') def get_user_agent() -> str: return get_http_setting('user_agent') def get_request_timeout() -> int: return get_http_setting('request_timeout') def get_max_retries() -> int: return get_http_setting('max_retries') def get_backoff_factor() -> int: return get_http_setting('backoff_factor') def get_min_delay() -> int: return get_http_setting('min_delay') def get_max_delay() -> int: return get_http_setting('max_delay') def get_base_url() -> str: return get_config().get('scraper', {}).get('base_url', "https://{region}.craigslist.org/search/jjj?query={keyword}&sort=rel") def ensure_cache_dir(): """Ensure cache directory exists.""" os.makedirs(get_cache_dir(), exist_ok=True) def now_iso() -> str: """Get the current time in ISO format.""" return datetime.now(UTC).isoformat() def get_filename_from_url(url: str) -> str: """Convert URL to a safe filename.""" return url.replace("https://", "").replace("/", "_").replace("?", "_").replace("&", "_") def url_to_job_id(url: str) -> int: """Extract the job id from a Craigslist URL (last path segment without .html).""" last = url.rstrip("/").split("/")[-1].replace(".html", "") if last.isdigit(): return int(last) return 0 
def normalize_job_id(raw_id: Optional[str], url: Optional[str]) -> Optional[int]:
    """Normalize a job id taken from the details page (e.g., 'post id: 1234567890').

    Falls back to the URL-derived id when needed.
    """
    if raw_id:
        m = re.search(r"(\d{5,})", raw_id)
        if m:
            return int(m.group(1))
    if url:
        return url_to_job_id(url)
    return None


def get_url_from_filename(name: str) -> str:
    """Best-effort URL guess from the cache filename convention (underscores to slashes)."""
    base = os.path.splitext(name)[0]
    return f"https://{base.replace('_', '/')}"


def get_cached_content(url: str) -> str:
    """Return the cached content for a URL."""
    with open(get_cache_path(url), "r", encoding="utf-8") as f:
        return f.read()


def safe_get_text(element, default="N/A"):
    """Safely extract text from a BeautifulSoup element."""
    return element.get_text(strip=True) if element else default


def safe_get_attr(element, attr, default="N/A"):
    """Safely extract an attribute from a BeautifulSoup element."""
    return element.get(attr, default) if element else default


def get_random_delay(min_delay: Optional[int] = None, max_delay: Optional[int] = None) -> float:
    """Return a random delay between min_delay and max_delay seconds.

    Defaults are read from the config at call time rather than as default
    arguments, which would be evaluated only once at import time.
    """
    if min_delay is None:
        min_delay = get_min_delay()
    if max_delay is None:
        max_delay = get_max_delay()
    return random.uniform(min_delay, max_delay)


def get_cache_path(url: str) -> str:
    """Return the cache file path for a URL."""
    return os.path.join(get_cache_dir(), f"{get_filename_from_url(url)}.html")


def cache_page(url: str, content: str):
    """Cache the page content and refresh its timestamp."""
    cache_path = get_cache_path(url)
    with open(cache_path, "w", encoding="utf-8") as f:
        f.write(content)
    # Update the file's modification time to the current time
    os.utime(cache_path, None)


def is_cached(url: str) -> bool:
    """Check whether the page is cached; search result pages expire after 24 hours."""
    cache_path = get_cache_path(url)
    if not os.path.isfile(cache_path):
        return False
    # Only search result pages age out; detail pages are kept until deleted
    if 'search' in url:
        file_age = time.time() - os.path.getmtime(cache_path)
        if file_age > 24 * 3600:  # 24 hours in seconds
            return False
    return True


def is_cache_stale(last_modified: str, days: int = 1) -> bool:
    """Check whether a cached record is older than the given number of days."""
    if not last_modified:
        return True
    last_datetime = datetime.fromisoformat(last_modified)
    file_age = time.time() - last_datetime.timestamp()
    return file_age > days * 24 * 3600  # days in seconds


def delete_cached_page(url: str):
    """Delete the cached page for a URL, ignoring filesystem errors."""
    cache_fp = get_cache_path(url)
    if os.path.exists(cache_fp):
        try:
            os.remove(cache_fp)
        except Exception:
            pass


def get_color_from_string(s: str) -> str:
    """Generate a light color code from a string."""
    hash_code = abs(hash(s))  # ensure the hash code is positive
    # Extract RGB components
    r = (hash_code & 0xFF0000) >> 16
    g = (hash_code & 0x00FF00) >> 8
    b = hash_code & 0x0000FF
    # Clamp RGB components into 128-255 so the color stays light
    r = max(128, min(255, r))
    g = max(128, min(255, g))
    b = max(128, min(255, b))
    # Combine RGB components back into a single integer
    calculated = (r << 16) | (g << 8) | b
    return f"#{calculated:06X}"


# ---- App helpers moved from app.py for reuse and readability -------------

def filter_jobs(
    jobs: List[Dict[str, Any]],
    region: Optional[str] = None,
    keyword: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Filter jobs by optional region and keyword."""
    filtered = jobs
    if region:
        filtered = [j for j in filtered if j.get("region") == region]
    if keyword:
        filtered = [j for j in filtered if j.get("keyword") == keyword]
    return filtered
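
# Illustrative use of filter_jobs (hypothetical records; real rows come from
# the database via web.db). Both filters are optional and combine with AND:
#
#     jobs = [
#         {"id": 1, "region": "sfbay", "keyword": "python"},
#         {"id": 2, "region": "newyork", "keyword": "python"},
#     ]
#     filter_jobs(jobs, region="sfbay")                    # -> [job 1]
#     filter_jobs(jobs, region="sfbay", keyword="python")  # -> [job 1]
#     filter_jobs(jobs)                                    # -> both jobs
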
def get_job_by_id(job_id):
    """Fetch job details by job ID from the database."""
    try:
        from web.db import get_all_jobs  # lazy import to avoid cycles
        for j in get_all_jobs():
            if str(j.get("id")) == str(job_id) or str(j.get("job_id")) == str(job_id):
                return j
    except Exception:
        pass
    return {}


def make_request_with_retry(url: str, max_retries: Optional[int] = None) -> Optional[str]:
    """Make an HTTP request with retry, exponential backoff, and status-code handling."""
    if max_retries is None:
        # Read the retry count at call time, not as an import-time default argument
        max_retries = get_max_retries()
    headers = {'User-Agent': get_user_agent()}
    for attempt in range(max_retries):
        try:
            # Randomized delay with exponential backoff; no sleep before the first attempt
            delay = get_random_delay() * (get_backoff_factor() ** attempt)
            if attempt > 0:
                time.sleep(delay)
            resp = requests.get(url, headers=headers, timeout=get_request_timeout())
            if resp.status_code in (403, 404, 410):
                # Blocked, missing, or expired posting: not worth retrying
                return None
            if resp.status_code == 429:
                time.sleep(delay * 3)  # Longer delay for rate limiting
                continue
            if resp.status_code >= 400:
                if attempt == max_retries - 1:
                    return None
                continue
            resp.raise_for_status()
            return resp.text
        except requests.exceptions.Timeout:
            pass
        except requests.exceptions.ConnectionError:
            pass
        except requests.exceptions.RequestException:
            pass
        if attempt < max_retries - 1:
            delay = get_random_delay() * (get_backoff_factor() ** attempt)
            time.sleep(delay)
    return None
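
# Minimal end-to-end sketch combining the helpers in this module (hypothetical
# region/keyword; retry counts, delays, and the cache location all come from
# settings.json). An illustration only, not the scraper's actual entry point:
#
#     ensure_cache_dir()
#     url = get_base_url().format(region="sfbay", keyword="python")
#     if is_cached(url):
#         html = get_cached_content(url)
#     else:
#         html = make_request_with_retry(url)
#         if html:
#             cache_page(url, html)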