from __future__ import annotations """MySQL persistence layer for Craigslist scraper (SQLAlchemy ORM only). Tables: - users(user_id PK, username UNIQUE, created_at) - job_listings(job_id PK, url UNIQUE, region, keyword, title, pay, location, timestamp) - job_descriptions(job_id PK FK -> job_listings, title, company, location, description, posted_time, url) - user_interactions(job_id PK FK -> job_listings, user_id FK -> users, seen_at, url_visited, is_user_favorite) - regions(region_id PK, name UNIQUE) - keywords(keyword_id PK, name UNIQUE) - user_regions(user_id FK -> users, region_id FK -> regions, composite PK) - user_keywords(user_id FK -> users, keyword_id FK -> keywords, composite PK) """ from datetime import datetime, UTC from typing import Optional, Dict, Any, List from web.utils import ( get_color_from_string, url_to_job_id, normalize_job_id, now_iso, get_mysql_config, ) # --- SQLAlchemy setup ------------------------------------------------------- from sqlalchemy import ( create_engine, Column, String, Integer, Text, DateTime, Boolean, ForeignKey, text, ) from sqlalchemy.orm import declarative_base, relationship, sessionmaker, Session from werkzeug.security import generate_password_hash, check_password_hash from typing import cast engine = None # set in db_init() SessionLocal: Optional[sessionmaker] = None Base = declarative_base() # Length constants for MySQL compatibility JOB_ID_LEN = 64 URL_LEN = 512 FILE_PATH_LEN = 512 TITLE_LEN = 512 SHORT_LEN = 255 TIME_LEN = 64 # --- ORM Models -------------------------------------------------------------- class User(Base): __tablename__ = "users" user_id = Column(Integer, primary_key=True, autoincrement=True) username = Column(String(SHORT_LEN), unique=True, nullable=False) created_at = Column(DateTime, default=datetime.utcnow, nullable=False) password_hash = Column(String(SHORT_LEN)) is_admin = Column(Boolean, default=False, nullable=False) is_active = Column(Boolean, default=True, nullable=False) last_login = Column(DateTime, nullable=True) interactions = relationship( "UserInteraction", back_populates="user", cascade="all, delete-orphan") class JobListing(Base): __tablename__ = "job_listings" job_id = Column(String(JOB_ID_LEN), primary_key=True) url = Column(String(URL_LEN), unique=True) region = Column(String(SHORT_LEN)) keyword = Column(String(SHORT_LEN)) title = Column(String(TITLE_LEN)) pay = Column(String(SHORT_LEN)) location = Column(String(SHORT_LEN)) timestamp = Column(String(TIME_LEN)) description = relationship( "JobDescription", back_populates="listing", uselist=False, cascade="all, delete-orphan") interactions = relationship( "UserInteraction", back_populates="listing", cascade="all, delete-orphan") class JobDescription(Base): __tablename__ = "job_descriptions" job_id = Column(String(JOB_ID_LEN), ForeignKey("job_listings.job_id", ondelete="CASCADE"), primary_key=True) title = Column(String(TITLE_LEN)) company = Column(String(SHORT_LEN)) location = Column(String(SHORT_LEN)) description = Column(Text) posted_time = Column(String(TIME_LEN)) url = Column(String(URL_LEN)) listing = relationship("JobListing", back_populates="description") class UserInteraction(Base): __tablename__ = "user_interactions" # composite uniqueness on (user_id, job_id) job_id = Column(String(JOB_ID_LEN), ForeignKey("job_listings.job_id", ondelete="CASCADE"), primary_key=True) user_id = Column(Integer, ForeignKey( "users.user_id", ondelete="CASCADE"), primary_key=True) seen_at = Column(String(TIME_LEN)) url_visited = Column(String(URL_LEN)) is_user_favorite = Column(Boolean, default=False) user = relationship("User", back_populates="interactions") listing = relationship("JobListing", back_populates="interactions") # --- New preference models: regions, keywords, and user mappings ---------- class Region(Base): __tablename__ = "regions" region_id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(SHORT_LEN), unique=True, nullable=False) color = Column(String(SHORT_LEN), nullable=True) class Keyword(Base): __tablename__ = "keywords" keyword_id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(SHORT_LEN), unique=True, nullable=False) color = Column(String(SHORT_LEN), nullable=True) class UserRegion(Base): __tablename__ = "user_regions" user_id = Column(Integer, ForeignKey( "users.user_id", ondelete="CASCADE"), primary_key=True) region_id = Column(Integer, ForeignKey( "regions.region_id", ondelete="CASCADE"), primary_key=True) class UserKeyword(Base): __tablename__ = "user_keywords" user_id = Column(Integer, ForeignKey( "users.user_id", ondelete="CASCADE"), primary_key=True) keyword_id = Column(Integer, ForeignKey( "keywords.keyword_id", ondelete="CASCADE"), primary_key=True) class Log(Base): __tablename__ = "logs" id = Column(Integer, primary_key=True, autoincrement=True) page_url = Column(String(URL_LEN)) region = Column(String(SHORT_LEN)) keyword = Column(String(SHORT_LEN)) fetched_at = Column(DateTime) def _ensure_session() -> Session: global engine, SessionLocal if engine is None or SessionLocal is None: db_init() assert SessionLocal is not None return cast(Session, SessionLocal()) def db_init(): """Initialize MySQL database and create tables if needed.""" global engine, SessionLocal cfg = get_mysql_config() # Create database if it doesn't exist root_url = f"mysql+pymysql://{cfg['user']}:{cfg['password']}@{cfg['host']}:{cfg['port']}/" dbname = cfg["database"] root_engine = create_engine(root_url, future=True) with root_engine.begin() as conn: conn.execute(text( f"CREATE DATABASE IF NOT EXISTS `{dbname}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")) # Create tables in target DB mysql_url = f"mysql+pymysql://{cfg['user']}:{cfg['password']}@{cfg['host']}:{cfg['port']}/{dbname}?charset=utf8mb4" engine = create_engine(mysql_url, future=True) SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, future=True) Base.metadata.create_all(engine) # Ensure new auth columns exist for existing databases (MySQL/MariaDB support IF NOT EXISTS) with engine.begin() as conn: try: conn.execute(text( "ALTER TABLE users ADD COLUMN IF NOT EXISTS password_hash VARCHAR(255) NULL")) except Exception: pass try: conn.execute(text( "ALTER TABLE users ADD COLUMN IF NOT EXISTS is_admin TINYINT(1) NOT NULL DEFAULT 0")) except Exception: pass try: conn.execute(text( "ALTER TABLE users ADD COLUMN IF NOT EXISTS is_active TINYINT(1) NOT NULL DEFAULT 1")) except Exception: pass try: conn.execute( text("ALTER TABLE users ADD COLUMN IF NOT EXISTS last_login DATETIME NULL")) except Exception: pass def upsert_user_interaction(job_id: str | int, *, user_id: Optional[int] = None, seen_at: Optional[str] = None, url_visited: Optional[str] = None, is_user_favorite: Optional[bool] = None): """Upsert a single interaction row for this job. Any provided field will be updated; absent fields keep their current value. """ if user_id is None: user_id = get_or_create_user("anonymous") job_id_str = str(job_id) with _ensure_session() as session: ui = session.get(UserInteraction, { "job_id": job_id_str, "user_id": int(user_id)}) if ui is None: ui = UserInteraction(job_id=job_id_str, user_id=int(user_id)) session.add(ui) if seen_at is not None: setattr(ui, "seen_at", seen_at) if url_visited is not None: setattr(ui, "url_visited", url_visited) if is_user_favorite is not None: setattr(ui, "is_user_favorite", bool(is_user_favorite)) session.commit() def upsert_listing(*, url: str, region: str, keyword: str, title: str, pay: str, location: str, timestamp: str, fetched_from: str | None = None, fetched_at: Optional[datetime] = None): """Insert or update a job listing row based on job_id derived from URL.""" job_id = str(url_to_job_id(url)) with _ensure_session() as session: obj = session.get(JobListing, job_id) if obj is None: obj = JobListing(job_id=job_id) session.add(obj) setattr(obj, "url", url) setattr(obj, "region", region) setattr(obj, "keyword", keyword) setattr(obj, "title", title) setattr(obj, "pay", pay) setattr(obj, "location", location) setattr(obj, "timestamp", timestamp) session.commit() # Optionally record a fetch log for the listing if source provided if fetched_from: try: insert_log(fetched_from, region=region, keyword=keyword, fetched_at=fetched_at or datetime.now()) except Exception: pass def insert_log(page_url: str, region: str | None = None, keyword: str | None = None, fetched_at: Optional[datetime] = None): """Insert a log row for a fetched page.""" fetched_at = fetched_at or datetime.now() with _ensure_session() as session: l = Log(page_url=page_url, region=region or '', keyword=keyword or '', fetched_at=fetched_at) session.add(l) session.commit() def get_last_fetch_time(page_url: str) -> Optional[datetime]: """Return the latest fetched_at for a given page_url, or None if never fetched.""" with _ensure_session() as session: row = session.execute(text("SELECT fetched_at FROM logs WHERE page_url = :u ORDER BY fetched_at DESC LIMIT 1"), { "u": page_url}).fetchone() if row and row[0]: return row[0] return None def upsert_job_details(job_data: Dict[str, Any], region: str = "", keyword: str = ""): """Upsert into job_descriptions table using scraped job details dict. Behavior additions: - If the provided job `url` has a log entry with fetched_at less than 24 hours ago, the function will skip updating to avoid unnecessary work. - On successful upsert, a log entry is recorded with `insert_log(url, ...)`. """ url = job_data.get("url") job_id = normalize_job_id(job_data.get("id"), url) if not job_id: return # Skip if job page was fetched recently (24 hours) try: if isinstance(url, str) and url: last = get_last_fetch_time(url) if last is not None: # normalize tz-awareness from datetime import timezone as _tz now = datetime.now(_tz.utc) last_dt = last if getattr( last, 'tzinfo', None) is not None else last.replace(tzinfo=_tz.utc) if (now - last_dt).total_seconds() < 24 * 3600: return except Exception: # if log lookup fails, proceed normally pass title = job_data.get("title") or None company = job_data.get("company") or None location = job_data.get("location") or None description = job_data.get("description") or None posted_time = job_data.get("posted_time") or None job_id = str(job_id) with _ensure_session() as session: obj = session.get(JobDescription, job_id) if obj is None: obj = JobDescription(job_id=job_id) session.add(obj) setattr(obj, "title", title) setattr(obj, "company", company) setattr(obj, "location", location) setattr(obj, "description", description) setattr(obj, "posted_time", posted_time) setattr(obj, "url", url) session.commit() # Record that we fetched/updated this job page try: if isinstance(url, str) and url: insert_log(url, region=None, keyword=None, fetched_at=datetime.now()) except Exception: pass def db_get_keywords() -> List[str]: """Return a list of all unique keywords from job listings.""" with _ensure_session() as session: rows = session.execute( text("SELECT DISTINCT keyword FROM job_listings")).fetchall() return [r[0] for r in rows] def db_get_regions() -> List[str]: """Return a list of all unique regions from job listings.""" with _ensure_session() as session: rows = session.execute( text("SELECT DISTINCT region FROM job_listings")).fetchall() return [r[0] for r in rows] def get_all_jobs(): query = """ SELECT l.job_id ,l.title ,d.description ,l.region ,l.keyword ,d.company ,l.location ,l.timestamp ,d.posted_time ,l.url FROM job_listings AS l INNER JOIN job_descriptions AS d ON l.job_id = d.job_id AND l.url = d.url ORDER BY d.posted_time DESC """ with _ensure_session() as session: rows = session.execute(text(query)).fetchall() jobs = [] for row in rows: job = { "id": row[0], "title": row[1], "description": row[2].replace('\n', '
').strip(), "region": row[3], "keyword": row[4], "company": row[5], "location": row[6], "timestamp": row[7], "posted_time": row[8], "url": row[9], } jobs.append(job) return jobs def db_get_all_job_urls() -> List[dict]: """Return list of job URLs from job_listings. Returns: - List of dicts with keys: url, region, keyword """ with _ensure_session() as session: rows = session.execute( text("SELECT url, region, keyword FROM job_listings")).fetchall() return [{"url": r[0], "region": r[1], "keyword": r[2]} for r in rows] def db_delete_job(job_id: str | int): """Delete a job row (cascades to details and interactions).""" jid = str(job_id) with _ensure_session() as session: obj = session.get(JobListing, jid) if obj: session.delete(obj) session.commit() def remove_job(url): """Remove a job from the database.""" try: jid = url_to_job_id(url) db_delete_job(jid) except Exception: pass # ---------------- New ORM convenience helpers ------------------------------ def get_or_create_user(username: str) -> int: """Return user_id for username, creating if missing.""" # 2025-08-30T16:04:29.660245+00:00 is wrong. should be 2025-08-30T16:04:29 created_at = datetime.now(UTC).isoformat().split('.')[0] with _ensure_session() as session: row = session.execute( text("SELECT user_id FROM users WHERE username = :u"), { "u": username} ).fetchone() if row: return int(row[0]) session.execute( text("INSERT INTO users(username, created_at) VALUES(:u, :c)"), {"u": username, "c": created_at}, ) session.commit() # open a new session to fetch the id with _ensure_session() as session: row2 = session.execute( text("SELECT user_id FROM users WHERE username = :u"), { "u": username} ).fetchone() if row2: return int(row2[0]) # Edge case retry return get_or_create_user(username) def mark_favorite(job_id: str | int, username: str, favorite: bool = True): user_id = get_or_create_user(username) upsert_user_interaction(job_id, user_id=user_id, is_user_favorite=favorite) def record_visit(job_id: str | int, username: str, url: Optional[str] = None): user_id = get_or_create_user(username) ts = now_iso() upsert_user_interaction(job_id, user_id=user_id, seen_at=ts, url_visited=url) # ---------------- User auth/admin helpers ---------------------------------- def create_or_update_user(username: str, password: Optional[str] = None, *, is_admin: Optional[bool] = None, is_active: Optional[bool] = None) -> int: """Create user if missing; update password/admin/active if provided. Returns user_id.""" username = (username or "").strip() if not username: raise ValueError("username required") uid = get_or_create_user(username) with _ensure_session() as session: # Build dynamic update fields = [] params: Dict[str, Any] = {"u": uid} if password is not None: fields.append("password_hash = :ph") params["ph"] = generate_password_hash(password) if is_admin is not None: fields.append("is_admin = :ia") params["ia"] = 1 if is_admin else 0 if is_active is not None: fields.append("is_active = :ac") params["ac"] = 1 if is_active else 0 if fields: q = f"UPDATE users SET {', '.join(fields)} WHERE user_id = :u" session.execute(text(q), params) session.commit() return uid def set_user_password(username: str, password: str) -> None: create_or_update_user(username, password=password) def set_user_admin(username: str, is_admin: bool) -> None: create_or_update_user(username, is_admin=is_admin) def set_user_active(username: str, is_active: bool) -> None: create_or_update_user(username, is_active=is_active) def verify_user_credentials(username: str, password: str) -> bool: """Validate username/password against stored password_hash.""" with _ensure_session() as session: row = session.execute(text("SELECT password_hash, is_active FROM users WHERE username = :u"), { "u": username}).fetchone() if not row: return False ph, active = row[0], bool(row[1]) if not active or not ph: return False ok = check_password_hash(ph, password) if ok: # record last_login try: session.execute(text("UPDATE users SET last_login = :ts WHERE username = :u"), { "ts": datetime.now(UTC), "u": username}) session.commit() except Exception: pass return ok def get_users() -> List[Dict[str, Any]]: with _ensure_session() as session: rows = session.execute(text( "SELECT user_id, username, created_at, is_admin, is_active, last_login, (password_hash IS NOT NULL) AS has_pw FROM users ORDER BY username ASC")).fetchall() out: List[Dict[str, Any]] = [] for r in rows: out.append({ "user_id": int(r[0]), "username": r[1], "created_at": r[2].isoformat() if isinstance(r[2], datetime) else (r[2] or None), "is_admin": bool(r[3]), "is_active": bool(r[4]), "last_login": r[5].isoformat() if r[5] else None, "has_password": bool(r[6]), }) return out def get_user(username: str) -> Optional[Dict[str, Any]]: """Return single user dict or None.""" with _ensure_session() as session: row = session.execute(text( "SELECT user_id, username, created_at, is_admin, is_active, last_login, (password_hash IS NOT NULL) AS has_pw FROM users WHERE username = :u" ), {"u": username}).fetchone() if not row: return None return { "user_id": int(row[0]), "username": row[1], "created_at": row[2].isoformat() if isinstance(row[2], datetime) else (row[2] or None), "is_admin": bool(row[3]), "is_active": bool(row[4]), "last_login": row[5].isoformat() if row[5] else None, "has_password": bool(row[6]), } def get_user_by_id(user_id: int) -> Optional[Dict[str, Any]]: """Return single user dict or None.""" with _ensure_session() as session: row = session.execute(text( "SELECT user_id, username, created_at, is_admin, is_active, last_login, (password_hash IS NOT NULL) AS has_pw FROM users WHERE user_id = :u" ), {"u": user_id}).fetchone() if not row: return None return { "user_id": int(row[0]), "username": row[1], "created_at": row[2].isoformat() if isinstance(row[2], datetime) else (row[2] or None), "is_admin": bool(row[3]), "is_active": bool(row[4]), "last_login": row[5].isoformat() if row[5] else None, "has_password": bool(row[6]), } def delete_user_by_id(user_id: int) -> bool: with _ensure_session() as session: result = session.execute( text("DELETE FROM users WHERE user_id = :u"), {"u": user_id}) session.commit() rc = getattr(result, 'rowcount', None) if rc is None: # Unable to determine rowcount; assume success if no exception return True return rc > 0 # ---------------- Regions/Keywords helpers --------------------------------- def upsert_region(name: str) -> int: """Get or create a region by name; return region_id.""" name = (name or "").strip() if not name: raise ValueError("Region name cannot be empty") with _ensure_session() as session: row = session.execute(text("SELECT region_id FROM regions WHERE name = :n"), { "n": name}).fetchone() if row: return int(row[0]) session.execute( text("INSERT INTO regions(name) VALUES (:n)"), {"n": name}) session.commit() with _ensure_session() as session: row2 = session.execute(text("SELECT region_id FROM regions WHERE name = :n"), { "n": name}).fetchone() if row2: return int(row2[0]) # unlikely retry return upsert_region(name) def upsert_keyword(name: str) -> int: """Get or create a keyword by name; return keyword_id.""" name = (name or "").strip() if not name: raise ValueError("Keyword name cannot be empty") with _ensure_session() as session: row = session.execute(text("SELECT keyword_id FROM keywords WHERE name = :n"), { "n": name}).fetchone() if row: return int(row[0]) session.execute( text("INSERT INTO keywords(name) VALUES (:n)"), {"n": name}) session.commit() with _ensure_session() as session: row2 = session.execute(text("SELECT keyword_id FROM keywords WHERE name = :n"), { "n": name}).fetchone() if row2: return int(row2[0]) return upsert_keyword(name) def set_user_regions(username: str, region_names: List[str]) -> None: """Replace user's preferred regions with given names.""" user_id = get_or_create_user(username) # Normalize and get ids names = sorted({(n or "").strip() for n in region_names if (n or "").strip()}) region_ids: List[int] = [upsert_region(n) for n in names] if not region_ids and not names: # Clear all if explicitly empty list with _ensure_session() as session: session.execute( text("DELETE FROM user_regions WHERE user_id = :u"), {"u": user_id}) session.commit() return desired = set(region_ids) with _ensure_session() as session: rows = session.execute(text("SELECT region_id FROM user_regions WHERE user_id = :u"), { "u": user_id}).fetchall() current = set(int(r[0]) for r in rows) to_add = desired - current to_remove = current - desired for rid in to_remove: session.execute(text("DELETE FROM user_regions WHERE user_id = :u AND region_id = :r"), { "u": user_id, "r": int(rid)}) for rid in to_add: session.execute(text("INSERT INTO user_regions(user_id, region_id) VALUES(:u, :r)"), { "u": user_id, "r": int(rid)}) session.commit() def set_user_keywords(username: str, keyword_names: List[str]) -> None: """Replace user's preferred keywords with given names.""" user_id = get_or_create_user(username) names = sorted({(n or "").strip() for n in keyword_names if (n or "").strip()}) keyword_ids: List[int] = [upsert_keyword(n) for n in names] if not keyword_ids and not names: with _ensure_session() as session: session.execute( text("DELETE FROM user_keywords WHERE user_id = :u"), {"u": user_id}) session.commit() return desired = set(keyword_ids) with _ensure_session() as session: rows = session.execute(text("SELECT keyword_id FROM user_keywords WHERE user_id = :u"), { "u": user_id}).fetchall() current = set(int(r[0]) for r in rows) to_add = desired - current to_remove = current - desired for kid in to_remove: session.execute(text("DELETE FROM user_keywords WHERE user_id = :u AND keyword_id = :k"), { "u": user_id, "k": int(kid)}) for kid in to_add: session.execute(text("INSERT INTO user_keywords(user_id, keyword_id) VALUES(:u, :k)"), { "u": user_id, "k": int(kid)}) session.commit() def get_user_regions(username: str) -> List[Dict[str, str]]: """Return preferred region names for a user (empty if none).""" with _ensure_session() as session: row = session.execute(text("SELECT user_id FROM users WHERE username = :u"), { "u": username}).fetchone() if not row: return [] user_id = int(row[0]) rows = session.execute(text( """ SELECT r.name, r.color FROM regions r INNER JOIN user_regions ur ON ur.region_id = r.region_id WHERE ur.user_id = :u ORDER BY r.name ASC """ ), {"u": user_id}).fetchall() return [{"name": r[0], "color": r[1]} for r in rows] def get_user_keywords(username: str) -> List[Dict[str, str]]: """Return preferred keyword names for a user (empty if none).""" with _ensure_session() as session: row = session.execute(text("SELECT user_id FROM users WHERE username = :u"), { "u": username}).fetchone() if not row: return [] user_id = int(row[0]) rows = session.execute(text( """ SELECT k.name, k.color FROM keywords k INNER JOIN user_keywords uk ON uk.keyword_id = k.keyword_id WHERE uk.user_id = :u ORDER BY k.name ASC """ ), {"u": user_id}).fetchall() return [{"name": r[0], "color": r[1]} for r in rows] def get_all_regions() -> List[Dict[str, str]]: """Return all region names from regions table (sorted).""" with _ensure_session() as session: rows = session.execute( text("SELECT name, color FROM regions ORDER BY name ASC")).fetchall() return [{"name": r[0], "color": r[1]} for r in rows] def get_all_keywords() -> List[Dict[str, str]]: """Return all keyword names from keywords table (sorted).""" with _ensure_session() as session: rows = session.execute( text("SELECT name, color FROM keywords ORDER BY name ASC")).fetchall() return [{"name": r[0], "color": r[1]} for r in rows] def seed_regions_keywords_from_listings() -> Dict[str, int]: """Seed regions/keywords tables from distinct values in job_listings if empty. Returns dict with counts inserted: {"regions": n1, "keywords": n2}. """ inserted = {"regions": 0, "keywords": 0} with _ensure_session() as session: # Regions existing_regions = session.execute( text("SELECT COUNT(*) FROM regions")).scalar_one() if int(existing_regions or 0) == 0: rows = session.execute(text( "SELECT DISTINCT region FROM job_listings WHERE region IS NOT NULL AND region != ''")).fetchall() for r in rows: name = r[0] if name: try: session.execute( text("INSERT IGNORE INTO regions(name, color) VALUES(:n, :c)"), {"n": name, "c": get_color_from_string(name)}) inserted["regions"] += 1 except Exception: pass session.commit() # Keywords existing_keywords = session.execute( text("SELECT COUNT(*) FROM keywords")).scalar_one() if int(existing_keywords or 0) == 0: rows = session.execute(text( "SELECT DISTINCT keyword FROM job_listings WHERE keyword IS NOT NULL AND keyword != ''")).fetchall() for r in rows: name = r[0] if name: try: session.execute( text("INSERT IGNORE INTO keywords(name, color) VALUES(:n, :c)"), {"n": name, "c": get_color_from_string(name)}) inserted["keywords"] += 1 except Exception: pass session.commit() return inserted def list_regions_full() -> List[Dict[str, Any]]: with _ensure_session() as session: rows = session.execute( text("SELECT region_id, name, color FROM regions ORDER BY name ASC")).fetchall() return [{"region_id": int(r[0]), "name": r[1], "color": r[2]} for r in rows] def list_keywords_full() -> List[Dict[str, Any]]: with _ensure_session() as session: rows = session.execute( text("SELECT keyword_id, name, color FROM keywords ORDER BY name ASC")).fetchall() return [{"keyword_id": int(r[0]), "name": r[1], "color": r[2]} for r in rows] def rename_region(region_id: int, new_name: str) -> bool: new_name = (new_name or "").strip() if not new_name: raise ValueError("new_name required") with _ensure_session() as session: try: session.execute(text("UPDATE regions SET name = :n WHERE region_id = :id"), { "n": new_name, "id": int(region_id)}) session.commit() return True except Exception: session.rollback() return False def rename_keyword(keyword_id: int, new_name: str) -> bool: new_name = (new_name or "").strip() if not new_name: raise ValueError("new_name required") with _ensure_session() as session: try: session.execute(text("UPDATE keywords SET name = :n WHERE keyword_id = :id"), { "n": new_name, "id": int(keyword_id)}) session.commit() return True except Exception: session.rollback() return False def change_region_color(region_id: int, new_color: str) -> bool: new_color = (new_color or "").strip() if not new_color: raise ValueError("new_color required") with _ensure_session() as session: try: session.execute(text("UPDATE regions SET color = :c WHERE region_id = :id"), { "c": new_color, "id": int(region_id)}) session.commit() return True except Exception: session.rollback() return False def change_keyword_color(keyword_id: int, new_color: str) -> bool: new_color = (new_color or "").strip() if not new_color: raise ValueError("new_color required") with _ensure_session() as session: try: session.execute(text("UPDATE keywords SET color = :c WHERE keyword_id = :id"), { "c": new_color, "id": int(keyword_id)}) session.commit() return True except Exception: session.rollback() return False def stats_overview() -> Dict[str, Any]: """Return an overview of job DB statistics. Returns a dict with keys: - total_jobs: int - total_keywords: int (distinct keywords in listings) - total_regions: int (distinct regions in listings) - jobs_per_keyword: List[{"keyword": str, "count": int}] - jobs_per_region: List[{"region": str, "count": int}] """ with _ensure_session() as session: total_jobs = session.execute(text( "SELECT COUNT(*) FROM job_listings l INNER JOIN job_descriptions d ON l.job_id = d.job_id AND l.url = d.url" )).scalar_one() total_keywords = session.execute(text( "SELECT COUNT(DISTINCT keyword) FROM job_listings WHERE keyword IS NOT NULL AND keyword != ''" )).scalar_one() total_regions = session.execute(text( "SELECT COUNT(DISTINCT region) FROM job_listings WHERE region IS NOT NULL AND region != ''" )).scalar_one() rows = session.execute(text( "SELECT COALESCE(keyword, '') AS keyword, COUNT(*) as cnt FROM job_listings l INNER JOIN job_descriptions d ON l.job_id = d.job_id AND l.url = d.url GROUP BY keyword ORDER BY cnt DESC" )).fetchall() jobs_per_keyword = [ {"keyword": r[0], "count": int(r[1])} for r in rows] rows = session.execute(text( "SELECT COALESCE(region, '') AS region, COUNT(*) as cnt FROM job_listings l INNER JOIN job_descriptions d ON l.job_id = d.job_id AND l.url = d.url GROUP BY region ORDER BY cnt DESC" )).fetchall() jobs_per_region = [{"region": r[0], "count": int(r[1])} for r in rows] return { "total_jobs": int(total_jobs or 0), "total_keywords": int(total_keywords or 0), "total_regions": int(total_regions or 0), "jobs_per_keyword": jobs_per_keyword, "jobs_per_region": jobs_per_region, }