508 lines
17 KiB
Python
508 lines
17 KiB
Python
import os
|
|
from flask import Flask, request, jsonify, render_template, redirect, url_for, session, flash, Response
|
|
from flask_wtf import CSRFProtect
|
|
from typing import Dict, List
|
|
|
|
from web.craigslist import scraper
|
|
from web.db import (
|
|
db_init,
|
|
delete_user_by_id,
|
|
get_all_jobs,
|
|
mark_favorite,
|
|
record_visit,
|
|
get_users,
|
|
create_or_update_user,
|
|
verify_user_credentials,
|
|
get_user,
|
|
get_user_by_id,
|
|
get_user_regions,
|
|
get_user_keywords,
|
|
set_user_regions,
|
|
set_user_keywords,
|
|
get_all_regions,
|
|
get_all_keywords,
|
|
upsert_region,
|
|
upsert_keyword,
|
|
list_regions_full,
|
|
list_keywords_full,
|
|
rename_region,
|
|
rename_keyword,
|
|
change_region_color,
|
|
change_keyword_color
|
|
)
|
|
from web.utils import (
|
|
initialize_users_from_settings,
|
|
filter_jobs,
|
|
get_job_by_id,
|
|
)
|
|
from web.db import get_all_regions, get_all_keywords
|
|
|
|
app = Flask(__name__)

# The session cookie is signed with this key. The fallback value is a
# well-known development placeholder, so make its use visible in the logs
# instead of silently running with it.
app.secret_key = os.environ.get("FLASK_SECRET", "dev-secret-change-me")
if "FLASK_SECRET" not in os.environ:
    app.logger.warning(
        "FLASK_SECRET is not set; using the insecure development secret key"
    )

# serve static files from the "static" directory
app.static_folder = "static"

# Enable CSRF protection for all modifying requests (POST/PUT/PATCH/DELETE)
csrf = CSRFProtect(app)
|
|
|
|
|
|
def require_admin():
    """Tell whether the session user is an active administrator.

    Returns False when nobody is logged in, when the user record is missing,
    when either the ``is_admin`` or ``is_active`` flag is falsy, or when the
    lookup raises.
    """
    name = session.get('username')
    if name:
        try:
            record = get_user(name)
            return bool(
                record and record.get('is_admin') and record.get('is_active')
            )
        except Exception:
            # Any lookup problem is treated as "not an admin".
            return False
    return False
|
|
|
|
|
|
def require_login():
    """Tell whether the session carries a non-empty ``username``."""
    return True if session.get('username') else False
|
|
|
|
|
|
@app.context_processor
def inject_user_context():
    """Expose ``username`` and ``current_user`` to every template.

    ``current_user`` is an attribute-style object built from the user dict
    returned by ``get_user``; it is None when nobody is logged in, when the
    lookup raises, or when the lookup does not return a dict.
    """
    name = session.get('username')
    try:
        record = get_user(name) if name else None
    except Exception:
        record = None

    current = None
    if isinstance(record, dict):
        # Build a throwaway class whose attributes are the dict entries so
        # templates can use dot access.
        current = type('U', (), record)()
    return {'username': name, 'current_user': current}
|
|
|
|
|
|
def build_region_palette() -> Dict[str, Dict[str, str]]:
    """Map each region name from ``get_all_regions()`` to ``{name, color}``.

    Missing ``name`` or ``color`` entries default to the empty string.
    """
    pairs = (
        (entry.get('name', ''), entry.get('color', ''))
        for entry in get_all_regions()
    )
    return {
        label: {"name": label, "color": shade}
        for label, shade in pairs
    }
|
|
|
|
|
|
def build_keyword_palette() -> Dict[str, Dict[str, str]]:
    """Map each normalised keyword from ``get_all_keywords()`` to ``{name, color}``.

    The keyword name is normalised by removing spaces and lower-casing; the
    normalised form is used both as the dict key and as the ``name`` value.
    Missing ``name`` or ``color`` entries default to the empty string.
    """
    palette: Dict[str, Dict[str, str]] = {}
    for entry in get_all_keywords():
        slug = entry.get('name', '').replace(' ', '').lower()
        palette[slug] = {"name": slug, "color": entry.get('color', '')}
    return palette
