import time
from datetime import datetime, timezone

import schedule

from web.scraper import process_region_keyword, scrape_job_page
from web.db import (
    db_init,
    upsert_listing,
    upsert_job_details,
    url_to_job_id,
    upsert_user_interaction,
    db_get_all_job_urls,
    db_delete_job,
    remove_job,
    insert_log,
    get_last_fetch_time,
    get_all_regions,
    get_all_keywords,
    seed_regions_keywords_from_listings,
)

# Import utility functions
from web.utils import (
    get_base_url,
    make_request_with_retry,
    get_email_settings,
)
from web.email_templates import render_job_alert_email
from web.email_service import send_email


def _negative_match_details(job_data: dict) -> tuple[str, str] | None:
    """Return (keyword, field) when job_data indicates a negative match."""
    if not job_data or not job_data.get("is_negative_match"):
        return None
    keyword = (job_data.get("negative_keyword_match") or "").strip()
    field = (job_data.get("negative_match_field") or "unknown").strip() or "unknown"
    if not keyword:
        keyword = "unknown keyword"
    return keyword, field


def _send_new_job_alert(new_jobs: list[dict]) -> tuple[bool, str]:
    """Send an email alert for newly discovered jobs.

    Returns (sent, message) where message explains why mail was skipped.
    """
    settings = get_email_settings()
    if not settings.get("enabled"):
        return False, "email alerts disabled"
    recipients = settings.get("recipients", []) or []
    if not recipients:
        return False, "no recipients configured"
    payload = render_job_alert_email(new_jobs)
    send_email(
        subject=payload.get("subject", "New jobs available"),
        body=payload.get("body", ""),
        to=recipients,
        settings=settings,
    )
    return True, "sent"


def fetch_listings():
    """Fetch job listings from all regions and keywords.

    Yields progress messages and returns a dict with:
    - discovered: total number of unique job URLs discovered
    - new: total number of new jobs added to the database
    - new_jobs: list of dicts describing the newly discovered jobs
    - by_search: list of dicts, each containing:
        - region: region name
        - keyword: keyword name
        - count: number of jobs fetched for this search
    """
    # We'll collect URLs discovered in this run and then remove any DB listings
    # not present in this set (treat DB as reflecting current search results).
    existing_db_urls = set(row['url'] for row in db_get_all_job_urls())
    discovered_urls = set()
    new_rows = []
    new_jobs = []
    search_results = []  # Track count per search

    # Ensure regions/keywords master lists exist
    try:
        seed_regions_keywords_from_listings()
    except Exception:
        pass

    yield "Initializing database and seeding regions/keywords...\n"

    # Fetch listings for each region/keyword from DB
    regions = get_all_regions()
    keywords = get_all_keywords()
    total_combinations = len(regions) * len(keywords)
    processed = 0

    yield f"Found {len(regions)} regions and {len(keywords)} keywords. Processing {total_combinations} combinations...\n"

    for region in regions:
        region_name = region.get("name")
        if not region_name:
            continue
        for keyword in keywords:
            keyword_name = keyword.get("name")
            if not keyword_name:
                continue

            # Build a canonical search identifier for this region+keyword combination.
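            # Assumption: get_base_url() returns a search-URL template containing
            # {region} and {keyword} placeholders (a shape like
            # ".../jobs?region={region}&q={keyword}" is hypothetical); spaces in
            # the keyword are encoded as '+'.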
            url = get_base_url().format(
                region=region_name, keyword=keyword_name.replace(" ", "+"))
            search_page_id = f"search:{region_name}:{keyword_name}"
            search_count = 0  # Count jobs for this search

            try:
                last = get_last_fetch_time(url)
                if last is not None:
                    # skip if fetched within the last hour
                    age = datetime.now(timezone.utc) - (
                        last if last.tzinfo is not None else last.replace(tzinfo=timezone.utc))
                    if age.total_seconds() < 1 * 3600:
                        yield f"Skipping {region_name} + {keyword_name} (fetched {int(age.total_seconds() // 60)}m ago)...\n"
                        processed += 1
                        continue
            except Exception:
                # if the log lookup fails, proceed with the fetch
                pass

            processed += 1
            yield f"Processing {region_name} + {keyword_name} ({processed}/{total_combinations})...\n"

            # record that we're fetching this search page now
            try:
                insert_log(url, region=region_name, keyword=keyword_name,
                           fetched_at=datetime.now(timezone.utc))
            except Exception:
                pass

            for row in process_region_keyword(region_name, keyword_name, discovered_urls):
                # Unpack into row_* names so the outer loop variables aren't shadowed.
                timestamp, row_region, row_keyword, title, pay, location, job_url = row
                discovered_urls.add(job_url)
                search_count += 1
                if job_url not in existing_db_urls:
                    new_rows.append(row)
                    new_jobs.append({
                        "timestamp": timestamp,
                        "region": row_region,
                        "keyword": row_keyword,
                        "title": title,
                        "pay": pay,
                        "location": location,
                        "url": job_url,
                    })
                # Upsert or update listing to reflect current search result
                upsert_listing(
                    url=job_url,
                    region=row_region,
                    keyword=row_keyword,
                    title=title,
                    pay=pay,
                    location=location,
                    timestamp=timestamp,
                    fetched_from=search_page_id,
                    fetched_at=datetime.now(timezone.utc),
                )

            # Record per-search count
            search_results.append({
                "region": region_name,
                "keyword": keyword_name,
                "count": search_count,
            })

    yield f"Listing fetch complete: {len(discovered_urls)} discovered, {len(new_rows)} new.\n"

    return {
        "discovered": len(discovered_urls),
        "new": len(new_rows),
        "by_search": search_results,
        "new_jobs": new_jobs,
    }


def process_job_url(job_url: str, region: str = "", keyword: str = ""):
    """Fetch, scrape, and upsert a single job page, yielding progress messages."""
    last = get_last_fetch_time(job_url)
    if last is not None:
        # skip if fetched within the last 24 hours
        age = datetime.now(timezone.utc) - (
            last if last.tzinfo is not None else last.replace(tzinfo=timezone.utc))
        if age.total_seconds() < 24 * 3600:
            yield f"Skipping job {job_url} (fetched {age.seconds // 3600}h ago)...\n"
            return None

    try:
        job_id = url_to_job_id(job_url)
        yield f"Fetching job page: {job_url}\n"

        content = make_request_with_retry(job_url, 1)
        if content is None:
            yield f"Failed to fetch content for {job_url}, removing from database\n"
            remove_job(job_url)
            return None

        yield f"Scraping job data from {job_url}\n"
        job_data = scrape_job_page(content, job_url)

        if job_data:
            negative_info = _negative_match_details(job_data)
            if negative_info:
                neg_keyword, neg_field = negative_info
                yield (
                    f"Skipping job {job_id} due to negative keyword "
                    f"'{neg_keyword}' in {neg_field}\n"
                )
                remove_job(job_url)
                return None

            yield f"Upserting job details for {job_id}\n"
            upsert_job_details(job_data, region=region, keyword=keyword)
            yield f"Successfully processed job {job_id}: {job_data.get('title', 'Unknown')}\n"
            return job_data
        else:
            yield f"Failed to scrape job data from {job_url}\n"
            return None
    except Exception as e:
        yield f"Error processing {job_url}: {str(e)}\n"
        return None


def scraper():
    """Main function to run the scraper."""
    yield "Starting scraper...\n"

    db_init()
    yield "Database initialized\n"

    # First, fetch current listings from search pages and make DB reflect them.
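    # fetch_listings() is a generator: it yields progress strings and returns its
    # summary dict as the StopIteration value, so we drive it with next() below to
    # capture that return value.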
    yield "Fetching listings...\n"
    listing_summary: dict | None = None
    fetch_iter = fetch_listings()
    try:
        while True:
            message = next(fetch_iter)
            yield message
    except StopIteration as stop:
        listing_summary = stop.value if isinstance(stop.value, dict) else {}

    new_jobs = []
    if listing_summary:
        new_jobs = listing_summary.get("new_jobs", []) or []

    if new_jobs:
        yield f"Preparing email alert for {len(new_jobs)} new jobs...\n"
        try:
            sent, info = _send_new_job_alert(new_jobs)
            if sent:
                yield "Job alert email sent.\n"
            else:
                yield f"Skipping email alert: {info}\n"
        except Exception as exc:
            yield f"Failed to send job alert email: {exc}\n"

    # Finally, fetch and refresh individual job pages for current listings
    job_urls = db_get_all_job_urls()
    yield f"Processing {len(job_urls)} job pages...\n"

    for i, url_dict in enumerate(job_urls, start=1):
        url = url_dict.get("url")
        region = url_dict.get("region", "")
        keyword = url_dict.get("keyword", "")
        if not url:
            continue
        yield f"\n--- Processing job {i}/{len(job_urls)} ---\n"
        for message in process_job_url(job_url=url, region=region, keyword=keyword):
            yield message

    yield "\nScraping completed successfully!\n"


def scrape_jobs_with_retry(max_retries=3):
    """Run the scraping process with retry logic for failures."""
    for attempt in range(max_retries):
        try:
            # scraper() is a generator; it must be iterated for any work to happen.
            for _message in scraper():
                pass
            return True
        except Exception:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt * 10)  # Exponential backoff
    return False


def start_scheduler():
    """Start the scheduler to run scraping every hour."""
    # Clear any existing jobs
    schedule.clear()

    # Schedule scraping every hour
    schedule.every().hour.do(scrape_jobs_with_retry)

    # Run the scheduler in a loop
    while True:
        schedule.run_pending()
        time.sleep(60)  # Check every minute


def run_scheduled_scraping():
    """Run the scheduled scraping process."""
    try:
        scrape_jobs_with_retry()
    except Exception:
        pass


# Initialize scheduler when module is imported
schedule.every().hour.do(run_scheduled_scraping)


if __name__ == "__main__":
    # Run a single scrape pass, streaming progress messages to stdout.
    for message in scraper():
        print(message, end="")
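
# Notes on running this module:
#   * Running the file directly performs a single scrape pass and prints progress.
#   * The module-level schedule.every().hour.do(run_scheduled_scraping) call only
#     registers the hourly job; nothing executes until something drives the schedule
#     loop, e.g. start_scheduler() (which blocks) or periodic schedule.run_pending()
#     calls from a host process.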