Update DB logic; add fetch logging
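
Adds a logs table and two helpers (insert_log, get_last_fetch_time) to web/db.py, threads an optional fetch source through upsert_listing, and teaches the fetcher to skip region+keyword searches already fetched within the last 24 hours. The web/utils.py changes only remove stray blank lines.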
@@ -9,6 +9,8 @@ from web.db import (
     db_get_all_job_urls,
     db_delete_job,
     remove_job,
+    insert_log,
+    get_last_fetch_time,
 )

 # Import utility functions
@@ -51,8 +53,29 @@ def fetch_listings():
             keyword_name = keyword.get("name")
             if not keyword_name:
                 continue
+            # Build a canonical search identifier for this region+keyword combination.
+            search_page_id = f"search:{region_name}:{keyword_name}"
+            try:
+                last = get_last_fetch_time(search_page_id)
+                if last is not None:
+                    # skip if fetched within the last 24 hours
+                    age = datetime.now(timezone.utc) - (
+                        last if last.tzinfo is not None else last.replace(tzinfo=timezone.utc))
+                    if age.total_seconds() < 24 * 3600:
+                        yield f"Skipping {region_name} + {keyword_name} (fetched {age.seconds//3600}h ago)...\n"
+                        processed += 1
+                        continue
+            except Exception:
+                # if logging lookup fails, proceed with fetch
+                pass
             processed += 1
             yield f"Processing {region_name} + {keyword_name} ({processed}/{total_combinations})...\n"
+            # record that we're fetching this search page now
+            try:
+                insert_log(search_page_id, region=region_name,
+                           keyword=keyword_name, fetched_at=datetime.now(timezone.utc))
+            except Exception:
+                pass
             for row in process_region_keyword(region_name, keyword_name, discovered_urls):
                 timestamp, region, keyword, title, pay, location, url = row
                 discovered_urls.add(url)
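
A note on the hunk above: DateTime values often come back from the database without tzinfo, and subtracting a naive datetime from an aware one raises a TypeError, hence the guard before computing the age. The same logic as a standalone helper (an illustrative sketch; hours_since is not part of the commit):

    from datetime import datetime, timezone

    def hours_since(last: datetime) -> float:
        # Treat a naive timestamp as UTC so the subtraction is aware-vs-aware.
        if last.tzinfo is None:
            last = last.replace(tzinfo=timezone.utc)
        return (datetime.now(timezone.utc) - last).total_seconds() / 3600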
@@ -67,6 +90,8 @@ def fetch_listings():
                 pay=pay,
                 location=location,
                 timestamp=timestamp,
+                fetched_from=search_page_id,
+                fetched_at=datetime.now(timezone.utc),
             )

     # Remove stale listings: those present in DB but not discovered now.
web/db.py (38 lines changed)
@@ -147,6 +147,15 @@ class UserKeyword(Base):
         "keywords.keyword_id", ondelete="CASCADE"), primary_key=True)


+class Log(Base):
+    __tablename__ = "logs"
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    page_url = Column(String(URL_LEN))
+    region = Column(String(SHORT_LEN))
+    keyword = Column(String(SHORT_LEN))
+    fetched_at = Column(DateTime)
+
+
 def _ensure_session() -> Session:
     global engine, SessionLocal
     if engine is None or SessionLocal is None:
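
The new model needs its table to exist before the first insert_log call. If table creation is not already handled during engine setup, a create_all covers it (a sketch; assumes Base and the module-level engine shown above are importable and initialized):

    from web.db import Base, engine  # assumed module-level names

    # Create any missing tables, including the new "logs" table;
    # existing tables are left untouched.
    Base.metadata.create_all(bind=engine)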
@@ -218,7 +227,7 @@ def upsert_user_interaction(job_id: str | int, *, user_id: Optional[int] = None,
     session.commit()


-def upsert_listing(*, url: str, region: str, keyword: str, title: str, pay: str, location: str, timestamp: str):
+def upsert_listing(*, url: str, region: str, keyword: str, title: str, pay: str, location: str, timestamp: str, fetched_from: str | None = None, fetched_at: Optional[datetime] = None):
     """Insert or update a job listing row based on job_id derived from URL."""
     job_id = str(url_to_job_id(url))
     with _ensure_session() as session:
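
Both new parameters default to None, so existing call sites are unaffected. A call that opts into fetch logging might look like this (all values illustrative):

    from datetime import datetime, timezone
    from web.db import upsert_listing  # as defined above

    upsert_listing(
        url="https://sfbay.craigslist.org/...",  # illustrative URL
        region="sfbay",
        keyword="driver",
        title="Delivery Driver",
        pay="$25/hr",
        location="san francisco",
        timestamp="2025-01-01T00:00:00+00:00",
        fetched_from="search:sfbay:driver",  # mirrors the fetcher's search_page_id format
        fetched_at=datetime.now(timezone.utc),
    )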
@@ -234,6 +243,33 @@ def upsert_listing(*, url: str, region: str, keyword: str, title: str, pay: str,
         setattr(obj, "location", location)
         setattr(obj, "timestamp", timestamp)
         session.commit()
+        # Optionally record a fetch log for the listing if source provided
+        if fetched_from:
+            try:
+                insert_log(fetched_from, region=region, keyword=keyword,
+                           fetched_at=fetched_at or datetime.now())
+            except Exception:
+                pass
+
+
+def insert_log(page_url: str, region: str | None = None, keyword: str | None = None, fetched_at: Optional[datetime] = None):
+    """Insert a log row for a fetched page."""
+    fetched_at = fetched_at or datetime.now()
+    with _ensure_session() as session:
+        log_row = Log(page_url=page_url, region=region or '',
+                      keyword=keyword or '', fetched_at=fetched_at)
+        session.add(log_row)
+        session.commit()
+
+
+def get_last_fetch_time(page_url: str) -> Optional[datetime]:
+    """Return the latest fetched_at for a given page_url, or None if never fetched."""
+    with _ensure_session() as session:
+        row = session.execute(text("SELECT fetched_at FROM logs WHERE page_url = :u ORDER BY fetched_at DESC LIMIT 1"),
+                              {"u": page_url}).fetchone()
+        if row and row[0]:
+            return row[0]
+        return None


 def upsert_job_details(job_data: Dict[str, Any]):
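
Round-trip usage of the two helpers (page identifier illustrative):

    from datetime import datetime, timezone
    from web.db import insert_log, get_last_fetch_time

    page_id = "search:sfbay:driver"
    insert_log(page_id, region="sfbay", keyword="driver",
               fetched_at=datetime.now(timezone.utc))
    print(get_last_fetch_time(page_id))  # latest fetched_at, or None if never logged

Note that insert_log falls back to a naive datetime.now() when fetched_at is omitted, which is why the fetcher normalizes tzinfo before comparing ages.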
web/utils.py (24 lines changed)
@@ -93,9 +93,6 @@ def get_paths() -> dict:
     return get_config().get('paths', {})


-
-
-
 def get_logs_dir() -> str:
     return get_paths().get('logs_dir', 'logs')

@@ -128,9 +125,6 @@ def get_base_url() -> str:
     return get_config().get('scraper', {}).get('base_url', "https://{region}.craigslist.org/search/jjj?query={keyword}&sort=rel")


-
-
-
 def now_iso() -> str:
     """Get the current time in ISO format."""
     return datetime.now(UTC).isoformat()
@@ -170,9 +164,6 @@ def get_url_from_filename(name: str) -> str:
     return url_guess


-
-
-
 def safe_get_text(element, default="N/A"):
     """Safely extract text from BeautifulSoup element."""
     return element.get_text(strip=True) if element else default
@@ -188,21 +179,6 @@ def get_random_delay(min_delay: int = get_min_delay(), max_delay: int = get_max_delay()):
     return random.uniform(min_delay, max_delay)


-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
 def get_color_from_string(s: str) -> str:
     """Generate a color code from a string."""
     hash_code = hash(s)
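
Unrelated to the whitespace cleanup: get_random_delay binds its defaults at import time, because Python evaluates default arguments once, at function definition. A late-binding variant would look like this (a sketch, not part of this commit; get_min_delay and get_max_delay as defined in web/utils.py):

    import random
    from typing import Optional

    from web.utils import get_min_delay, get_max_delay  # assumed importable

    def get_random_delay(min_delay: Optional[int] = None, max_delay: Optional[int] = None) -> float:
        # Resolve defaults per call so config changes apply without a reimport.
        if min_delay is None:
            min_delay = get_min_delay()
        if max_delay is None:
            max_delay = get_max_delay()
        return random.uniform(min_delay, max_delay)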