|
|
|
|
|
|
@app.route('/', methods=['GET'])
def index():
    """Render the job listing page.

    Query parameters ``region`` and ``keyword`` are passed to ``filter_jobs``.
    When one of them is absent and a user is logged in, the job list is first
    narrowed to that user's stored preferences for the missing dimension.
    Preference lookups are best-effort: a failure is logged and the
    unfiltered list is used.
    """
    title = "Job Listings"
    all_jobs = get_all_jobs()
    selected_region = request.args.get("region")
    selected_keyword = request.args.get("keyword")
    username = session.get('username')

    # Apply user preference filters if no explicit filters provided
    if username and not selected_region:
        try:
            prefs = get_user_regions(username)
            if prefs:
                # Build the lookup set once rather than once per job.
                allowed_regions = set(prefs)
                all_jobs = [
                    j for j in all_jobs if j.get('region') in allowed_regions
                ]
        except Exception:
            app.logger.exception("Could not apply region preferences")

    if username and not selected_keyword:
        try:
            prefs = get_user_keywords(username)
            if prefs:
                allowed_keywords = set(prefs)
                all_jobs = [
                    j for j in all_jobs if j.get('keyword') in allowed_keywords
                ]
        except Exception:
            app.logger.exception("Could not apply keyword preferences")

    filtered_jobs = filter_jobs(all_jobs, selected_region, selected_keyword)

    return render_template(
        "index.html",
        jobs=filtered_jobs,
        title=title,
        regions=build_region_palette(),
        keywords=build_keyword_palette(),
        selected_region=selected_region,
        selected_keyword=selected_keyword,
    )
|
|
|
|
|
|
@app.route('/regions', methods=['GET'])
def regions():
    """Return regions as JSON: the user's preferences, else all DB regions.

    The full list from ``get_all_regions()`` is used when nobody is logged
    in, when the preference lookup raises, or when it yields nothing.
    """
    username = session.get('username')
    preferred: List[Dict[str, str]] = []
    if username:
        try:
            preferred = get_user_regions(username)
        except Exception:
            preferred = []
    payload = preferred if preferred else get_all_regions()
    return jsonify(payload)
|
|
|
|
|
|
@app.route('/keywords', methods=['GET'])
def keywords():
    """Return keywords as a JSON object keyed by normalised keyword name.

    The source list is the user's preferred keywords, falling back to
    ``get_all_keywords()`` when nobody is logged in, the lookup raises, or it
    yields nothing. Keys are the name with spaces removed and lower-cased;
    each value keeps the original ``name`` and ``color``.
    """
    username = session.get('username')
    preferred: List[Dict[str, str]] = []
    if username:
        try:
            preferred = get_user_keywords(username)
        except Exception:
            preferred = []
    source = preferred if preferred else get_all_keywords()
    return jsonify({
        entry['name'].replace(' ', '').lower(): {
            "name": entry['name'],
            "color": entry['color'],
        }
        for entry in source
    })
|
|
|
|
|
|
@app.route('/jobs', methods=['GET'])
def jobs():
    """Return the job list as JSON, filtered by ``region`` and ``keyword``.

    When a query parameter is absent and a user is logged in, the list is
    first narrowed to that user's stored preferences for the missing
    dimension. Preference lookups are best-effort: a failure is logged and
    the unfiltered list is used.
    """
    all_jobs = get_all_jobs()
    region = request.args.get("region")
    keyword = request.args.get("keyword")
    username = session.get('username')

    # Respect user preferences when no explicit filters provided
    if username and not region:
        try:
            prefs = get_user_regions(username)
            if prefs:
                # Build the lookup set once rather than once per job.
                allowed_regions = set(prefs)
                all_jobs = [
                    j for j in all_jobs if j.get('region') in allowed_regions
                ]
        except Exception:
            app.logger.exception("Could not apply region preferences")

    if username and not keyword:
        try:
            prefs = get_user_keywords(username)
            if prefs:
                allowed_keywords = set(prefs)
                all_jobs = [
                    j for j in all_jobs if j.get('keyword') in allowed_keywords
                ]
        except Exception:
            app.logger.exception("Could not apply keyword preferences")

    return jsonify(filter_jobs(all_jobs, region, keyword))
|
|
|
|
|
|
@app.route('/job_details', methods=['GET'])
def job_details():
    """Return all jobs as JSON, narrowed to the logged-in user's preferences.

    Both preference sets are loaded before any filtering, so a failure in
    either lookup leaves the job list completely unfiltered.
    """
    listing = get_all_jobs()
    username = session.get('username')
    if not username:
        return jsonify(listing)

    try:
        region_prefs = set(get_user_regions(username))
        keyword_prefs = set(get_user_keywords(username))
        if region_prefs:
            listing = [j for j in listing if j.get('region') in region_prefs]
        if keyword_prefs:
            listing = [j for j in listing if j.get('keyword') in keyword_prefs]
    except Exception:
        # Preference filtering is optional; fall through with what we have.
        pass
    return jsonify(listing)
