"""
|
|
Utility functions for the Craigslist scraper.
|
|
"""
|
|
|
|
import hashlib
import json
import os
import random
import re
import time
from datetime import datetime, UTC
from typing import Any, Optional, List, Dict

import requests


def get_config_file() -> str:
    """Return the absolute path to the main config file (config/settings.json)."""
    return os.path.abspath(os.path.join(
        os.path.dirname(__file__), '..', 'config', 'settings.json'))


def get_config() -> dict:
    """Load and return the configuration dict; empty dict if unreadable."""
    try:
        with open(get_config_file(), 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError):
        # Missing file or invalid JSON (json.JSONDecodeError is a ValueError).
        return {}


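# For reference, the helpers below read a settings.json roughly shaped like
# this. The keys match what this module accesses; the values are illustrative
# assumptions, not the project's canonical defaults:
#
#   {
#     "users": [{"username": "admin", "password": "...", "is_admin": true}],
#     "database": {"mysql": {"host": "127.0.0.1", "port": 3306}},
#     "http": {"user_agent": "...", "request_timeout": 30, "max_retries": 3,
#              "backoff_factor": 2, "min_delay": 1, "max_delay": 5},
#     "paths": {"logs_dir": "logs"},
#     "scraper": {"base_url": "https://{region}.craigslist.org/search/jjj?query={keyword}&sort=rel"}
#   }

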
def get_users_from_settings() -> List[Dict]:
    """Return user entries from settings.json (a list of dicts)."""
    users = get_config().get('users', [])
    if not isinstance(users, list):
        return []
    out: List[Dict] = []
    for u in users:
        if not isinstance(u, dict):
            continue
        username = (u.get('username') or '').strip()
        if not username:
            # Skip entries without a usable username.
            continue
        out.append({
            'username': username,
            'is_admin': bool(u.get('is_admin', False)),
            'password': u.get('password') or '',
        })
    return out


def initialize_users_from_settings() -> int:
    """Ensure users from settings.json exist in the DB; set admin/active flags and passwords.

    Returns the number of users processed.
    """
    from web.db import create_or_update_user  # local import to avoid cycles
    count = 0
    for u in get_users_from_settings():
        create_or_update_user(
            u['username'],
            password=u.get('password') or None,
            is_admin=bool(u.get('is_admin', False)),
            is_active=True,
        )
        count += 1
    return count


def verify_credentials(username: str, password: str) -> bool:
    """Proxy to web.db.verify_user_credentials."""
    from web.db import verify_user_credentials  # local import to avoid cycles
    return verify_user_credentials(username, password)


# --- Database configuration helpers ---

def get_mysql_config() -> dict:
    """Return MySQL/MariaDB connection settings with sensible defaults."""
    db = get_config().get('database', {}).get('mysql', {})
    return {
        'host': db.get('host', '127.0.0.1'),
        'user': db.get('user', 'root'),
        'password': db.get('password', ''),
        'database': db.get('database', 'jobs'),
        'port': db.get('port', 3306),
    }


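# The returned dict is shaped to unpack straight into a driver call, e.g.
# (assuming mysql-connector-python; the project's actual driver may differ):
#   conn = mysql.connector.connect(**get_mysql_config())

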
def get_http_setting(key: str, default: Any = None) -> Any:
    """Return a single value from the 'http' section of the config."""
    return get_config().get('http', {}).get(key, default)


def get_paths() -> dict:
    """Return the 'paths' section of the config."""
    return get_config().get('paths', {})


def get_logs_dir() -> str:
    """Return the configured logs directory (default: 'logs')."""
    return get_paths().get('logs_dir', 'logs')


# The fallback values below are assumptions for when settings.json omits a
# key; override them in the config's 'http' section.

def get_user_agent() -> str:
    return get_http_setting('user_agent', 'Mozilla/5.0 (compatible; scraper)')


def get_request_timeout() -> int:
    return get_http_setting('request_timeout', 30)


def get_max_retries() -> int:
    return get_http_setting('max_retries', 3)


def get_backoff_factor() -> float:
    return get_http_setting('backoff_factor', 2.0)


def get_min_delay() -> float:
    return get_http_setting('min_delay', 1.0)


def get_max_delay() -> float:
    return get_http_setting('max_delay', 5.0)


def get_base_url() -> str:
    """Return the search URL template with {region} and {keyword} placeholders."""
    return get_config().get('scraper', {}).get(
        'base_url',
        "https://{region}.craigslist.org/search/jjj?query={keyword}&sort=rel")


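# Illustrative expansion of the default template (hypothetical region/keyword):
#   >>> get_base_url().format(region="sfbay", keyword="python")
#   'https://sfbay.craigslist.org/search/jjj?query=python&sort=rel'

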
def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(UTC).isoformat()


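# Example output shape (the timestamp itself is illustrative):
#   >>> now_iso()
#   '2025-01-01T12:00:00.123456+00:00'

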
def get_filename_from_url(url: str) -> str:
    """Convert a URL to a filesystem-safe filename."""
    return (url.replace("https://", "")
               .replace("/", "_")
               .replace("?", "_")
               .replace("&", "_"))


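# Illustrative (hypothetical listing URL):
#   >>> get_filename_from_url("https://sfbay.craigslist.org/sfc/eng/7712345678.html")
#   'sfbay.craigslist.org_sfc_eng_7712345678.html'

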
def url_to_job_id(url: str) -> int:
    """Extract the numeric job id from a Craigslist URL (last path segment, minus .html)."""
    last = url.rstrip("/").split("/")[-1].replace(".html", "")
    if last.isdigit():
        return int(last)
    return 0  # sentinel: no numeric id found


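# Illustrative (hypothetical URLs):
#   >>> url_to_job_id("https://sfbay.craigslist.org/sfc/eng/7712345678.html")
#   7712345678
#   >>> url_to_job_id("https://sfbay.craigslist.org/about")  # no numeric segment
#   0

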
def normalize_job_id(raw_id: Optional[str], url: Optional[str]) -> Optional[int]:
    """Normalize a job id scraped from a details page (e.g. 'post id: 1234567890').

    Falls back to the URL-derived id when the raw value yields nothing.
    """
    if raw_id:
        m = re.search(r"(\d{5,})", raw_id)
        if m:
            return int(m.group(1))
    if url:
        return url_to_job_id(url)
    return None


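# Illustrative (hypothetical values):
#   >>> normalize_job_id("post id: 7712345678", None)
#   7712345678
#   >>> normalize_job_id(None, "https://sfbay.craigslist.org/sfc/eng/7712345678.html")
#   7712345678

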
def get_url_from_filename(name: str) -> str:
    """Best-effort inverse of get_filename_from_url: turn a saved filename back into a URL.

    Lossy: every underscore becomes a slash, so underscores that were
    originally '?', '&', or literal underscores are not recovered.
    """
    base = os.path.splitext(name)[0]
    return f"https://{base.replace('_', '/')}"


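# Illustrative (note that the .html extension is not restored):
#   >>> get_url_from_filename("sfbay.craigslist.org_sfc_eng_7712345678.html")
#   'https://sfbay.craigslist.org/sfc/eng/7712345678'

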
def safe_get_text(element, default="N/A"):
    """Safely extract text from a BeautifulSoup element (default when missing)."""
    return element.get_text(strip=True) if element else default


def safe_get_attr(element, attr, default="N/A"):
    """Safely extract an attribute from a BeautifulSoup element (default when missing)."""
    return element.get(attr, default) if element else default


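# Typical call sites (soup is a BeautifulSoup document; the selectors are
# illustrative, not necessarily the project's actual ones):
#   title = safe_get_text(soup.find("span", id="titletextonly"))
#   href = safe_get_attr(soup.find("a", class_="result-title"), "href")

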
def get_random_delay(min_delay: Optional[float] = None,
                     max_delay: Optional[float] = None) -> float:
    """Return a random delay between min_delay and max_delay seconds.

    The bounds default to the configured min/max, resolved at call time
    rather than frozen at import time.
    """
    if min_delay is None:
        min_delay = get_min_delay()
    if max_delay is None:
        max_delay = get_max_delay()
    return random.uniform(min_delay, max_delay)


def get_color_from_string(s: str) -> str:
    """Deterministically derive a light hex color code from a string."""
    # Use a stable digest rather than built-in hash(), which is randomized
    # per process and would change colors across restarts.
    hash_code = int.from_bytes(hashlib.md5(s.encode('utf-8')).digest()[:3], 'big')
    # Extract RGB components.
    r = (hash_code & 0xFF0000) >> 16
    g = (hash_code & 0x00FF00) >> 8
    b = hash_code & 0x0000FF
    # Clamp components to 128-255 so colors stay light and readable.
    r = max(128, min(255, r))
    g = max(128, min(255, g))
    b = max(128, min(255, b))
    # Combine RGB components back into a single integer.
    return f"#{(r << 16) | (g << 8) | b:06X}"


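# Example: the same input always maps to the same light color, e.g. for
# consistent UI labels; get_color_from_string("sfbay") returns one fixed
# '#RRGGBB' value with each channel in the 128-255 range.

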
# ---- App helpers moved from app.py for reuse and readability -------------

def filter_jobs(
    jobs: List[Dict[str, Any]],
    region: Optional[str] = None,
    keyword: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Filter jobs by optional exact-match region and keyword."""
    filtered = jobs
    if region:
        filtered = [j for j in filtered if j.get("region") == region]
    if keyword:
        filtered = [j for j in filtered if j.get("keyword") == keyword]
    return filtered


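# Illustrative (hypothetical job dicts):
#   >>> jobs = [{"region": "sfbay", "keyword": "python"},
#   ...         {"region": "newyork", "keyword": "python"}]
#   >>> filter_jobs(jobs, region="sfbay")
#   [{'region': 'sfbay', 'keyword': 'python'}]

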
def get_job_by_id(job_id) -> Dict[str, Any]:
    """Fetch job details by job ID from the database (empty dict if not found)."""
    try:
        from web.db import get_all_jobs  # lazy import to avoid cycles
        # Linear scan; matches either the row id or the scraped job_id.
        for j in get_all_jobs():
            if str(j.get("id")) == str(job_id) or str(j.get("job_id")) == str(job_id):
                return j
    except Exception:
        # DB unavailable or schema mismatch: treat as "not found".
        pass
    return {}


def make_request_with_retry(url: str, max_retries: Optional[int] = None) -> Optional[str]:
    """Make an HTTP GET with retry logic and backoff; return the body text or None.

    max_retries defaults to the configured value, resolved at call time
    rather than frozen at import time.
    """
    if max_retries is None:
        max_retries = get_max_retries()
    headers = {'User-Agent': get_user_agent()}

    for attempt in range(max_retries):
        # Randomized delay, scaled exponentially with each attempt.
        delay = get_random_delay() * (get_backoff_factor() ** attempt)
        if attempt > 0:
            time.sleep(delay)

        try:
            resp = requests.get(url, headers=headers,
                                timeout=get_request_timeout())

            if resp.status_code in (403, 404, 410):
                # Blocked, missing, or expired posting: retrying won't help.
                return None
            if resp.status_code == 429:
                time.sleep(delay * 3)  # longer delay for rate limiting
                continue
            if resp.status_code >= 400:
                if attempt == max_retries - 1:
                    return None
                continue

            return resp.text
        except requests.exceptions.RequestException:
            # Covers Timeout and ConnectionError; fall through and retry.
            pass

    return None
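

# Typical call site (URL is illustrative):
#   html = make_request_with_retry("https://sfbay.craigslist.org/search/jjj?query=python")
#   if html is not None:
#       # parse with BeautifulSoup, save to disk, etc.
#       ...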