initial project commit
336
web/utils.py
Normal file
@@ -0,0 +1,336 @@
"""
Utility functions for the Craigslist scraper.
"""

import json
import os
import random
import re
import time
from datetime import datetime, UTC
from typing import Any, Dict, List, Optional

import requests


def get_config_file() -> str:
    """Return the path to the main config file."""
    return os.path.abspath(os.path.join(
        os.path.dirname(__file__), '..', 'config', 'settings.json'))


def get_config() -> dict:
    """Return the loaded configuration dict."""
    config = {}
    try:
        with open(get_config_file(), 'r', encoding='utf-8') as f:
            config = json.load(f)
    except Exception:
        # A missing or malformed settings.json yields an empty config
        config = {}
    return config


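# Example shape of config/settings.json as assumed by the helpers in this module
# (illustrative values; the real file is not part of this commit):
#
#   {
#     "users": [{"username": "admin", "password": "secret", "is_admin": true}],
#     "database": {"mysql": {"host": "127.0.0.1", "port": 3306, "user": "root",
#                            "password": "", "database": "jobs"}},
#     "http": {"user_agent": "...", "request_timeout": 20, "max_retries": 3,
#              "backoff_factor": 2, "min_delay": 1, "max_delay": 5},
#     "paths": {"cache_dir": "cache", "logs_dir": "logs"},
#     "scraper": {"base_url": "https://{region}.craigslist.org/search/jjj?query={keyword}&sort=rel"}
#   }

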
def get_users_from_settings() -> List[Dict]:
    """Return user entries from settings.json (array of dicts)."""
    users = get_config().get('users', [])
    if not isinstance(users, list):
        return []
    out: List[Dict] = []
    for u in users:
        if not isinstance(u, dict):
            continue
        username = (u.get('username') or '').strip()
        if not username:
            continue
        out.append({
            'username': username,
            'is_admin': bool(u.get('is_admin', False)),
            'password': u.get('password') or ''
        })
    return out


def initialize_users_from_settings() -> int:
    """Ensure users from settings.json exist in the DB; set admin/active flags and passwords.

    Returns the number of users processed.
    """
    from web.db import create_or_update_user  # local import to avoid import cycles
    users = get_users_from_settings()
    count = 0
    for u in users:
        pw = u.get('password') or None
        create_or_update_user(
            u['username'],
            password=pw,
            is_admin=bool(u.get('is_admin', False)),
            is_active=True,
        )
        count += 1
    return count


def verify_credentials(username: str, password: str) -> bool:
    """Proxy to web.db.verify_user_credentials."""
    from web.db import verify_user_credentials  # local import to avoid import cycles
    return verify_user_credentials(username, password)


# --- Database configuration helpers ---

def get_mysql_config() -> dict:
    """Return MySQL/MariaDB connection settings."""
    db = get_config().get('database', {}).get('mysql', {})
    return {
        'host': db.get('host', '127.0.0.1'),
        'user': db.get('user', 'root'),
        'password': db.get('password', ''),
        'database': db.get('database', 'jobs'),
        'port': db.get('port', 3306),
    }


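# Connection sketch (assumption: the project uses a DB-API driver such as
# mysql-connector-python; this commit does not show which driver web.db uses):
#
#   import mysql.connector
#   conn = mysql.connector.connect(**get_mysql_config())
#
# The keys returned above (host, user, password, database, port) match the
# keyword arguments accepted by mysql.connector.connect().

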
def get_http_setting(key: str, default=None):
    return get_config().get('http', {}).get(key, default)


def get_paths() -> dict:
    return get_config().get('paths', {})


def get_cache_dir() -> str:
    return get_paths().get('cache_dir', 'cache')


def get_logs_dir() -> str:
    return get_paths().get('logs_dir', 'logs')


def get_user_agent() -> str:
    return get_http_setting('user_agent')


def get_request_timeout() -> int:
    return get_http_setting('request_timeout')


def get_max_retries() -> int:
    return get_http_setting('max_retries')


def get_backoff_factor() -> int:
    return get_http_setting('backoff_factor')


def get_min_delay() -> int:
    return get_http_setting('min_delay')


def get_max_delay() -> int:
    return get_http_setting('max_delay')


def get_base_url() -> str:
    return get_config().get('scraper', {}).get('base_url', "https://{region}.craigslist.org/search/jjj?query={keyword}&sort=rel")


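# Usage sketch (illustrative values, not from the original commit): the base
# URL is a template with {region} and {keyword} placeholders, e.g.
#
#   get_base_url().format(region="sfbay", keyword="python")
#   # -> "https://sfbay.craigslist.org/search/jjj?query=python&sort=rel"

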
def ensure_cache_dir():
    """Ensure the cache directory exists."""
    os.makedirs(get_cache_dir(), exist_ok=True)


def now_iso() -> str:
    """Return the current UTC time in ISO 8601 format."""
    return datetime.now(UTC).isoformat()


def get_filename_from_url(url: str) -> str:
    """Convert a URL to a safe filename."""
    return url.replace("https://", "").replace("/", "_").replace("?", "_").replace("&", "_")


def url_to_job_id(url: str) -> int:
    """Extract the job id from a Craigslist URL (last path segment without .html)."""
    last = url.rstrip("/").split("/")[-1].replace(".html", "")
    if last.isdigit():
        return int(last)
    return 0


def normalize_job_id(raw_id: Optional[str], url: Optional[str]) -> Optional[int]:
    """Normalize a job id coming from the details page (e.g., 'post id: 1234567890').

    Falls back to the URL-derived id when needed.
    """
    if raw_id:
        m = re.search(r"(\d{5,})", raw_id)
        if m:
            return int(m.group(1))
    if url:
        return url_to_job_id(url)
    return None


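# Illustrative example (hypothetical listing URL, not from the original commit):
#
#   url_to_job_id("https://sfbay.craigslist.org/sfc/sof/d/example-title/7712345678.html")
#   # -> 7712345678
#   normalize_job_id("post id: 7712345678", None)
#   # -> 7712345678

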
def get_url_from_filename(name: str) -> str:
    """Generate a best-effort URL guess from a cached filename."""
    # Best-effort reversal of the filename convention (underscores back to slashes)
    base = os.path.splitext(name)[0]
    return f"https://{base.replace('_', '/')}"


def get_cached_content(url: str) -> str:
    """Return the cached content for a URL."""
    with open(get_cache_path(url), "r", encoding="utf-8") as f:
        return f.read()


def safe_get_text(element, default="N/A"):
    """Safely extract text from BeautifulSoup element."""
    return element.get_text(strip=True) if element else default


def safe_get_attr(element, attr, default="N/A"):
    """Safely extract attribute from BeautifulSoup element."""
    return element.get(attr, default) if element else default


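# Usage sketch (assumes BeautifulSoup4 is installed; the selector names are
# illustrative and not taken from the original commit):
#
#   from bs4 import BeautifulSoup
#   soup = BeautifulSoup(html, "html.parser")
#   title = safe_get_text(soup.select_one(".result-title"))
#   href = safe_get_attr(soup.select_one("a.result-title"), "href")

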
def get_random_delay(min_delay: Optional[float] = None, max_delay: Optional[float] = None) -> float:
    """Return a random delay between min_delay and max_delay seconds (bounds read from config at call time)."""
    min_delay = get_min_delay() if min_delay is None else min_delay
    max_delay = get_max_delay() if max_delay is None else max_delay
    return random.uniform(min_delay, max_delay)


def get_cache_path(url: str) -> str:
    """Return the cache file path for a URL."""
    return os.path.join(get_cache_dir(), f"{get_filename_from_url(url)}.html")


def cache_page(url: str, content: str):
    """Cache the page content and refresh its timestamp."""
    cache_path = get_cache_path(url)
    os.makedirs(os.path.dirname(cache_path), exist_ok=True)
    with open(cache_path, "w", encoding="utf-8") as f:
        f.write(content)
    # Touch the file so its modification time reflects this write
    os.utime(cache_path, None)


def is_cached(url: str) -> bool:
    """Check whether the page is cached; search pages must also be newer than 24 hours."""
    cache_path = get_cache_path(url)
    if not os.path.isfile(cache_path):
        return False

    # Only search-result pages expire; other pages are cached indefinitely
    if 'search' in url:
        file_age = time.time() - os.path.getmtime(cache_path)
        if file_age > 24 * 3600:  # 24 hours in seconds
            return False

    return True


def is_cache_stale(last_modified: str, days: int = 1) -> bool:
    """Check whether the cached page is stale (older than `days` days, default 1)."""
    if not last_modified:
        return True
    last_datetime = datetime.fromisoformat(last_modified)
    file_age = time.time() - last_datetime.timestamp()
    return file_age > days * 24 * 3600  # days in seconds


def delete_cached_page(url: str):
    """Delete the cached page for a URL, ignoring errors."""
    cache_fp = get_cache_path(url)
    if os.path.exists(cache_fp):
        try:
            os.remove(cache_fp)
        except Exception:
            pass


def get_color_from_string(s: str) -> str:
    """Generate a hex color code from a string (note: str hashes are salted per interpreter run)."""
    # Ensure the hash code is positive
    hash_code = abs(hash(s))
    # Extract RGB components
    r = (hash_code & 0xFF0000) >> 16
    g = (hash_code & 0x00FF00) >> 8
    b = hash_code & 0x0000FF
    # Clamp RGB components to the 128-255 range so colors stay light
    r = max(128, min(255, r))
    g = max(128, min(255, g))
    b = max(128, min(255, b))
    # Combine RGB components back into a single integer
    calculated = (r << 16) | (g << 8) | b
    return f"#{calculated:06X}"


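# Illustrative call (the exact output varies between interpreter runs because
# Python salts str hashes):
#
#   get_color_from_string("sfbay")  # -> e.g. "#BBAA80"

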
# ---- App helpers moved from app.py for reuse and readability -------------


def filter_jobs(
    jobs: List[Dict[str, Any]],
    region: Optional[str] = None,
    keyword: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Filter jobs by optional region and keyword."""
    filtered = jobs
    if region:
        filtered = [j for j in filtered if j.get("region") == region]
    if keyword:
        filtered = [j for j in filtered if j.get("keyword") == keyword]
    return filtered


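# Usage sketch (hypothetical job dicts, not data from the original commit):
#
#   jobs = [{"region": "sfbay", "keyword": "python", "title": "Backend dev"},
#           {"region": "newyork", "keyword": "python", "title": "Data engineer"}]
#   filter_jobs(jobs, region="sfbay")
#   # -> [{"region": "sfbay", "keyword": "python", "title": "Backend dev"}]

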
def get_job_by_id(job_id):
    """Fetch job details by job ID from the database."""
    try:
        from web.db import get_all_jobs  # lazy import to avoid cycles
        for j in get_all_jobs():
            if str(j.get("id")) == str(job_id) or str(j.get("job_id")) == str(job_id):
                return j
    except Exception:
        pass
    return {}


def make_request_with_retry(url: str, max_retries: Optional[int] = None) -> Optional[str]:
    """Make an HTTP GET request with retry/backoff logic and basic error handling."""
    # Resolve the retry count at call time rather than in a default argument,
    # which would be evaluated once at import time.
    if max_retries is None:
        max_retries = get_max_retries()

    headers = {'User-Agent': get_user_agent()}

    for attempt in range(max_retries):
        try:
            delay = get_random_delay() * (get_backoff_factor() ** attempt)
            if attempt > 0:
                time.sleep(delay)

            resp = requests.get(url, headers=headers,
                                timeout=get_request_timeout())

            if resp.status_code in (403, 404, 410):
                # Blocked, missing, or expired listing: retrying will not help
                return None
            elif resp.status_code == 429:
                time.sleep(delay * 3)  # longer delay for rate limiting
                continue
            elif resp.status_code >= 400:
                if attempt == max_retries - 1:
                    return None
                continue

            return resp.text

        except requests.exceptions.RequestException:
            # Covers timeouts, connection errors, and other request failures;
            # fall through to the backoff below and retry
            pass

        if attempt < max_retries - 1:
            time.sleep(get_random_delay() * (get_backoff_factor() ** attempt))

    return None
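

# Usage sketch (hypothetical helper, not part of the original commit): one way
# the cache and request utilities above can compose for a single page.
def _example_fetch_with_cache(url: str) -> Optional[str]:
    """Return page HTML, preferring the local cache and caching fresh fetches."""
    ensure_cache_dir()
    if is_cached(url):
        return get_cached_content(url)
    html = make_request_with_retry(url)
    if html is not None:
        cache_page(url, html)
    return html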