786 lines
29 KiB
Python
786 lines
29 KiB
Python
from __future__ import annotations
|
|
|
|
"""MySQL persistence layer for Craigslist scraper (SQLAlchemy ORM only).
|
|
|
|
Tables:
|
|
- users(user_id PK, username UNIQUE, created_at)
|
|
- job_listings(job_id PK, url UNIQUE, region, keyword, title, pay, location, timestamp)
|
|
- job_descriptions(job_id PK FK -> job_listings, title, company, location, description, posted_time, url)
|
|
- user_interactions(job_id PK FK -> job_listings, user_id FK -> users, seen_at, url_visited, is_user_favorite)
|
|
- regions(region_id PK, name UNIQUE)
|
|
- keywords(keyword_id PK, name UNIQUE)
|
|
- user_regions(user_id FK -> users, region_id FK -> regions, composite PK)
|
|
- user_keywords(user_id FK -> users, keyword_id FK -> keywords, composite PK)
|
|
"""
|
|
|
|
from datetime import datetime, UTC
|
|
import os
|
|
from typing import Optional, Dict, Any, List
|
|
from web.utils import (
|
|
get_url_from_filename,
|
|
get_color_from_string,
|
|
url_to_job_id,
|
|
normalize_job_id,
|
|
now_iso,
|
|
get_mysql_config,
|
|
)
|
|
|
|
# --- SQLAlchemy setup -------------------------------------------------------
|
|
from sqlalchemy import (
|
|
create_engine,
|
|
Column,
|
|
String,
|
|
Integer,
|
|
Text,
|
|
DateTime,
|
|
Boolean,
|
|
ForeignKey,
|
|
text,
|
|
)
|
|
from sqlalchemy.orm import declarative_base, relationship, sessionmaker, Session
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
|
|
from typing import cast
|
|
|
|
engine = None # set in db_init()
|
|
SessionLocal: Optional[sessionmaker] = None
|
|
Base = declarative_base()
|
|
|
|
# Length constants for MySQL compatibility
|
|
JOB_ID_LEN = 64
|
|
URL_LEN = 512
|
|
FILE_PATH_LEN = 512
|
|
TITLE_LEN = 512
|
|
SHORT_LEN = 255
|
|
TIME_LEN = 64
|
|
|
|
|
|
# --- ORM Models --------------------------------------------------------------
|
|
class User(Base):
|
|
__tablename__ = "users"
|
|
user_id = Column(Integer, primary_key=True, autoincrement=True)
|
|
username = Column(String(SHORT_LEN), unique=True, nullable=False)
|
|
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
|
|
password_hash = Column(String(SHORT_LEN))
|
|
is_admin = Column(Boolean, default=False, nullable=False)
|
|
is_active = Column(Boolean, default=True, nullable=False)
|
|
last_login = Column(DateTime, nullable=True)
|
|
|
|
interactions = relationship(
|
|
"UserInteraction", back_populates="user", cascade="all, delete-orphan")
|
|
|
|
|
|
class JobListing(Base):
|
|
__tablename__ = "job_listings"
|
|
job_id = Column(String(JOB_ID_LEN), primary_key=True)
|
|
url = Column(String(URL_LEN), unique=True)
|
|
region = Column(String(SHORT_LEN))
|
|
keyword = Column(String(SHORT_LEN))
|
|
title = Column(String(TITLE_LEN))
|
|
pay = Column(String(SHORT_LEN))
|
|
location = Column(String(SHORT_LEN))
|
|
timestamp = Column(String(TIME_LEN))
|
|
|
|
description = relationship(
|
|
"JobDescription", back_populates="listing", uselist=False, cascade="all, delete-orphan")
|
|
interactions = relationship(
|
|
"UserInteraction", back_populates="listing", cascade="all, delete-orphan")
|
|
|
|
|
|
class JobDescription(Base):
|
|
__tablename__ = "job_descriptions"
|
|
job_id = Column(String(JOB_ID_LEN), ForeignKey("job_listings.job_id",
|
|
ondelete="CASCADE"), primary_key=True)
|
|
title = Column(String(TITLE_LEN))
|
|
company = Column(String(SHORT_LEN))
|
|
location = Column(String(SHORT_LEN))
|
|
description = Column(Text)
|
|
posted_time = Column(String(TIME_LEN))
|
|
url = Column(String(URL_LEN))
|
|
|
|
listing = relationship("JobListing", back_populates="description")
|
|
|
|
|
|
class UserInteraction(Base):
|
|
__tablename__ = "user_interactions"
|
|
# composite uniqueness on (user_id, job_id)
|
|
job_id = Column(String(JOB_ID_LEN), ForeignKey("job_listings.job_id",
|
|
ondelete="CASCADE"), primary_key=True)
|
|
user_id = Column(Integer, ForeignKey(
|
|
"users.user_id", ondelete="CASCADE"), primary_key=True)
|
|
seen_at = Column(String(TIME_LEN))
|
|
url_visited = Column(String(URL_LEN))
|
|
is_user_favorite = Column(Boolean, default=False)
|
|
|
|
user = relationship("User", back_populates="interactions")
|
|
listing = relationship("JobListing", back_populates="interactions")
|
|
|
|
|
|
# --- New preference models: regions, keywords, and user mappings ----------
|
|
class Region(Base):
|
|
__tablename__ = "regions"
|
|
region_id = Column(Integer, primary_key=True, autoincrement=True)
|
|
name = Column(String(SHORT_LEN), unique=True, nullable=False)
|
|
color = Column(String(SHORT_LEN), nullable=True)
|
|
|
|
|
|
class Keyword(Base):
|
|
__tablename__ = "keywords"
|
|
keyword_id = Column(Integer, primary_key=True, autoincrement=True)
|
|
name = Column(String(SHORT_LEN), unique=True, nullable=False)
|
|
color = Column(String(SHORT_LEN), nullable=True)
|
|
|
|
|
|
class UserRegion(Base):
|
|
__tablename__ = "user_regions"
|
|
user_id = Column(Integer, ForeignKey(
|
|
"users.user_id", ondelete="CASCADE"), primary_key=True)
|
|
region_id = Column(Integer, ForeignKey(
|
|
"regions.region_id", ondelete="CASCADE"), primary_key=True)
|
|
|
|
|
|
class UserKeyword(Base):
|
|
__tablename__ = "user_keywords"
|
|
user_id = Column(Integer, ForeignKey(
|
|
"users.user_id", ondelete="CASCADE"), primary_key=True)
|
|
keyword_id = Column(Integer, ForeignKey(
|
|
"keywords.keyword_id", ondelete="CASCADE"), primary_key=True)
|
|
|
|
|
|
def _ensure_session() -> Session:
|
|
global engine, SessionLocal
|
|
if engine is None or SessionLocal is None:
|
|
db_init()
|
|
assert SessionLocal is not None
|
|
return cast(Session, SessionLocal())
|
|
|
|
|
|
def db_init():
|
|
"""Initialize MySQL database and create tables if needed."""
|
|
global engine, SessionLocal
|
|
cfg = get_mysql_config()
|
|
# Create database if it doesn't exist
|
|
root_url = f"mysql+pymysql://{cfg['user']}:{cfg['password']}@{cfg['host']}:{cfg['port']}/"
|
|
dbname = cfg["database"]
|
|
root_engine = create_engine(root_url, future=True)
|
|
with root_engine.begin() as conn:
|
|
conn.execute(text(
|
|
f"CREATE DATABASE IF NOT EXISTS `{dbname}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"))
|
|
# Create tables in target DB
|
|
mysql_url = f"mysql+pymysql://{cfg['user']}:{cfg['password']}@{cfg['host']}:{cfg['port']}/{dbname}?charset=utf8mb4"
|
|
engine = create_engine(mysql_url, future=True)
|
|
SessionLocal = sessionmaker(bind=engine, autoflush=False,
|
|
autocommit=False, future=True)
|
|
Base.metadata.create_all(engine)
|
|
# Ensure new auth columns exist for existing databases (MySQL/MariaDB support IF NOT EXISTS)
|
|
with engine.begin() as conn:
|
|
try:
|
|
conn.execute(text(
|
|
"ALTER TABLE users ADD COLUMN IF NOT EXISTS password_hash VARCHAR(255) NULL"))
|
|
except Exception:
|
|
pass
|
|
try:
|
|
conn.execute(text(
|
|
"ALTER TABLE users ADD COLUMN IF NOT EXISTS is_admin TINYINT(1) NOT NULL DEFAULT 0"))
|
|
except Exception:
|
|
pass
|
|
try:
|
|
conn.execute(text(
|
|
"ALTER TABLE users ADD COLUMN IF NOT EXISTS is_active TINYINT(1) NOT NULL DEFAULT 1"))
|
|
except Exception:
|
|
pass
|
|
try:
|
|
conn.execute(
|
|
text("ALTER TABLE users ADD COLUMN IF NOT EXISTS last_login DATETIME NULL"))
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def upsert_user_interaction(job_id: str | int, *, user_id: Optional[int] = None, seen_at: Optional[str] = None, url_visited: Optional[str] = None, is_user_favorite: Optional[bool] = None):
|
|
"""Upsert a single interaction row for this job.
|
|
Any provided field will be updated; absent fields keep their current value.
|
|
"""
|
|
if user_id is None:
|
|
user_id = get_or_create_user("anonymous")
|
|
job_id_str = str(job_id)
|
|
with _ensure_session() as session:
|
|
ui = session.get(UserInteraction, {
|
|
"job_id": job_id_str, "user_id": int(user_id)})
|
|
if ui is None:
|
|
ui = UserInteraction(job_id=job_id_str, user_id=int(user_id))
|
|
session.add(ui)
|
|
if seen_at is not None:
|
|
setattr(ui, "seen_at", seen_at)
|
|
if url_visited is not None:
|
|
setattr(ui, "url_visited", url_visited)
|
|
if is_user_favorite is not None:
|
|
setattr(ui, "is_user_favorite", bool(is_user_favorite))
|
|
session.commit()
|
|
|
|
|
|
def upsert_listing(*, url: str, region: str, keyword: str, title: str, pay: str, location: str, timestamp: str):
|
|
"""Insert or update a job listing row based on job_id derived from URL."""
|
|
job_id = str(url_to_job_id(url))
|
|
with _ensure_session() as session:
|
|
obj = session.get(JobListing, job_id)
|
|
if obj is None:
|
|
obj = JobListing(job_id=job_id)
|
|
session.add(obj)
|
|
setattr(obj, "url", url)
|
|
setattr(obj, "region", region)
|
|
setattr(obj, "keyword", keyword)
|
|
setattr(obj, "title", title)
|
|
setattr(obj, "pay", pay)
|
|
setattr(obj, "location", location)
|
|
setattr(obj, "timestamp", timestamp)
|
|
session.commit()
|
|
|
|
|
|
def upsert_job_details(job_data: Dict[str, Any]):
|
|
"""Upsert into job_descriptions table using scraped job details dict."""
|
|
url = job_data.get("url")
|
|
job_id = normalize_job_id(job_data.get("id"), url)
|
|
if not job_id:
|
|
return
|
|
title = job_data.get("title") or None
|
|
company = job_data.get("company") or None
|
|
location = job_data.get("location") or None
|
|
description = job_data.get("description") or None
|
|
posted_time = job_data.get("posted_time") or None
|
|
|
|
job_id = str(job_id)
|
|
with _ensure_session() as session:
|
|
obj = session.get(JobDescription, job_id)
|
|
if obj is None:
|
|
obj = JobDescription(job_id=job_id)
|
|
session.add(obj)
|
|
setattr(obj, "title", title)
|
|
setattr(obj, "company", company)
|
|
setattr(obj, "location", location)
|
|
setattr(obj, "description", description)
|
|
setattr(obj, "posted_time", posted_time)
|
|
setattr(obj, "url", url)
|
|
session.commit()
|
|
|
|
|
|
def db_get_keywords() -> List[str]:
|
|
"""Return a list of all unique keywords from job listings."""
|
|
with _ensure_session() as session:
|
|
rows = session.execute(
|
|
text("SELECT DISTINCT keyword FROM job_listings")).fetchall()
|
|
return [r[0] for r in rows]
|
|
|
|
|
|
def db_get_regions() -> List[str]:
|
|
"""Return a list of all unique regions from job listings."""
|
|
with _ensure_session() as session:
|
|
rows = session.execute(
|
|
text("SELECT DISTINCT region FROM job_listings")).fetchall()
|
|
return [r[0] for r in rows]
|
|
|
|
|
|
def get_all_jobs():
|
|
query = """
|
|
SELECT l.job_id
|
|
,l.title
|
|
,d.description
|
|
,l.region
|
|
,l.keyword
|
|
,d.company
|
|
,l.location
|
|
,l.timestamp
|
|
,d.posted_time
|
|
,l.url
|
|
FROM job_listings AS l
|
|
INNER JOIN job_descriptions AS d
|
|
ON l.job_id = d.job_id
|
|
AND l.url = d.url
|
|
ORDER BY d.posted_time DESC
|
|
"""
|
|
with _ensure_session() as session:
|
|
rows = session.execute(text(query)).fetchall()
|
|
jobs = []
|
|
for row in rows:
|
|
job = {
|
|
"id": row[0],
|
|
"title": row[1],
|
|
"description": row[2].replace('\n', '<br />').strip(),
|
|
"region": row[3],
|
|
"keyword": row[4],
|
|
"company": row[5],
|
|
"location": row[6],
|
|
"timestamp": row[7],
|
|
"posted_time": row[8],
|
|
"url": row[9],
|
|
}
|
|
jobs.append(job)
|
|
return jobs
|
|
|
|
|
|
def db_get_all_job_urls() -> List[str]:
|
|
"""Return list of job URLs from job_listings."""
|
|
with _ensure_session() as session:
|
|
rows = session.execute(text("SELECT url FROM job_listings")).fetchall()
|
|
return [r[0] for r in rows]
|
|
|
|
|
|
def db_delete_job(job_id: str | int):
|
|
"""Delete a job row (cascades to details and interactions)."""
|
|
jid = str(job_id)
|
|
with _ensure_session() as session:
|
|
obj = session.get(JobListing, jid)
|
|
if obj:
|
|
session.delete(obj)
|
|
session.commit()
|
|
|
|
|
|
def remove_job(url):
|
|
"""Remove a job from the database."""
|
|
try:
|
|
jid = url_to_job_id(url)
|
|
db_delete_job(jid)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
# ---------------- New ORM convenience helpers ------------------------------
|
|
|
|
def get_or_create_user(username: str) -> int:
|
|
"""Return user_id for username, creating if missing."""
|
|
# 2025-08-30T16:04:29.660245+00:00 is wrong. should be 2025-08-30T16:04:29
|
|
created_at = datetime.now(UTC).isoformat().split('.')[0]
|
|
with _ensure_session() as session:
|
|
row = session.execute(
|
|
text("SELECT user_id FROM users WHERE username = :u"), {
|
|
"u": username}
|
|
).fetchone()
|
|
if row:
|
|
return int(row[0])
|
|
session.execute(
|
|
text("INSERT INTO users(username, created_at) VALUES(:u, :c)"),
|
|
{"u": username, "c": created_at},
|
|
)
|
|
session.commit()
|
|
# open a new session to fetch the id
|
|
with _ensure_session() as session:
|
|
row2 = session.execute(
|
|
text("SELECT user_id FROM users WHERE username = :u"), {
|
|
"u": username}
|
|
).fetchone()
|
|
if row2:
|
|
return int(row2[0])
|
|
# Edge case retry
|
|
return get_or_create_user(username)
|
|
|
|
|
|
def mark_favorite(job_id: str | int, username: str, favorite: bool = True):
|
|
user_id = get_or_create_user(username)
|
|
upsert_user_interaction(job_id, user_id=user_id, is_user_favorite=favorite)
|
|
|
|
|
|
def record_visit(job_id: str | int, username: str, url: Optional[str] = None):
|
|
user_id = get_or_create_user(username)
|
|
ts = now_iso()
|
|
upsert_user_interaction(job_id, user_id=user_id,
|
|
seen_at=ts, url_visited=url)
|
|
|
|
|
|
# ---------------- User auth/admin helpers ----------------------------------
|
|
def create_or_update_user(username: str, password: Optional[str] = None, *, is_admin: Optional[bool] = None, is_active: Optional[bool] = None) -> int:
|
|
"""Create user if missing; update password/admin/active if provided. Returns user_id."""
|
|
username = (username or "").strip()
|
|
if not username:
|
|
raise ValueError("username required")
|
|
uid = get_or_create_user(username)
|
|
with _ensure_session() as session:
|
|
# Build dynamic update
|
|
fields = []
|
|
params: Dict[str, Any] = {"u": uid}
|
|
if password is not None:
|
|
fields.append("password_hash = :ph")
|
|
params["ph"] = generate_password_hash(password)
|
|
if is_admin is not None:
|
|
fields.append("is_admin = :ia")
|
|
params["ia"] = 1 if is_admin else 0
|
|
if is_active is not None:
|
|
fields.append("is_active = :ac")
|
|
params["ac"] = 1 if is_active else 0
|
|
if fields:
|
|
q = f"UPDATE users SET {', '.join(fields)} WHERE user_id = :u"
|
|
session.execute(text(q), params)
|
|
session.commit()
|
|
return uid
|
|
|
|
|
|
def set_user_password(username: str, password: str) -> None:
|
|
create_or_update_user(username, password=password)
|
|
|
|
|
|
def set_user_admin(username: str, is_admin: bool) -> None:
|
|
create_or_update_user(username, is_admin=is_admin)
|
|
|
|
|
|
def set_user_active(username: str, is_active: bool) -> None:
|
|
create_or_update_user(username, is_active=is_active)
|
|
|
|
|
|
def verify_user_credentials(username: str, password: str) -> bool:
|
|
"""Validate username/password against stored password_hash."""
|
|
with _ensure_session() as session:
|
|
row = session.execute(text("SELECT password_hash, is_active FROM users WHERE username = :u"), {
|
|
"u": username}).fetchone()
|
|
if not row:
|
|
return False
|
|
ph, active = row[0], bool(row[1])
|
|
if not active or not ph:
|
|
return False
|
|
ok = check_password_hash(ph, password)
|
|
if ok:
|
|
# record last_login
|
|
try:
|
|
session.execute(text("UPDATE users SET last_login = :ts WHERE username = :u"), {
|
|
"ts": datetime.now(UTC), "u": username})
|
|
session.commit()
|
|
except Exception:
|
|
pass
|
|
return ok
|
|
|
|
|
|
def get_users() -> List[Dict[str, Any]]:
|
|
with _ensure_session() as session:
|
|
rows = session.execute(text(
|
|
"SELECT user_id, username, created_at, is_admin, is_active, last_login, (password_hash IS NOT NULL) AS has_pw FROM users ORDER BY username ASC")).fetchall()
|
|
out: List[Dict[str, Any]] = []
|
|
for r in rows:
|
|
out.append({
|
|
"user_id": int(r[0]),
|
|
"username": r[1],
|
|
"created_at": r[2].isoformat() if isinstance(r[2], datetime) else (r[2] or None),
|
|
"is_admin": bool(r[3]),
|
|
"is_active": bool(r[4]),
|
|
"last_login": r[5].isoformat() if r[5] else None,
|
|
"has_password": bool(r[6]),
|
|
})
|
|
return out
|
|
|
|
|
|
def get_user(username: str) -> Optional[Dict[str, Any]]:
|
|
"""Return single user dict or None."""
|
|
with _ensure_session() as session:
|
|
row = session.execute(text(
|
|
"SELECT user_id, username, created_at, is_admin, is_active, last_login, (password_hash IS NOT NULL) AS has_pw FROM users WHERE username = :u"
|
|
), {"u": username}).fetchone()
|
|
if not row:
|
|
return None
|
|
return {
|
|
"user_id": int(row[0]),
|
|
"username": row[1],
|
|
"created_at": row[2].isoformat() if isinstance(row[2], datetime) else (row[2] or None),
|
|
"is_admin": bool(row[3]),
|
|
"is_active": bool(row[4]),
|
|
"last_login": row[5].isoformat() if row[5] else None,
|
|
"has_password": bool(row[6]),
|
|
}
|
|
|
|
|
|
def get_user_by_id(user_id: int) -> Optional[Dict[str, Any]]:
|
|
"""Return single user dict or None."""
|
|
with _ensure_session() as session:
|
|
row = session.execute(text(
|
|
"SELECT user_id, username, created_at, is_admin, is_active, last_login, (password_hash IS NOT NULL) AS has_pw FROM users WHERE user_id = :u"
|
|
), {"u": user_id}).fetchone()
|
|
if not row:
|
|
return None
|
|
return {
|
|
"user_id": int(row[0]),
|
|
"username": row[1],
|
|
"created_at": row[2].isoformat() if isinstance(row[2], datetime) else (row[2] or None),
|
|
"is_admin": bool(row[3]),
|
|
"is_active": bool(row[4]),
|
|
"last_login": row[5].isoformat() if row[5] else None,
|
|
"has_password": bool(row[6]),
|
|
}
|
|
|
|
|
|
def delete_user_by_id(user_id: int) -> bool:
|
|
with _ensure_session() as session:
|
|
result = session.execute(
|
|
text("DELETE FROM users WHERE user_id = :u"), {"u": user_id})
|
|
session.commit()
|
|
return result.rowcount > 0
|
|
|
|
# ---------------- Regions/Keywords helpers ---------------------------------
|
|
|
|
|
|
def upsert_region(name: str) -> int:
|
|
"""Get or create a region by name; return region_id."""
|
|
name = (name or "").strip()
|
|
if not name:
|
|
raise ValueError("Region name cannot be empty")
|
|
with _ensure_session() as session:
|
|
row = session.execute(text("SELECT region_id FROM regions WHERE name = :n"), {
|
|
"n": name}).fetchone()
|
|
if row:
|
|
return int(row[0])
|
|
session.execute(
|
|
text("INSERT INTO regions(name) VALUES (:n)"), {"n": name})
|
|
session.commit()
|
|
with _ensure_session() as session:
|
|
row2 = session.execute(text("SELECT region_id FROM regions WHERE name = :n"), {
|
|
"n": name}).fetchone()
|
|
if row2:
|
|
return int(row2[0])
|
|
# unlikely retry
|
|
return upsert_region(name)
|
|
|
|
|
|
def upsert_keyword(name: str) -> int:
|
|
"""Get or create a keyword by name; return keyword_id."""
|
|
name = (name or "").strip()
|
|
if not name:
|
|
raise ValueError("Keyword name cannot be empty")
|
|
with _ensure_session() as session:
|
|
row = session.execute(text("SELECT keyword_id FROM keywords WHERE name = :n"), {
|
|
"n": name}).fetchone()
|
|
if row:
|
|
return int(row[0])
|
|
session.execute(
|
|
text("INSERT INTO keywords(name) VALUES (:n)"), {"n": name})
|
|
session.commit()
|
|
with _ensure_session() as session:
|
|
row2 = session.execute(text("SELECT keyword_id FROM keywords WHERE name = :n"), {
|
|
"n": name}).fetchone()
|
|
if row2:
|
|
return int(row2[0])
|
|
return upsert_keyword(name)
|
|
|
|
|
|
def set_user_regions(username: str, region_names: List[str]) -> None:
|
|
"""Replace user's preferred regions with given names."""
|
|
user_id = get_or_create_user(username)
|
|
# Normalize and get ids
|
|
names = sorted({(n or "").strip()
|
|
for n in region_names if (n or "").strip()})
|
|
region_ids: List[int] = [upsert_region(n) for n in names]
|
|
if not region_ids and not names:
|
|
# Clear all if explicitly empty list
|
|
with _ensure_session() as session:
|
|
session.execute(
|
|
text("DELETE FROM user_regions WHERE user_id = :u"), {"u": user_id})
|
|
session.commit()
|
|
return
|
|
desired = set(region_ids)
|
|
with _ensure_session() as session:
|
|
rows = session.execute(text("SELECT region_id FROM user_regions WHERE user_id = :u"), {
|
|
"u": user_id}).fetchall()
|
|
current = set(int(r[0]) for r in rows)
|
|
to_add = desired - current
|
|
to_remove = current - desired
|
|
for rid in to_remove:
|
|
session.execute(text("DELETE FROM user_regions WHERE user_id = :u AND region_id = :r"), {
|
|
"u": user_id, "r": int(rid)})
|
|
for rid in to_add:
|
|
session.execute(text("INSERT INTO user_regions(user_id, region_id) VALUES(:u, :r)"), {
|
|
"u": user_id, "r": int(rid)})
|
|
session.commit()
|
|
|
|
|
|
def set_user_keywords(username: str, keyword_names: List[str]) -> None:
|
|
"""Replace user's preferred keywords with given names."""
|
|
user_id = get_or_create_user(username)
|
|
names = sorted({(n or "").strip()
|
|
for n in keyword_names if (n or "").strip()})
|
|
keyword_ids: List[int] = [upsert_keyword(n) for n in names]
|
|
if not keyword_ids and not names:
|
|
with _ensure_session() as session:
|
|
session.execute(
|
|
text("DELETE FROM user_keywords WHERE user_id = :u"), {"u": user_id})
|
|
session.commit()
|
|
return
|
|
desired = set(keyword_ids)
|
|
with _ensure_session() as session:
|
|
rows = session.execute(text("SELECT keyword_id FROM user_keywords WHERE user_id = :u"), {
|
|
"u": user_id}).fetchall()
|
|
current = set(int(r[0]) for r in rows)
|
|
to_add = desired - current
|
|
to_remove = current - desired
|
|
for kid in to_remove:
|
|
session.execute(text("DELETE FROM user_keywords WHERE user_id = :u AND keyword_id = :k"), {
|
|
"u": user_id, "k": int(kid)})
|
|
for kid in to_add:
|
|
session.execute(text("INSERT INTO user_keywords(user_id, keyword_id) VALUES(:u, :k)"), {
|
|
"u": user_id, "k": int(kid)})
|
|
session.commit()
|
|
|
|
|
|
def get_user_regions(username: str) -> List[Dict[str, str]]:
|
|
"""Return preferred region names for a user (empty if none)."""
|
|
with _ensure_session() as session:
|
|
row = session.execute(text("SELECT user_id FROM users WHERE username = :u"), {
|
|
"u": username}).fetchone()
|
|
if not row:
|
|
return []
|
|
user_id = int(row[0])
|
|
rows = session.execute(text(
|
|
"""
|
|
SELECT r.name, r.color
|
|
FROM regions r
|
|
INNER JOIN user_regions ur ON ur.region_id = r.region_id
|
|
WHERE ur.user_id = :u
|
|
ORDER BY r.name ASC
|
|
"""
|
|
), {"u": user_id}).fetchall()
|
|
return [{"name": r[0], "color": r[1]} for r in rows]
|
|
|
|
|
|
def get_user_keywords(username: str) -> List[Dict[str, str]]:
|
|
"""Return preferred keyword names for a user (empty if none)."""
|
|
with _ensure_session() as session:
|
|
row = session.execute(text("SELECT user_id FROM users WHERE username = :u"), {
|
|
"u": username}).fetchone()
|
|
if not row:
|
|
return []
|
|
user_id = int(row[0])
|
|
rows = session.execute(text(
|
|
"""
|
|
SELECT k.name, k.color
|
|
FROM keywords k
|
|
INNER JOIN user_keywords uk ON uk.keyword_id = k.keyword_id
|
|
WHERE uk.user_id = :u
|
|
ORDER BY k.name ASC
|
|
"""
|
|
), {"u": user_id}).fetchall()
|
|
return [{"name": r[0], "color": r[1]} for r in rows]
|
|
|
|
|
|
def get_all_regions() -> List[Dict[str, str]]:
|
|
"""Return all region names from regions table (sorted)."""
|
|
with _ensure_session() as session:
|
|
rows = session.execute(
|
|
text("SELECT name, color FROM regions ORDER BY name ASC")).fetchall()
|
|
return [{"name": r[0], "color": r[1]} for r in rows]
|
|
|
|
|
|
def get_all_keywords() -> List[Dict[str, str]]:
|
|
"""Return all keyword names from keywords table (sorted)."""
|
|
with _ensure_session() as session:
|
|
rows = session.execute(
|
|
text("SELECT name, color FROM keywords ORDER BY name ASC")).fetchall()
|
|
return [{"name": r[0], "color": r[1]} for r in rows]
|
|
|
|
|
|
def seed_regions_keywords_from_listings() -> Dict[str, int]:
|
|
"""Seed regions/keywords tables from distinct values in job_listings if empty.
|
|
|
|
Returns dict with counts inserted: {"regions": n1, "keywords": n2}.
|
|
"""
|
|
inserted = {"regions": 0, "keywords": 0}
|
|
with _ensure_session() as session:
|
|
# Regions
|
|
existing_regions = session.execute(
|
|
text("SELECT COUNT(*) FROM regions")).scalar_one()
|
|
if int(existing_regions or 0) == 0:
|
|
rows = session.execute(text(
|
|
"SELECT DISTINCT region FROM job_listings WHERE region IS NOT NULL AND region != ''")).fetchall()
|
|
for r in rows:
|
|
name = r[0]
|
|
if name:
|
|
try:
|
|
session.execute(
|
|
text("INSERT IGNORE INTO regions(name, color) VALUES(:n, :c)"), {"n": name, "c": get_color_from_string(name)})
|
|
inserted["regions"] += 1
|
|
except Exception:
|
|
pass
|
|
session.commit()
|
|
# Keywords
|
|
existing_keywords = session.execute(
|
|
text("SELECT COUNT(*) FROM keywords")).scalar_one()
|
|
if int(existing_keywords or 0) == 0:
|
|
rows = session.execute(text(
|
|
"SELECT DISTINCT keyword FROM job_listings WHERE keyword IS NOT NULL AND keyword != ''")).fetchall()
|
|
for r in rows:
|
|
name = r[0]
|
|
if name:
|
|
try:
|
|
session.execute(
|
|
text("INSERT IGNORE INTO keywords(name, color) VALUES(:n, :c)"), {"n": name, "c": get_color_from_string(name)})
|
|
inserted["keywords"] += 1
|
|
except Exception:
|
|
pass
|
|
session.commit()
|
|
return inserted
|
|
|
|
|
|
def list_regions_full() -> List[Dict[str, Any]]:
|
|
with _ensure_session() as session:
|
|
rows = session.execute(
|
|
text("SELECT region_id, name, color FROM regions ORDER BY name ASC")).fetchall()
|
|
return [{"region_id": int(r[0]), "name": r[1], "color": r[2]} for r in rows]
|
|
|
|
|
|
def list_keywords_full() -> List[Dict[str, Any]]:
|
|
with _ensure_session() as session:
|
|
rows = session.execute(
|
|
text("SELECT keyword_id, name, color FROM keywords ORDER BY name ASC")).fetchall()
|
|
return [{"keyword_id": int(r[0]), "name": r[1], "color": r[2]} for r in rows]
|
|
|
|
|
|
def rename_region(region_id: int, new_name: str) -> bool:
|
|
new_name = (new_name or "").strip()
|
|
if not new_name:
|
|
raise ValueError("new_name required")
|
|
with _ensure_session() as session:
|
|
try:
|
|
session.execute(text("UPDATE regions SET name = :n WHERE region_id = :id"), {
|
|
"n": new_name, "id": int(region_id)})
|
|
session.commit()
|
|
return True
|
|
except Exception:
|
|
session.rollback()
|
|
return False
|
|
|
|
|
|
def rename_keyword(keyword_id: int, new_name: str) -> bool:
|
|
new_name = (new_name or "").strip()
|
|
if not new_name:
|
|
raise ValueError("new_name required")
|
|
with _ensure_session() as session:
|
|
try:
|
|
session.execute(text("UPDATE keywords SET name = :n WHERE keyword_id = :id"), {
|
|
"n": new_name, "id": int(keyword_id)})
|
|
session.commit()
|
|
return True
|
|
except Exception:
|
|
session.rollback()
|
|
return False
|
|
|
|
|
|
def change_region_color(region_id: int, new_color: str) -> bool:
|
|
new_color = (new_color or "").strip()
|
|
if not new_color:
|
|
raise ValueError("new_color required")
|
|
with _ensure_session() as session:
|
|
try:
|
|
session.execute(text("UPDATE regions SET color = :c WHERE region_id = :id"), {
|
|
"c": new_color, "id": int(region_id)})
|
|
session.commit()
|
|
return True
|
|
except Exception:
|
|
session.rollback()
|
|
return False
|
|
|
|
|
|
def change_keyword_color(keyword_id: int, new_color: str) -> bool:
|
|
new_color = (new_color or "").strip()
|
|
if not new_color:
|
|
raise ValueError("new_color required")
|
|
with _ensure_session() as session:
|
|
try:
|
|
session.execute(text("UPDATE keywords SET color = :c WHERE keyword_id = :id"), {
|
|
"c": new_color, "id": int(keyword_id)})
|
|
session.commit()
|
|
return True
|
|
except Exception:
|
|
session.rollback()
|
|
return False
|