"""Job scraper orchestration: fetch search-page listings, refresh individual
job pages, and run the whole process on a schedule."""

import time
from datetime import datetime, timezone

import schedule

from web.scraper import process_region_keyword, scrape_job_page
from web.db import (
    db_init,
    upsert_listing,
    upsert_job_details,
    url_to_job_id,
    upsert_user_interaction,
    db_get_all_job_urls,
    db_delete_job,
    remove_job,
    insert_log,
    get_last_fetch_time,
    get_all_regions,
    get_all_keywords,
    seed_regions_keywords_from_listings,
)

# Import utility functions
from web.utils import (
    get_base_url,
    make_request_with_retry,
    now_iso,
)


def fetch_listings():
    """Fetch job listings from all regions and keywords."""
    # We'll collect URLs discovered in this run and then remove any DB listings
    # not present in this set (treat DB as reflecting current search results).
    existing_db_urls = {row["url"] for row in db_get_all_job_urls()}
    discovered_urls = set()
    new_rows = []

    # Ensure the regions/keywords master lists exist.
    try:
        seed_regions_keywords_from_listings()
    except Exception:
        pass
    yield "Initializing database and seeding regions/keywords...\n"

    # Fetch listings for each region/keyword combination stored in the DB.
    regions = get_all_regions()
    keywords = get_all_keywords()
    total_combinations = len(regions) * len(keywords)
    processed = 0
    yield (
        f"Found {len(regions)} regions and {len(keywords)} keywords. "
        f"Processing {total_combinations} combinations...\n"
    )

    for region in regions:
        region_name = region.get("name")
        if not region_name:
            continue
        for keyword in keywords:
            keyword_name = keyword.get("name")
            if not keyword_name:
                continue

            # Build the search URL and a canonical identifier for this
            # region+keyword combination.
            url = get_base_url().format(
                region=region_name, keyword=keyword_name.replace(" ", "+"))
            search_page_id = f"search:{region_name}:{keyword_name}"

            try:
                last = get_last_fetch_time(url)
                if last is not None:
                    # Skip if this search page was fetched within the last 24 hours.
                    age = datetime.now(timezone.utc) - (
                        last if last.tzinfo is not None
                        else last.replace(tzinfo=timezone.utc)
                    )
                    if age.total_seconds() < 24 * 3600:
                        yield (
                            f"Skipping {region_name} + {keyword_name} "
                            f"(fetched {age.seconds // 3600}h ago)...\n"
                        )
                        processed += 1
                        continue
            except Exception:
                # If the log lookup fails, proceed with the fetch.
                pass

            processed += 1
            yield f"Processing {region_name} + {keyword_name} ({processed}/{total_combinations})...\n"

            # Record that we're fetching this search page now.
            try:
                insert_log(url, region=region_name, keyword=keyword_name,
                           fetched_at=datetime.now(timezone.utc))
            except Exception:
                pass

            for row in process_region_keyword(region_name, keyword_name, discovered_urls):
                timestamp, row_region, row_keyword, title, pay, location, job_url = row
                discovered_urls.add(job_url)
                if job_url not in existing_db_urls:
                    new_rows.append(row)
                # Upsert the listing so the DB reflects the current search result.
                upsert_listing(
                    url=job_url,
                    region=row_region,
                    keyword=row_keyword,
                    title=title,
                    pay=pay,
                    location=location,
                    timestamp=timestamp,
                    fetched_from=search_page_id,
                    fetched_at=datetime.now(timezone.utc),
                )

    yield f"Listing fetch complete: {len(discovered_urls)} discovered, {len(new_rows)} new.\n"
    return {"discovered": len(discovered_urls), "new": len(new_rows)}


def process_job_url(job_url: str, region: str = "", keyword: str = ""):
    """Fetch, scrape, and upsert a single job page, yielding progress messages."""
    last = get_last_fetch_time(job_url)
    if last is not None:
        # Skip if this job page was fetched within the last 24 hours.
        age = datetime.now(timezone.utc) - (
            last if last.tzinfo is not None else last.replace(tzinfo=timezone.utc)
        )
        if age.total_seconds() < 24 * 3600:
            yield f"Skipping job {job_url} (fetched {age.seconds // 3600}h ago)...\n"
            return None

    try:
        job_id = url_to_job_id(job_url)
        yield f"Fetching job page: {job_url}\n"
        content = make_request_with_retry(job_url, 1)
        if content is None:
            yield f"Failed to fetch content for {job_url}, removing from database\n"
            remove_job(job_url)
            return None

        yield f"Scraping job data from {job_url}\n"
        job_data = scrape_job_page(content, job_url)
        if job_data:
            yield f"Upserting job details for {job_id}\n"
            upsert_job_details(job_data, region=region, keyword=keyword)
            upsert_user_interaction(
                job_id, seen_at=datetime.now(timezone.utc).isoformat())
            yield f"Successfully processed job {job_id}: {job_data.get('title', 'Unknown')}\n"
            return job_data
        else:
            yield f"Failed to scrape job data from {job_url}\n"
            return None
    except Exception as e:
        yield f"Error processing {job_url}: {str(e)}\n"
        return None


def scraper():
    """Run the full scraping pipeline, yielding progress messages."""
    yield "Starting scraper...\n"
    db_init()
    yield "Database initialized\n"

    # First, fetch current listings from the search pages and make the DB reflect them.
    yield "Fetching listings...\n"
    for message in fetch_listings():
        yield message

    # Finally, fetch and refresh individual job pages for the current listings.
    job_urls = db_get_all_job_urls()
    yield f"Processing {len(job_urls)} job pages...\n"
    for i, url_dict in enumerate(job_urls, start=1):
        url = url_dict.get("url")
        region = url_dict.get("region", "")
        keyword = url_dict.get("keyword", "")
        if not url:
            continue
        yield f"\n--- Processing job {i}/{len(job_urls)} ---\n"
        for message in process_job_url(job_url=url, region=region, keyword=keyword):
            yield message

    yield "\nScraping completed successfully!\n"


def scrape_jobs_with_retry(max_retries=3):
    """Run the scraping process with retry logic for failures."""
    for attempt in range(max_retries):
        try:
            # scraper() is a generator; consume it so the work actually runs.
            for _ in scraper():
                pass
            return True
        except Exception:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt * 10)  # Exponential backoff
    return False


def start_scheduler():
    """Start the scheduler to run scraping every hour."""
    # Clear any existing jobs.
    schedule.clear()

    # Schedule scraping every hour.
    schedule.every().hour.do(scrape_jobs_with_retry)

    # Run the scheduler in a loop.
    while True:
        schedule.run_pending()
        time.sleep(60)  # Check every minute


def run_scheduled_scraping():
    """Run the scheduled scraping process."""
    try:
        scrape_jobs_with_retry()
    except Exception:
        pass


# Register the hourly scraping job when the module is imported.
schedule.every().hour.do(run_scheduled_scraping)


if __name__ == "__main__":
    # scraper() yields progress messages; stream them to stdout.
    for message in scraper():
        print(message, end="")