remove caching

Author: georg.sinn-schirwitz
Date: 2025-09-08 14:44:46 +02:00
parent f8e23d0fba
commit 042a196718
13 changed files with 144 additions and 525 deletions


@@ -1,5 +1,5 @@
import os
from flask import Flask, request, jsonify, render_template, redirect, url_for, session, flash, send_file
from flask import Flask, request, jsonify, render_template, redirect, url_for, session, flash, Response
from flask_wtf import CSRFProtect
from typing import Dict, List
@@ -34,7 +34,6 @@ from web.utils import (
initialize_users_from_settings,
filter_jobs,
get_job_by_id,
get_cache_dir,
)
from web.db import get_all_regions, get_all_keywords
@@ -232,39 +231,6 @@ def job_by_id(job_id):
return jsonify({"error": "Job not found"}), 404
@app.route('/cached/<job_id>', methods=['GET'])
def serve_cached(job_id):
"""Serve the cached HTML file for a job if available.
Uses the job record's `file_path_abs` when present, or resolves the DB `file_path` via helper.
Ensures the returned file is located under the configured cache directory to avoid path-traversal.
"""
try:
from web.db import db_get_cached_abs_path
j = get_job_by_id(job_id)
if not j:
return "Job not found", 404
# Prefer file_path_abs, fall back to resolving the DB-stored file_path
abs_fp = j.get('file_path_abs') or None
if not abs_fp:
db_fp = j.get('file_path')
abs_fp = db_get_cached_abs_path(db_fp) if db_fp else None
if not abs_fp or not os.path.isfile(abs_fp):
return "Cached file not available", 404
cache_dir = os.path.abspath(get_cache_dir())
abs_fp = os.path.abspath(abs_fp)
# Ensure the file is inside the cache directory
if os.path.commonpath([cache_dir, abs_fp]) != cache_dir:
return "Forbidden", 403
return send_file(abs_fp)
except Exception:
return "Error serving cached file", 500
@app.route('/jobs/<job_id>/favorite', methods=['POST'])
def set_favorite(job_id):
"""Mark or unmark a job as favorite for a given user.
@@ -290,9 +256,21 @@ csrf.exempt(set_favorite)
@app.route('/scrape', methods=['GET'])
def scrape():
"""Trigger the web scraping process."""
scraper()
return jsonify({"status": "Scraping completed"})
"""Trigger the web scraping process with streaming output."""
def generate():
try:
for message in scraper():
yield message
except Exception as e:
yield f"Error during scraping: {str(e)}\n"
return Response(generate(), mimetype='text/plain')
@app.route('/scrape-page', methods=['GET'])
def scrape_page():
"""Serve the scrape page with streaming output display."""
return render_template('scrape.html', title='Scrape Jobs')
# ---------------- Auth & Admin UI ------------------------------------------
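The new `/scrape` route streams a generator through `Response`. One caveat: by the time Flask iterates the generator, the request context is gone, so a generator that touches `request` or `session` must be wrapped in Flask's `stream_with_context`. A minimal sketch (route name and body are illustrative, not this app's code):

# Sketch: streaming with the request context kept alive.
# stream_with_context is Flask's documented helper; the route is illustrative.
from flask import Flask, Response, request, stream_with_context

app = Flask(__name__)

@app.route("/stream-demo")
def stream_demo():
    def generate():
        # Without stream_with_context, reading `request` here would raise
        # RuntimeError, because the view function has already returned.
        yield f"started for {request.remote_addr}\n"
        for i in range(3):
            yield f"step {i + 1}\n"
    return Response(stream_with_context(generate()), mimetype="text/plain")

Reverse proxies often buffer streamed responses; if progress lines arrive all at once, disabling buffering (for nginx, an `X-Accel-Buffering: no` response header) is the usual companion tweak.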


@@ -2,31 +2,19 @@ from datetime import datetime, timezone
from web.scraper import process_region_keyword, scrape_job_page
from web.db import (
db_init,
upsert_cached_page,
upsert_listing,
upsert_job_details,
url_to_job_id,
upsert_user_interaction,
db_remove_cached_url,
db_sync_cached_pages,
db_get_all_job_urls,
db_get_cache_url,
db_delete_job,
remove_job,
normalize_cached_page_paths,
)
# Import utility functions
from web.utils import (
get_cache_dir,
make_request_with_retry,
now_iso,
get_cache_path,
cache_page,
is_cache_stale,
delete_cached_page,
get_cached_content,
ensure_cache_dir
)
from web.db import get_all_regions, get_all_keywords, seed_regions_keywords_from_listings
@@ -45,15 +33,26 @@ def fetch_listings():
except Exception:
pass
yield "Initializing database and seeding regions/keywords...\n"
# Fetch listings for each region/keyword from DB
for region in get_all_regions():
regions = get_all_regions()
keywords = get_all_keywords()
total_combinations = len(regions) * len(keywords)
processed = 0
yield f"Found {len(regions)} regions and {len(keywords)} keywords. Processing {total_combinations} combinations...\n"
for region in regions:
region_name = region.get("name")
if not region_name:
continue
for keyword in get_all_keywords():
for keyword in keywords:
keyword_name = keyword.get("name")
if not keyword_name:
continue
processed += 1
yield f"Processing {region_name} + {keyword_name} ({processed}/{total_combinations})...\n"
for row in process_region_keyword(region_name, keyword_name, discovered_urls):
timestamp, region, keyword, title, pay, location, url = row
discovered_urls.add(url)
@@ -72,75 +71,68 @@ def fetch_listings():
# Remove stale listings: those present in DB but not discovered now.
stale_urls = existing_db_urls - discovered_urls
for url in stale_urls:
try:
jid = url_to_job_id(url)
db_delete_job(jid)
# Also try to remove cached file and its metadata
delete_cached_page(url)
db_remove_cached_url(url)
except Exception:
pass
if stale_urls:
yield f"Removing {len(stale_urls)} stale listings...\n"
for url in stale_urls:
try:
jid = url_to_job_id(url)
db_delete_job(jid)
except Exception:
pass
yield f"Listing fetch complete: {len(discovered_urls)} discovered, {len(new_rows)} new, {len(stale_urls)} stale\n"
return {"discovered": len(discovered_urls), "new": len(new_rows), "stale": len(stale_urls)}
def process_job_url(job_url: str):
try:
job_id = url_to_job_id(job_url)
content = None
cached_page = db_get_cache_url(job_url)
if cached_page:
last_modified = cached_page.get("last_modified")
if last_modified and not is_cache_stale(last_modified):
content = get_cached_content(job_url)
else:
content = make_request_with_retry(job_url, 1)
else:
content = make_request_with_retry(job_url, 1)
yield f"Fetching job page: {job_url}\n"
content = make_request_with_retry(job_url, 1)
if content is None:
yield f"Failed to fetch content for {job_url}, removing from database\n"
remove_job(job_url)
return None
# refresh cache and details
cache_page(job_url, content)
upsert_cached_page(
file_path=get_cache_path(job_url),
url_guess=job_url,
last_modified=now_iso(),
size_bytes=len(content),
job_id=job_id
)
yield f"Scraping job data from {job_url}\n"
job_data = scrape_job_page(content, job_url)
if job_data:
yield f"Upserting job details for {job_id}\n"
upsert_job_details(job_data)
upsert_user_interaction(
job_id, seen_at=datetime.now(timezone.utc).isoformat())
yield f"Successfully processed job {job_id}: {job_data.get('title', 'Unknown')}\n"
return job_data
else:
yield f"Failed to scrape job data from {job_url}\n"
return None
except Exception:
except Exception as e:
yield f"Error processing {job_url}: {str(e)}\n"
return None
def scraper():
"""Main function to run the scraper."""
ensure_cache_dir()
yield "Starting scraper...\n"
db_init()
yield "Database initialized\n"
# First, fetch current listings from search pages and make DB reflect them.
jl = fetch_listings()
# Sync any cached files we have on disk into the cached_pages table.
db_sync_cached_pages(get_cache_dir())
# Normalize any relative cached file paths to absolute paths in DB
normalized = normalize_cached_page_paths()
if normalized:
pass
yield "Fetching listings...\n"
for message in fetch_listings():
yield message
# Finally, fetch and refresh individual job pages for current listings
for url in db_get_all_job_urls():
process_job_url(url)
job_urls = db_get_all_job_urls()
yield f"Processing {len(job_urls)} job pages...\n"
for i, url in enumerate(job_urls, 1):
yield f"\n--- Processing job {i}/{len(job_urls)} ---\n"
for message in process_job_url(url):
yield message
yield "\nScraping completed successfully!\n"
if __name__ == "__main__":
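Both `scraper()` and its callees forward progress lines with explicit `for message in gen(): yield message` loops, and `process_job_url` additionally `return`s the scraped job, which those loops silently drop (a `return` inside a generator surfaces as `StopIteration.value`). Python's `yield from` covers both concerns in one statement; a sketch with stand-in names:

# Sketch: generator delegation with `yield from`.
# worker/pipeline are stand-ins for process_job_url/scraper.
def worker(url: str):
    yield f"fetching {url}\n"
    yield f"scraping {url}\n"
    return {"url": url, "title": "Example"}  # becomes StopIteration.value

def pipeline(urls):
    for url in urls:
        # `yield from` forwards every message AND hands back the
        # generator's return value, which a plain for-loop would drop.
        job = yield from worker(url)
        if job:
            yield f"done: {job['title']}\n"

for message in pipeline(["https://example.org/a"]):
    print(message, end="")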

web/db.py (187 changed lines)

@@ -4,7 +4,6 @@ from __future__ import annotations
Tables:
- users(user_id PK, username UNIQUE, created_at)
- cached_pages(file_path PK, url_guess, last_modified, size_bytes, job_id)
- job_listings(job_id PK, url UNIQUE, region, keyword, title, pay, location, timestamp)
- job_descriptions(job_id PK FK -> job_listings, title, company, location, description, posted_time, url)
- user_interactions(job_id PK FK -> job_listings, user_id FK -> users, seen_at, url_visited, is_user_favorite)
@@ -23,8 +22,6 @@ from web.utils import (
url_to_job_id,
normalize_job_id,
now_iso,
get_cache_path,
get_cache_dir,
get_mysql_config,
)
@@ -86,8 +83,6 @@ class JobListing(Base):
description = relationship(
"JobDescription", back_populates="listing", uselist=False, cascade="all, delete-orphan")
cached_pages = relationship(
"CachedPage", back_populates="listing", cascade="all, delete-orphan")
interactions = relationship(
"UserInteraction", back_populates="listing", cascade="all, delete-orphan")
@@ -106,30 +101,6 @@ class JobDescription(Base):
listing = relationship("JobListing", back_populates="description")
class CachedPage(Base):
__tablename__ = "cached_pages"
file_path = Column(String(FILE_PATH_LEN), primary_key=True)
url_guess = Column(String(URL_LEN))
last_modified = Column(String(TIME_LEN))
size_bytes = Column(Integer)
job_id = Column(String(JOB_ID_LEN), ForeignKey(
"job_listings.job_id", ondelete="CASCADE"))
listing = relationship("JobListing", back_populates="cached_pages")
@property
def abs_path(self) -> Optional[str]:
"""Return the absolute filesystem path for this cached page.
The DB stores `file_path` relative to the configured cache dir. This
helper centralizes resolution so callers can use `cached_page.abs_path`.
"""
fp = getattr(self, 'file_path', None)
if not fp:
return None
return os.path.join(os.path.abspath(get_cache_dir()), fp)
class UserInteraction(Base):
__tablename__ = "user_interactions"
# composite uniqueness on (user_id, job_id)
@@ -292,139 +263,6 @@ def upsert_job_details(job_data: Dict[str, Any]):
session.commit()
def upsert_cached_page(*, file_path: str, url_guess: Optional[str], last_modified: Optional[str], size_bytes: Optional[int], job_id: Optional[int]):
# Store file paths relative to the cache directory (keeps DB portable)
cache_dir = os.path.abspath(get_cache_dir())
abs_fp = os.path.abspath(file_path)
rel_fp = os.path.relpath(abs_fp, start=cache_dir)
with _ensure_session() as session:
obj = session.get(CachedPage, rel_fp)
if obj is None:
obj = CachedPage(file_path=rel_fp)
session.add(obj)
setattr(obj, "url_guess", url_guess)
setattr(obj, "last_modified", last_modified)
setattr(obj, "size_bytes", size_bytes)
setattr(obj, "job_id", str(job_id) if job_id else None)
session.commit()
def remove_cached_page(file_path: str):
# Accept absolute or relative input but DB keys are stored relative to cache dir
cache_dir = os.path.abspath(get_cache_dir())
abs_fp = os.path.abspath(file_path)
rel_fp = os.path.relpath(abs_fp, start=cache_dir)
with _ensure_session() as session:
obj = session.get(CachedPage, rel_fp)
if obj:
session.delete(obj)
session.commit()
def db_remove_cached_url(url: str):
"""Remove a cached page by URL."""
# Compute absolute path for the URL and delegate to remove_cached_page
abs_fp = os.path.abspath(get_cache_path(url))
try:
remove_cached_page(abs_fp)
except Exception:
pass
def db_get_all_cached_pages() -> List[Dict[str, Any]]:
with _ensure_session() as session:
rows = session.execute(text(
"SELECT file_path, url_guess, last_modified, size_bytes, job_id FROM cached_pages")).fetchall()
out = []
for row in rows:
fp = row[0]
out.append({
"file_path": fp,
"file_path_abs": db_get_cached_abs_path(fp) if fp else None,
"url_guess": row[1],
"last_modified": row[2],
"size_bytes": row[3],
"job_id": row[4],
})
return out
def db_get_cache_url(url: str):
"""Return the data for a specific URL from cached_pages.
Arguments:
url -- The URL to look up in the cache.
"""
with _ensure_session() as session:
row = session.execute(text(
"SELECT file_path, url_guess, last_modified, size_bytes, job_id FROM cached_pages WHERE url_guess = :u"), {"u": url}).fetchone()
if not row:
return None
fp = row[0]
return {
"file_path": fp,
"file_path_abs": db_get_cached_abs_path(fp) if fp else None,
"url_guess": row[1],
"last_modified": row[2],
"size_bytes": row[3],
"job_id": row[4],
}
def db_sync_cached_pages(cache_dir: str):
"""Scan cache_dir and upsert page metadata into cached_pages table."""
if not os.path.isdir(cache_dir):
return
abs_cache = os.path.abspath(cache_dir)
# read existing DB keys once for quick membership tests
db_cache_paths = {c["file_path"] for c in db_get_all_cached_pages()}
for root, _, files in os.walk(abs_cache):
for name in files:
if not name.lower().endswith(".html"):
continue
fp = os.path.abspath(os.path.join(root, name))
rel_fp = os.path.relpath(fp, start=abs_cache)
if rel_fp in db_cache_paths:
continue
try:
stat = os.stat(fp)
mtime = datetime.fromtimestamp(stat.st_mtime).isoformat()
size = stat.st_size
except OSError:
mtime = None
size = None
url_guess = get_url_from_filename(name)
job_id = url_to_job_id(url_guess)
upsert_cached_page(file_path=rel_fp, url_guess=url_guess,
last_modified=mtime, size_bytes=size, job_id=job_id)
def normalize_cached_page_paths() -> int:
"""Ensure all cached_pages.file_path values are absolute. Returns number of rows updated/normalized."""
# Convert any absolute paths in DB to relative paths (relative to cache dir)
changed = 0
abs_cache = os.path.abspath(get_cache_dir())
with _ensure_session() as session:
rows = session.execute(text(
"SELECT file_path, url_guess, last_modified, size_bytes, job_id FROM cached_pages")).fetchall()
for (fp, url_guess, last_modified, size_bytes, job_id) in rows:
if os.path.isabs(fp):
rel_fp = os.path.relpath(fp, start=abs_cache)
upsert_cached_page(
file_path=rel_fp,
url_guess=url_guess,
last_modified=last_modified,
size_bytes=size_bytes,
job_id=job_id,
)
with _ensure_session() as session:
session.execute(
text("DELETE FROM cached_pages WHERE file_path = :fp"), {"fp": fp})
session.commit()
changed += 1
return changed
def db_get_keywords() -> List[str]:
"""Return a list of all unique keywords from job listings."""
with _ensure_session() as session:
@@ -453,15 +291,10 @@ SELECT l.job_id
,l.timestamp
,d.posted_time
,l.url
,c.file_path
,c.last_modified
,c.url_guess
,CASE WHEN c.url_guess != l.url THEN 1 ELSE 0 END AS url_guess_stale
FROM job_listings AS l
INNER JOIN job_descriptions AS d
ON l.job_id = d.job_id
AND l.url = d.url
LEFT JOIN cached_pages AS c ON l.job_id = c.job_id
ORDER BY d.posted_time DESC
"""
with _ensure_session() as session:
@@ -479,27 +312,11 @@ ORDER BY d.posted_time DESC
"timestamp": row[7],
"posted_time": row[8],
"url": row[9],
# file_path is stored relative to cache dir; provide both forms
"file_path": row[10],
"file_path_abs": os.path.join(os.path.abspath(get_cache_dir()), row[10]) if row[10] else None,
"last_modified": row[11],
"url_guess": row[12],
"url_guess_stale": row[13],
}
jobs.append(job)
return jobs
def db_get_cached_abs_path(db_file_path: Optional[str]) -> Optional[str]:
"""Return absolute cache file path given a DB-stored (relative) file_path.
Returns None if input is falsy.
"""
if not db_file_path:
return None
return os.path.join(os.path.abspath(get_cache_dir()), db_file_path)
def db_get_all_job_urls() -> List[str]:
"""Return list of job URLs from job_listings."""
with _ensure_session() as session:
@@ -522,10 +339,6 @@ def remove_job(url):
try:
jid = url_to_job_id(url)
db_delete_job(jid)
cache_fp = get_cache_path(url)
remove_cached_page(os.path.abspath(cache_fp))
if os.path.exists(cache_fp):
os.remove(cache_fp)
except Exception:
pass
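The deleted `upsert_cached_page`/`remove_cached_page` pair followed the session-level get-or-create idiom that the rest of `web/db.py` still uses: look up by primary key with `session.get`, insert on miss, then overwrite the fields unconditionally. A self-contained sketch against SQLAlchemy 1.4+, with a hypothetical model and an in-memory engine:

# Sketch: the get-or-create upsert idiom used throughout web/db.py.
# `Page` and the in-memory engine are hypothetical stand-ins.
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Page(Base):
    __tablename__ = "pages"
    path = Column(String(255), primary_key=True)
    url = Column(String(255))

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

def upsert_page(path: str, url: str):
    with Session(engine) as session:
        obj = session.get(Page, path)   # lookup by primary key
        if obj is None:
            obj = Page(path=path)       # insert on miss
            session.add(obj)
        obj.url = url                   # overwrite fields either way
        session.commit()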


@@ -1,7 +1,7 @@
from datetime import datetime, UTC
from bs4 import BeautifulSoup
from typing import List, Dict, Set
from web.utils import get_base_url, cache_page, safe_get_text, safe_get_attr, is_cached, get_cached_content, make_request_with_retry
from web.utils import get_base_url, safe_get_text, safe_get_attr, make_request_with_retry
def scrape_listings_page(listing, region: str, keyword: str, seen_urls: Set[str]) -> List:
@@ -108,14 +108,7 @@ def scrape_job_data(content: str, region: str, keyword: str, seen_urls: Set[str]
def process_region_keyword(region: str, keyword: str, seen_urls: Set[str]) -> List[List]:
"""Process a single region and keyword."""
url = get_base_url().format(region=region, keyword=keyword.replace(" ", "+"))
if is_cached(url):
content = get_cached_content(url)
cache_status = "CACHED"
else:
content = make_request_with_retry(url, 3)
if content is None:
return []
cache_page(url, content)
cache_status = "FETCHED"
_ = cache_status # no-op to silence unused var
content = make_request_with_retry(url, 3)
if content is None:
return []
return scrape_job_data(content, region, keyword, seen_urls)
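With the cache gone, `process_region_keyword` always hits the network through `make_request_with_retry(url, 3)`. That helper is not part of this diff; judging from the call sites, its contract appears to be "return the body on success, `None` once retries are exhausted". A hypothetical sketch of such a helper, purely as an assumption about its shape:

# Hypothetical sketch of make_request_with_retry's contract: return the
# response body on success, None once retries are exhausted. Names and
# backoff policy are assumptions, not the project's actual code.
import time
import requests

def make_request_with_retry(url: str, retries: int, backoff: float = 2.0):
    for attempt in range(retries):
        try:
            resp = requests.get(url, timeout=30)
            resp.raise_for_status()
            return resp.text
        except requests.RequestException:
            if attempt + 1 == retries:
                return None
            time.sleep(backoff * (attempt + 1))  # linear backoff between tries
    return None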


@@ -20,6 +20,7 @@
<a href="{{ url_for('index') }}">Home</a> |
<a href="{{ url_for('user_settings') }}">Preferences</a>
{% if current_user and current_user.is_admin %} |
<a href="{{ url_for('scrape_page') }}">Scrape Jobs</a> |
<a href="{{ url_for('admin_taxonomy') }}">Taxonomy</a> |
<a href="{{ url_for('admin_users') }}">Users</a> {% endif %} {% if
session.get('username') %} |


@@ -23,13 +23,5 @@ styles %}{% endblock %} {% block content %}
>{{ job.title }}</a
>
</p>
{% if job.file_path_abs or job.file_path %}
<p>
<strong>Cached copy:</strong>
<a href="{{ url_for('serve_cached', job_id=job.id) }}" target="_blank"
>View cached copy</a
>
</p>
{% endif %}
</div>
{% endblock %}

web/templates/scrape.html (new file, 67 lines)

@@ -0,0 +1,67 @@
{% extends "base.html" %} {% block title %}Scrape Jobs{% endblock %} {% block
content %}
<div id="scrape-container">
<h2>Job Scraping Progress</h2>
<button id="start-scrape" onclick="startScrape()">Start Scraping</button>
<div
id="output"
style="
margin-top: 20px;
padding: 10px;
border: 1px solid #ccc;
height: 400px;
overflow-y: auto;
background-color: #f9f9f9;
font-family: monospace;
white-space: pre-wrap;
"
></div>
</div>
{% endblock %} {% block scripts %}
<script>
function startScrape() {
const output = document.getElementById("output");
const startButton = document.getElementById("start-scrape");
output.textContent = "Starting scrape...\n";
startButton.disabled = true;
startButton.textContent = "Scraping...";
fetch("/scrape")
.then((response) => {
const reader = response.body.getReader();
const decoder = new TextDecoder();
function readStream() {
reader
.read()
.then(({ done, value }) => {
if (done) {
output.textContent += "\nScraping completed!";
startButton.disabled = false;
startButton.textContent = "Start Scraping";
return;
}
const chunk = decoder.decode(value, { stream: true });
output.textContent += chunk;
output.scrollTop = output.scrollHeight;
readStream();
})
.catch((error) => {
output.textContent += `\nError: ${error.message}`;
startButton.disabled = false;
startButton.textContent = "Start Scraping";
});
}
readStream();
})
.catch((error) => {
output.textContent = `Error starting scrape: ${error.message}`;
startButton.disabled = false;
startButton.textContent = "Start Scraping";
});
}
</script>
{% endblock %}
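The JavaScript above consumes the text stream chunk by chunk via `response.body.getReader()`. The same endpoint can be smoke-tested from Python with `requests`; the base URL and port below are assumptions for a local dev server:

# Sketch: consuming the streaming /scrape endpoint from Python.
# Base URL/port are assumptions for a local dev server.
import requests

with requests.get("http://127.0.0.1:5000/scrape", stream=True, timeout=300) as r:
    r.raise_for_status()
    for line in r.iter_lines(decode_unicode=True):
        if line:
            print(line)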


@@ -93,8 +93,7 @@ def get_paths() -> dict:
return get_config().get('paths', {})
def get_cache_dir() -> str:
return get_paths().get('cache_dir', 'cache')
def get_logs_dir() -> str:
@@ -129,9 +128,7 @@ def get_base_url() -> str:
return get_config().get('scraper', {}).get('base_url', "https://{region}.craigslist.org/search/jjj?query={keyword}&sort=rel")
def ensure_cache_dir():
"""Ensure cache directory exists."""
os.makedirs(get_cache_dir(), exist_ok=True)
def now_iso() -> str:
@@ -173,10 +170,7 @@ def get_url_from_filename(name: str) -> str:
return url_guess
def get_cached_content(url: str) -> str:
"""Get cached content for URL."""
with open(get_cache_path(url), "r", encoding="utf-8") as f:
return f.read()
def safe_get_text(element, default="N/A"):
@@ -194,51 +188,19 @@ def get_random_delay(min_delay: int = get_min_delay(), max_delay: int = get_max_
return random.uniform(min_delay, max_delay)
def get_cache_path(url: str) -> str:
"""Get cache file path for URL."""
return os.path.join(get_cache_dir(), f"{get_filename_from_url(url)}.html")
def cache_page(url: str, content: str):
"""Cache the page content with a timestamp."""
cache_path = get_cache_path(url)
with open(cache_path, "w", encoding="utf-8") as f:
f.write(content)
# Update the file's modification time to the current time
os.utime(cache_path, None)
def is_cached(url: str) -> bool:
"""Check if the page is cached and not older than 24 hours."""
cache_path = get_cache_path(url)
if not os.path.isfile(cache_path):
return False
# Check the file's age if it's a search result page
if 'search' in url:
file_age = time.time() - os.path.getmtime(cache_path)
if file_age > 24 * 3600: # 24 hours in seconds
return False
return True
def is_cache_stale(last_modified: str, days: int = 1) -> bool:
"""Check if the cached page is stale (older than 24 hours)."""
if not last_modified:
return True
last_datetime = datetime.fromisoformat(last_modified)
file_age = time.time() - last_datetime.timestamp()
return file_age > days * 24 * 3600 # days in seconds
def delete_cached_page(url: str):
cache_fp = get_cache_path(url)
if os.path.exists(cache_fp):
try:
os.remove(cache_fp)
except Exception:
pass
def get_color_from_string(s: str) -> str: