jobs/web/craigslist.py
zwitschi 92b6efb550
fix: adjust exponential backoff timing in scrape_jobs_with_retry
2025-11-01 18:31:35 +01:00


from datetime import datetime, timezone
from web.scraper import process_region_keyword, scrape_job_page
from web.db import (
    db_init,
    upsert_listing,
    upsert_job_details,
    url_to_job_id,
    upsert_user_interaction,
    db_get_all_job_urls,
    db_delete_job,
    remove_job,
    insert_log,
    get_last_fetch_time,
)
import schedule
import time
# Import utility functions
from web.utils import (
    get_base_url,
    make_request_with_retry,
    now_iso,
)
from web.db import get_all_regions, get_all_keywords, seed_regions_keywords_from_listings


def fetch_listings():
"""Fetch job listings from all regions and keywords."""
# We'll collect URLs discovered in this run and then remove any DB listings
# not present in this set (treat DB as reflecting current search results).
existing_db_urls = set(row['url'] for row in db_get_all_job_urls())
discovered_urls = set()
new_rows = []
# Ensure regions/keywords master lists exist
try:
seed_regions_keywords_from_listings()
except Exception:
pass
yield "Initializing database and seeding regions/keywords...\n"
# Fetch listings for each region/keyword from DB
regions = get_all_regions()
keywords = get_all_keywords()
total_combinations = len(regions) * len(keywords)
processed = 0
yield f"Found {len(regions)} regions and {len(keywords)} keywords. Processing {total_combinations} combinations...\n"
for region in regions:
region_name = region.get("name")
if not region_name:
continue
for keyword in keywords:
keyword_name = keyword.get("name")
if not keyword_name:
continue
# Build a canonical search identifier for this region+keyword combination.
            url = get_base_url().format(region=region_name, keyword=keyword_name.replace(" ", "+"))
            search_page_id = f"search:{region_name}:{keyword_name}"
            try:
                last = get_last_fetch_time(url)
                if last is not None:
                    # skip if fetched within the last 24 hours
                    age = datetime.now(timezone.utc) - (
                        last if last.tzinfo is not None else last.replace(tzinfo=timezone.utc))
                    if age.total_seconds() < 24 * 3600:
                        yield f"Skipping {region_name} + {keyword_name} (fetched {age.seconds//3600}h ago)...\n"
                        processed += 1
                        continue
            except Exception:
                # if logging lookup fails, proceed with fetch
                pass
            processed += 1
            yield f"Processing {region_name} + {keyword_name} ({processed}/{total_combinations})...\n"
            # record that we're fetching this search page now
            try:
                insert_log(url, region=region_name,
                           keyword=keyword_name, fetched_at=datetime.now(timezone.utc))
            except Exception:
                pass
            for row in process_region_keyword(region_name, keyword_name, discovered_urls):
                timestamp, region, keyword, title, pay, location, url = row
                discovered_urls.add(url)
                if url not in existing_db_urls:
                    new_rows.append(row)
                # Upsert or update listing to reflect current search result
                upsert_listing(
                    url=url,
                    region=region,
                    keyword=keyword,
                    title=title,
                    pay=pay,
                    location=location,
                    timestamp=timestamp,
                    fetched_from=search_page_id,
                    fetched_at=datetime.now(timezone.utc),
                )
yield f"Listing fetch complete: {len(discovered_urls)} discovered, {len(new_rows)} new,\n"
return {"discovered": len(discovered_urls), "new": len(new_rows)}
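
# A minimal usage sketch (not part of the original flow): fetch_listings() is a
# generator, so the fetching only happens while it is iterated, e.g.:
#     for line in fetch_listings():
#         print(line, end="")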


def process_job_url(job_url: str, region: str = "", keyword: str = ""):
    """Fetch, scrape, and upsert a single job page, yielding progress messages."""
    last = get_last_fetch_time(job_url)
    if last is not None:
        # skip if fetched within the last 24 hours
        age = datetime.now(timezone.utc) - (
            last if last.tzinfo is not None else last.replace(tzinfo=timezone.utc))
        if age.total_seconds() < 24 * 3600:
            yield f"Skipping job {job_url} (fetched {age.seconds//3600}h ago)...\n"
            return None
    try:
        job_id = url_to_job_id(job_url)
        yield f"Fetching job page: {job_url}\n"
        content = make_request_with_retry(job_url, 1)
        if content is None:
            yield f"Failed to fetch content for {job_url}, removing from database\n"
            remove_job(job_url)
            return None
        yield f"Scraping job data from {job_url}\n"
        job_data = scrape_job_page(content, job_url)
        if job_data:
            yield f"Upserting job details for {job_id}\n"
            upsert_job_details(job_data, region=region, keyword=keyword)
            upsert_user_interaction(
                job_id, seen_at=datetime.now(timezone.utc).isoformat())
            yield f"Successfully processed job {job_id}: {job_data.get('title', 'Unknown')}\n"
            return job_data
        else:
            yield f"Failed to scrape job data from {job_url}\n"
            return None
    except Exception as e:
        yield f"Error processing {job_url}: {str(e)}\n"
        return None
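
# Usage sketch for a single posting (the URL below is a hypothetical example,
# not one taken from the database):
#     for line in process_job_url("https://example.org/some-job-posting.html"):
#         print(line, end="")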


def scraper():
"""Main function to run the scraper."""
yield "Starting scraper...\n"
db_init()
yield "Database initialized\n"
# First, fetch current listings from search pages and make DB reflect them.
yield "Fetching listings...\n"
for message in fetch_listings():
yield message
# Finally, fetch and refresh individual job pages for current listings
job_urls = db_get_all_job_urls()
yield f"Processing {len(job_urls)} job pages...\n"
for i, url_dict in enumerate(job_urls, start=1):
url = url_dict.get("url")
region = url_dict.get("region", "")
keyword = url_dict.get("keyword", "")
if not url:
continue
yield f"\n--- Processing job {i}/{len(job_urls)} ---\n"
for message in process_job_url(job_url=url, region=region, keyword=keyword):
yield message
yield "\nScraping completed successfully!\n"


def scrape_jobs_with_retry(max_retries=3):
    """Run the scraping process with retry logic for failures."""
    for attempt in range(max_retries):
        try:
            # scraper() is a generator; consume it so the scraping actually runs
            for _ in scraper():
                pass
            return True
        except Exception:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt * 10)  # Exponential backoff: 10s, 20s, ...
    return False


def start_scheduler():
    """Start the scheduler to run scraping every hour."""
    # Clear any existing jobs
    schedule.clear()
    # Schedule scraping every hour
    schedule.every().hour.do(scrape_jobs_with_retry)
    # Run the scheduler in a loop
    while True:
        schedule.run_pending()
        time.sleep(60)  # Check every minute
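
# start_scheduler() blocks its caller in the run_pending() loop above. A minimal
# sketch for running it alongside other code, using only the standard library
# (this thread setup is an assumption, not part of the original module):
#     import threading
#     threading.Thread(target=start_scheduler, daemon=True).start()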


def run_scheduled_scraping():
    """Run the scheduled scraping process."""
    try:
        scrape_jobs_with_retry()
    except Exception:
        pass

# Initialize scheduler when module is imported
schedule.every().hour.do(run_scheduled_scraping)

if __name__ == "__main__":
    # scraper() is a generator; consume it so the scraping actually runs
    for message in scraper():
        print(message, end="")