|
|
|
|
|
|
@app.route('/job/<job_id>', methods=['GET'])
def job_by_id(job_id):
    """Render the detail page for one job and record the visit.

    Responds with a JSON 404 when ``get_job_by_id`` finds nothing. The
    visitor name is taken from the ``username`` query parameter, then the
    ``X-Username`` header, then defaults to ``'anonymous'``; it is supplied
    by the client and is not checked against the session.
    """
    job = get_job_by_id(job_id)
    if not job:
        return jsonify({"error": "Job not found"}), 404

    visitor = (
        request.args.get("username")
        or request.headers.get("X-Username")
        or "anonymous"
    )
    try:
        record_visit(
            str(job.get('id') or job_id),
            username=visitor,
            url=job.get('url'),
        )
    except Exception:
        # Visit logging must never block the page from rendering.
        pass

    title = f"Job Details | {job.get('title', 'Unknown')} | ID {job.get('id', '')}"
    return render_template('job.html', job=job, title=title)
|
|
|
|
|
|
# Exempt JSON favorite endpoint from CSRF (uses fetch without token). Consider
# adding a token header client-side and removing this exemption later.
@app.route('/jobs/<job_id>/favorite', methods=['POST'])
@csrf.exempt
def set_favorite(job_id):
    """Mark or unmark a job as favorite for a given user.

    Expects JSON: { "username": "alice", "favorite": true }
    The username falls back to the ``X-Username`` header and then to
    'anonymous'; ``favorite`` defaults to true when omitted.

    Returns a JSON status object, with HTTP 400 when ``mark_favorite`` raises.
    """
    payload = request.get_json(silent=True) or {}
    who = (
        payload.get("username")
        or request.headers.get("X-Username")
        or "anonymous"
    )
    flag = bool(payload.get("favorite", True))
    job_key = str(job_id)
    try:
        mark_favorite(job_key, username=who, favorite=flag)
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 400
    return jsonify(
        {"status": "ok", "job_id": job_key, "username": who, "favorite": flag}
    )
|
|
|
|
|
|
@app.route('/scrape', methods=['GET'])
def scrape():
    """Run the scraper and stream its messages back as plain text.

    If the scraper raises, a final error line is streamed instead of the
    exception propagating.
    """
    def stream():
        try:
            yield from scraper()
        except Exception as e:
            yield f"Error during scraping: {e!s}\n"

    return Response(stream(), mimetype='text/plain')
|
|
|
|
|
|
@app.route('/scrape-page', methods=['GET'])
def scrape_page():
    """Render ``scrape.html``, the page that displays the streamed scrape output."""
    page = render_template('scrape.html', title='Scrape Jobs')
    return page
