""" Utility functions for the Craigslist scraper. """ from typing import Any, Optional as _Optional from datetime import datetime, UTC import json import os import random import re import requests import time from typing import Optional, List, Dict def get_config_file() -> str: """Return the path to the main config file.""" return os.path.abspath(os.path.join( os.path.dirname(__file__), '..', 'config', 'settings.json')) def get_config() -> dict: """Return the loaded configuration dict.""" CONFIG = {} try: with open(get_config_file(), 'r', encoding='utf-8') as _f: CONFIG = json.load(_f) except Exception: CONFIG = {} return CONFIG def get_users_from_settings() -> List[Dict]: """Return user entries from settings.json (array of dicts).""" users = get_config().get('users', []) if not isinstance(users, list): return [] out: List[Dict] = [] for u in users: if not isinstance(u, dict): continue username = (u.get('username') or '').strip() if not username: continue out.append({ 'username': username, 'is_admin': bool(u.get('is_admin', False)), 'password': u.get('password') or '' }) return out def initialize_users_from_settings() -> int: """Ensure users from settings.json exist in DB; set admin/active and passwords. Returns number of users processed. """ from web.db import create_or_update_user # local import to avoid cycles users = get_users_from_settings() count = 0 for u in users: pw = u.get('password') or None create_or_update_user(u['username'], password=pw, is_admin=bool( u.get('is_admin', False)), is_active=True) count += 1 return count def verify_credentials(username: str, password: str) -> bool: """Proxy to db.verify_user_credentials""" from web.db import verify_user_credentials return verify_user_credentials(username, password) # --- Database configuration helpers --- def get_mysql_config() -> dict: """Return MySQL/MariaDB connection settings.""" db = get_config().get('database', {}).get('mysql', {}) return { 'host': db.get('host', '127.0.0.1'), 'user': db.get('user', 'root'), 'password': db.get('password', ''), 'database': db.get('database', 'jobs'), 'port': db.get('port', 3306), } def get_http_setting(key: str, default=None): return get_config().get('http', {}).get(key, default) def get_paths() -> dict: return get_config().get('paths', {}) def get_cache_dir() -> str: return get_paths().get('cache_dir', 'cache') def get_logs_dir() -> str: return get_paths().get('logs_dir', 'logs') def get_user_agent() -> str: return get_http_setting('user_agent') def get_request_timeout() -> int: return get_http_setting('request_timeout') def get_max_retries() -> int: return get_http_setting('max_retries') def get_backoff_factor() -> int: return get_http_setting('backoff_factor') def get_min_delay() -> int: return get_http_setting('min_delay') def get_max_delay() -> int: return get_http_setting('max_delay') def get_base_url() -> str: return get_config().get('scraper', {}).get('base_url', "https://{region}.craigslist.org/search/jjj?query={keyword}&sort=rel") def ensure_cache_dir(): """Ensure cache directory exists.""" os.makedirs(get_cache_dir(), exist_ok=True) def now_iso() -> str: """Get the current time in ISO format.""" return datetime.now(UTC).isoformat() def get_filename_from_url(url: str) -> str: """Convert URL to a safe filename.""" return url.replace("https://", "").replace("/", "_").replace("?", "_").replace("&", "_") def url_to_job_id(url: str) -> int: """Extract the job id from a Craigslist URL (last path segment without .html).""" last = url.rstrip("/").split("/")[-1].replace(".html", "") if 
last.isdigit(): return int(last) return 0 def normalize_job_id(raw_id: Optional[str], url: Optional[str]) -> Optional[int]: """Normalize job id coming from details page (e.g., 'post id: 1234567890'). Fallback to URL-derived id when needed. """ if raw_id: m = re.search(r"(\d{5,})", raw_id) if m: return int(m.group(1)) if url: return url_to_job_id(url) return None def get_url_from_filename(name: str) -> str: """Generate a URL guess based on the name.""" # Best-effort URL guess from filename convention (underscores to slashes) base = os.path.splitext(name)[0] url_guess = f"https://{base.replace('_', '/')}" return url_guess def get_cached_content(url: str) -> str: """Get cached content for URL.""" with open(get_cache_path(url), "r", encoding="utf-8") as f: return f.read() def safe_get_text(element, default="N/A"): """Safely extract text from BeautifulSoup element.""" return element.get_text(strip=True) if element else default def safe_get_attr(element, attr, default="N/A"): """Safely extract attribute from BeautifulSoup element.""" return element.get(attr, default) if element else default def get_random_delay(min_delay: int = get_min_delay(), max_delay: int = get_max_delay()) -> float: """Get a random delay between min_delay and max_delay seconds.""" return random.uniform(min_delay, max_delay) def get_cache_path(url: str) -> str: """Get cache file path for URL.""" return os.path.join(get_cache_dir(), f"{get_filename_from_url(url)}.html") def cache_page(url: str, content: str): """Cache the page content with a timestamp.""" cache_path = get_cache_path(url) with open(cache_path, "w", encoding="utf-8") as f: f.write(content) # Update the file's modification time to the current time os.utime(cache_path, None) def is_cached(url: str) -> bool: """Check if the page is cached and not older than 24 hours.""" cache_path = get_cache_path(url) if not os.path.isfile(cache_path): return False # Check the file's age if it's a search result page if 'search' in url: file_age = time.time() - os.path.getmtime(cache_path) if file_age > 24 * 3600: # 24 hours in seconds return False return True def is_cache_stale(last_modified: str, days: int = 1) -> bool: """Check if the cached page is stale (older than 24 hours).""" if not last_modified: return True last_datetime = datetime.fromisoformat(last_modified) file_age = time.time() - last_datetime.timestamp() return file_age > days * 24 * 3600 # days in seconds def delete_cached_page(url: str): cache_fp = get_cache_path(url) if os.path.exists(cache_fp): try: os.remove(cache_fp) except Exception: pass def get_color_from_string(s: str) -> str: """Generate a color code from a string.""" hash_code = hash(s) # Ensure the hash code is positive hash_code = hash_code if hash_code >= 0 else -hash_code # Extract RGB components r, g, b = (hash_code & 0xFF0000) >> 16, (hash_code & 0x00FF00) >> 8, hash_code & 0x0000FF # ensure RGB components are within 128-255 r = max(128, min(255, r)) g = max(128, min(255, g)) b = max(128, min(255, b)) # Combine RGB components back into a single integer calculated = (r << 16) | (g << 8) | b return f"#{calculated:06X}" # ---- App helpers moved from app.py for reuse and readability ------------- def filter_jobs( jobs: List[Dict[str, Any]], region: _Optional[str] = None, keyword: _Optional[str] = None, ) -> List[Dict[str, Any]]: """Filter jobs by optional region and keyword.""" filtered = jobs if region: filtered = [j for j in filtered if j.get("region") == region] if keyword: filtered = [j for j in filtered if j.get("keyword") == keyword] return 
filtered def get_job_by_id(job_id): """Fetch job details by job ID from the database.""" try: from web.db import get_all_jobs # lazy import to avoid cycles for j in get_all_jobs(): if str(j.get("id")) == str(job_id) or str(j.get("job_id")) == str(job_id): return j except Exception: pass return {} def make_request_with_retry(url: str, max_retries: int = get_max_retries()) -> Optional[str]: """Make HTTP request with retry logic and proper error handling.""" # initial delay delay = get_random_delay() headers = {'User-Agent': get_user_agent()} for attempt in range(max_retries): try: delay = get_random_delay() * (get_backoff_factor() ** attempt) if attempt > 0: time.sleep(delay) resp = requests.get(url, headers=headers, timeout=get_request_timeout()) if resp.status_code == 403: return None elif resp.status_code == 429: time.sleep(delay * 3) # Longer delay for rate limiting continue elif resp.status_code == 404: return None elif resp.status_code == 410: return None elif resp.status_code >= 400: if attempt == max_retries - 1: return None continue resp.raise_for_status() return resp.text except requests.exceptions.Timeout: pass except requests.exceptions.ConnectionError: pass except requests.exceptions.RequestException: pass if attempt < max_retries - 1: delay = get_random_delay() * (get_backoff_factor() ** attempt) time.sleep(delay) return None
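

# --- Usage sketch (illustrative only) --------------------------------------
# A minimal example of how the cache and request helpers compose. It assumes
# settings.json provides the http/paths/scraper sections read above (roughly:
#   {"http": {...}, "paths": {...}, "scraper": {"base_url": "..."},
#    "database": {"mysql": {...}}, "users": [...]}).
# The region and keyword values are placeholders, not part of the config.
if __name__ == "__main__":
    ensure_cache_dir()
    demo_url = get_base_url().format(region="sfbay", keyword="python")
    if is_cached(demo_url):
        html = get_cached_content(demo_url)
    else:
        html = make_request_with_retry(demo_url)
        if html:
            cache_page(demo_url, html)
    print(f"Fetched {len(html) if html else 0} bytes from {demo_url}")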