|
|
|
|
|
|
# ---------------- Auth & Admin UI ------------------------------------------
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and, on POST, authenticate the submitted user.

    A session is started only when ``verify_user_credentials`` accepts the
    username/password pair; otherwise an error is flashed and the form is
    rendered again.
    """
    if request.method == 'POST':
        username = (request.form.get('username') or '').strip()
        password = request.form.get('password') or ''
        # The password check alone decides the outcome. The previous
        # "... or username" clause let any non-empty username log in
        # without a valid password.
        if verify_user_credentials(username, password):
            session['username'] = username
            flash('Logged in')
            return redirect(url_for('admin_users'))
        flash('Invalid credentials')
    return render_template('admin/login.html', title='Login')
|
|
|
|
|
|
@app.route('/logout')
def logout():
    """Drop the username from the session and send the visitor to the login page."""
    if 'username' in session:
        del session['username']
    flash('Logged out')
    destination = url_for('login')
    return redirect(destination)
|
|
|
|
|
|
@app.route('/admin/users', methods=['GET', 'POST'])
def admin_users():
    """List users (GET) or create/update one from the submitted form (POST).

    Non-admins are redirected to the login page. On POST the result is
    flashed and the browser is redirected back to this page. A missing
    ``is_active`` field is treated as active.
    """
    if not require_admin():
        return redirect(url_for('login'))

    if request.method == 'POST':
        form = request.form
        name = (form.get('username') or '').strip()
        new_password = form.get('new_password') or None
        admin_flag = bool(form.get('is_admin'))
        raw_active = form.get('is_active')
        active_flag = True if raw_active is None else bool(raw_active)
        try:
            create_or_update_user(
                name,
                password=new_password,
                is_admin=admin_flag,
                is_active=active_flag,
            )
            flash('User saved')
        except Exception as e:
            flash(f'Error: {e}')
        return redirect(url_for('admin_users'))

    class UObj(dict):
        """Dict whose keys can also be read as attributes in templates."""
        __getattr__ = dict.get

    rows = [UObj(entry) for entry in get_users()]
    return render_template('admin/users.html', users=rows, title='Users')
|
|
|
|
|
|
@app.route('/admin/user/<user_id>', methods=['GET', 'POST'])
def admin_user(user_id):
    """Show (GET) or save (POST) a single user's admin form.

    Non-admins are redirected to the login page. An unknown ``user_id``
    flashes an error and redirects to the user list. On POST the user is
    saved via ``create_or_update_user`` using the submitted ``username``,
    the outcome is flashed, and the browser returns to the user list. A
    missing ``is_active`` field is treated as active.
    """
    if not require_admin():
        return redirect(url_for('login'))

    user = get_user_by_id(user_id)
    if not user:
        # Nothing to render or edit for an id that does not resolve.
        flash('User not found')
        return redirect(url_for('admin_users'))

    if request.method == 'POST':
        data = request.form
        username = (data.get('username') or '').strip()
        # A blank password field becomes None, matching admin_users(), so an
        # empty string is never handed on as the new password.
        password = data.get('new_password') or None
        is_admin = bool(data.get('is_admin'))
        raw_active = data.get('is_active')
        is_active = bool(raw_active) if raw_active is not None else True
        try:
            create_or_update_user(
                username, password=password, is_admin=is_admin, is_active=is_active)
            flash('User saved')
        except Exception as e:
            flash(f'Error: {e}')
        return redirect(url_for('admin_users'))

    return render_template('admin/user.html', user=user, title='User')
|
|
|
|
|
|
@app.route('/admin/user/<user_id>/delete', methods=['POST'])
def admin_user_delete(user_id):
    """Delete a user by id (admins only) and return to the user list.

    The flashed message reflects the truthiness of ``delete_user_by_id``.
    """
    if not require_admin():
        return redirect(url_for('login'))

    removed = delete_user_by_id(user_id)
    flash('User deleted' if removed else 'Error deleting user')
    return redirect(url_for('admin_users'))
|
|
|
|
|
|
# ---------------- User settings (regions/keywords) -------------------------
|
|
|
|
def _clean_selection(values):
    """Return the non-blank string entries of *values*, whitespace-stripped.

    Anything other than a list or tuple yields an empty list, and non-string
    entries are skipped, so a malformed request payload cannot raise here.
    """
    if not isinstance(values, (list, tuple)):
        return []
    return [v.strip() for v in values if isinstance(v, str) and v.strip()]


@app.route('/settings', methods=['GET', 'POST'])
def user_settings():
    """Show (GET) or save (POST) the logged-in user's region/keyword preferences.

    POST accepts either JSON (``{"regions": [...], "keywords": [...]}``) or
    an HTML form using the field names ``regions``/``region`` and
    ``keywords``/``keyword``. Each selected value is upserted into the master
    list (best-effort) before the user's selections are stored.

    JSON callers receive ``{"status": "ok"}``, or a 400 with the error
    message; form callers get a flash message and a redirect back here.
    Visitors who are not logged in are redirected to the login page.
    """
    if not require_login():
        return redirect(url_for('login'))
    username = session['username']

    if request.method == 'POST':
        # Accept JSON or form posts. Normalize singular/plural names.
        if request.is_json:
            data = request.get_json(silent=True) or {}
            if not isinstance(data, dict):
                # A JSON array or scalar carries no usable selections.
                data = {}
            sel_regions = _clean_selection(data.get('regions'))
            sel_keywords = _clean_selection(data.get('keywords'))
        else:
            # HTML form fallback: support names 'regions' or 'region',
            # 'keywords' or 'keyword'
            sel_regions = _clean_selection(
                request.form.getlist('regions') + request.form.getlist('region'))
            sel_keywords = _clean_selection(
                request.form.getlist('keywords') + request.form.getlist('keyword'))

        # Upsert any new values into master lists. Best-effort: one failing
        # value must not stop the preferences from being saved.
        for r in sel_regions:
            try:
                upsert_region(r)
            except Exception:
                pass
        for k in sel_keywords:
            try:
                upsert_keyword(k)
            except Exception:
                pass

        try:
            set_user_regions(username, sel_regions)
            set_user_keywords(username, sel_keywords)
            # For JSON callers, return 200 without redirect
            if request.is_json:
                return jsonify({"status": "ok"})
            flash('Preferences saved')
        except Exception as e:
            if request.is_json:
                return jsonify({"status": "error", "message": str(e)}), 400
            flash(f'Error saving preferences: {e}')
        return redirect(url_for('user_settings'))

    # GET: render with current selections and all master items
    all_regions = get_all_regions()
    all_keywords = get_all_keywords()
    user_regions = get_user_regions(username)
    user_keywords = get_user_keywords(username)
    return render_template(
        'user/settings.html',
        title='Your Preferences',
        all_regions=all_regions,
        all_keywords=all_keywords,
        user_regions=user_regions,
        user_keywords=user_keywords,
    )
|
|
|
|
|
|
@app.route('/admin/taxonomy', methods=['GET', 'POST'])
def admin_taxonomy():
    """Admin page for managing the master region and keyword lists.

    GET renders ``admin/taxonomy.html`` with the full region and keyword
    rows. POST dispatches on the ``action`` form field (add, rename, or
    change colour for a region or keyword), flashes the outcome and
    redirects back here. Unknown actions and blank inputs are ignored
    without a message. Non-admins are redirected to the login page.
    """
    if not require_admin():
        return redirect(url_for('login'))

    if request.method == 'POST':
        form = request.form
        action = form.get('action')

        # action -> (name field, upsert function, success message)
        add_actions = {
            'add_region': ('region_name', upsert_region, 'Region added'),
            'add_keyword': ('keyword_name', upsert_keyword, 'Keyword added'),
        }
        # action -> (id field, value field, updater, success msg, failure msg)
        update_actions = {
            'rename_region': (
                'region_id', 'new_region_name', rename_region,
                'Region renamed', 'Failed to rename region'),
            'rename_keyword': (
                'keyword_id', 'new_keyword_name', rename_keyword,
                'Keyword renamed', 'Failed to rename keyword'),
            'change_region_color': (
                'region_id', 'new_region_color', change_region_color,
                'Region color changed', 'Failed to change region color'),
            'change_keyword_color': (
                'keyword_id', 'new_keyword_color', change_keyword_color,
                'Keyword color changed', 'Failed to change keyword color'),
        }

        try:
            if action in add_actions:
                name_field, upsert, added_msg = add_actions[action]
                name = (form.get(name_field) or '').strip()
                if name:
                    upsert(name)
                    flash(added_msg)
            elif action in update_actions:
                id_field, value_field, update, ok_msg, fail_msg = \
                    update_actions[action]
                # int() may raise on a non-numeric id; the handler below
                # turns that into a flashed error.
                ident = int(form.get(id_field) or 0)
                value = (form.get(value_field) or '').strip()
                if ident and value:
                    flash(ok_msg if update(ident, value) else fail_msg)
        except Exception as e:
            flash(f'Error: {e}')
        return redirect(url_for('admin_taxonomy'))

    class O(dict):
        """Dict whose keys can also be read as attributes in templates."""
        __getattr__ = dict.get

    region_rows = [O(row) for row in list_regions_full()]
    keyword_rows = [O(row) for row in list_keywords_full()]
    return render_template(
        'admin/taxonomy.html',
        title='Taxonomy',
        regions=region_rows,
        keywords=keyword_rows,
    )
|
|
|
|
|
|
def init():
    """Initialise the database, seed users, and run the Flask dev server.

    User seeding is best-effort: a failure is logged and the server still
    starts. The server runs in debug mode on 127.0.0.1:5000.
    """
    # Ensure DB is initialized
    db_init()
    # Seed users from settings.json (idempotent)
    try:
        initialize_users_from_settings()
    except Exception:
        # Keep starting up, but leave a trace of why seeding failed.
        app.logger.exception("Could not seed users from settings")
    app.run(debug=True, host='127.0.0.1', port=5000)
|
|
|
|
|
|
def main():
    """Start the Flask server with its default settings.

    This only calls ``app.run()``; it performs no database initialisation or
    user seeding.
    """
    app.run()


# Script entry point.
if __name__ == "__main__":
    main()
|