"""Scheduler that posts templated embeds to a Discord webhook.

Notifications fire at fixed minute marks every hour (see
``SCHEDULED_NOTIFICATIONS``). The outcome of each attempt is recorded in the
lock-protected ``STATE`` dict, which is exposed to the dashboard app through
``get_state`` and ``get_next_event``.
"""

import logging
import os
import threading
import time
from datetime import datetime, timedelta

import requests
import schedule
from dotenv import load_dotenv

from templates import load_templates
from dashboard import create_app
from maintenance import delete_old_messages
from thctime import (
    get_country_info,
    get_tz_info,
    get_tzdb_cache,
    init_tzdb_cache,
    load_countries,
    load_timezones,
    split_tz_name,
    where_is_it_420,
)

# (minute of the hour, notification type) pairs that are scheduled every hour.
SCHEDULED_NOTIFICATIONS = [
    (15, "reminder"),
    (20, "420"),
    (45, "reminder_halftime"),
    (50, "halftime"),
]

# STATE is shared between the scheduler loop and the dashboard thread; every
# read and write goes through STATE_LOCK.
STATE_LOCK = threading.Lock()
STATE: dict = {
    "running": True,
    "started_at": datetime.now(),
    "last_type": None,
    "last_attempt_at": None,
    "last_success_at": None,
    "last_status_code": None,
    "last_error": None,
    "last_locations": [],
}

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Load environment variables
load_dotenv()
WEBHOOK_URL = os.getenv('DISCORD_WEBHOOK_URL')
TEST_MESSAGE_DELETE_PATTERN = r"test notification"


def get_state() -> dict:
    """Return a shallow copy of ``STATE`` taken while holding ``STATE_LOCK``."""
    with STATE_LOCK:
        return dict(STATE)


def get_next_event() -> dict:
    """Return the next scheduled notification (see ``get_next_scheduled_event``)."""
    return get_next_scheduled_event()


def _update_state(**updates) -> None:
    """Merge ``updates`` into ``STATE`` while holding ``STATE_LOCK``."""
    with STATE_LOCK:
        STATE.update(updates)


def get_state_snapshot() -> dict:
    """Return a shallow copy of ``STATE``; kept as an alias of ``get_state``."""
    return get_state()


def _find_420_locations() -> list[str] | None:
    """Ask ``where_is_it_420`` for the places where it is currently 4:20.

    Uses the tzdb cache when ``get_tzdb_cache`` returns one; otherwise the
    timezone and country data are loaded on demand.
    """
    cache = get_tzdb_cache()
    if cache is None:
        return where_is_it_420(load_timezones(), load_countries())
    return where_is_it_420(
        [],
        [],
        tz_names=cache.get("tz_names"),
        tz_to_country_code=cache.get("tz_to_country_code"),
        country_code_to_name=cache.get("country_code_to_name"),
    )


def _check_420(tz_list: list[str] | None = None) -> str:
    """Build the "It's 4:20 in:" suffix for the 420 embed.

    Args:
        tz_list: Locations to list. When ``None`` they are looked up here;
            when given (even empty) no lookup and no data loading happens.

    Returns:
        The suffix text, or an empty string when there are no locations.
    """
    if tz_list is None:
        tz_list = _find_420_locations()

    if tz_list:
        tz_str = "\n".join(tz_list)
        return f"\nIt's 4:20 in:\n{tz_str}"
    return ""


def _update_420_cache() -> list[str]:
    """Refresh ``STATE["last_locations"]`` with the current 4:20 locations.

    This is best effort: a failing lookup is recorded in
    ``STATE["last_error"]`` instead of being raised.

    Returns:
        The locations found, or an empty list when the lookup failed or
        found nothing.
    """
    try:
        locations = _find_420_locations() or []
    except Exception as e:
        _update_state(last_locations=[], last_error=str(e))
        return []
    _update_state(last_locations=locations)
    return locations


def get_next_scheduled_event(now: datetime | None = None) -> dict:
    """Return the next scheduled notification time/type based on known minute marks.

    Args:
        now: Reference time; defaults to ``datetime.now()``.

    Returns:
        ``{"at": datetime, "type": str}`` for the first minute mark strictly
        after ``now``.
    """
    if now is None:
        now = datetime.now()

    upcoming = [
        (mark, msg_type)
        for mark, msg_type in (
            (now.replace(minute=minute, second=0, microsecond=0), msg_type)
            for minute, msg_type in SCHEDULED_NOTIFICATIONS
        )
        if mark > now
    ]

    if upcoming:
        next_dt, next_type = min(upcoming, key=lambda item: item[0])
    else:
        # Every mark of this hour has passed: roll over to the earliest mark
        # of the next hour, without relying on the list being sorted.
        first_minute, next_type = min(
            SCHEDULED_NOTIFICATIONS, key=lambda item: item[0]
        )
        next_dt = (now + timedelta(hours=1)).replace(
            minute=first_minute, second=0, microsecond=0
        )
    return {"at": next_dt, "type": next_type}


def create_embed(type: str, tz_list: list[str] | None = None) -> dict:
    """Create a Discord embed message.

    Args:
        type: Template key to render. Unknown keys produce a red
            "Unknown notification type" embed.
        tz_list: Optional pre-computed 4:20 locations, only used for the
            "420" type. ``None`` makes ``_check_420`` look them up.

    Returns:
        The embed payload as a dict.
    """
    templates_path = os.getenv("TEMPLATES_PATH", "templates.json")
    messages = load_templates(templates_path)

    if type in messages:
        # Work on a copy so the "image" key and the appended 4:20 text never
        # leak into the dict owned by load_templates.
        msg = dict(messages[type])
        image_url = msg.get("image_url")
        if isinstance(image_url, str) and image_url:
            msg["image"] = {"url": image_url}
        if type == "420":
            # Check where it's 4:20
            msg["text"] += _check_420(tz_list)
    else:
        msg = {"text": "Unknown notification type", "color": 0xFF0000}

    embed = {
        "title": type.replace("_", " ").capitalize(),
        "description": msg["text"],
        "image": msg.get("image"),
        "color": msg["color"],
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "footer": {"text": "THC - Toke Hash Coordinated"}
    }
    return embed


def send_notification(message: str) -> bool:
    """Send a notification to the Discord webhook.

    The attempt and its result are recorded in ``STATE``.

    Args:
        message: Notification type, used as the template key.

    Returns:
        bool: True when the webhook accepted the notification, False otherwise.
    """
    if not WEBHOOK_URL:
        logging.error("WEBHOOK_URL not set")
        return False

    _update_state(
        last_type=message,
        last_attempt_at=datetime.now(),
        last_status_code=None,
        last_error=None,
    )

    # Warm the tzdb cache once per process to avoid repeated CSV parsing
    # and to avoid scanning all pytz timezones every hour.
    try:
        init_tzdb_cache()
    except Exception as e:
        logging.error(f"Failed to initialize tzdb cache: {e}")

    tz_list: list[str] | None = None
    if message == "420":
        locations = _update_420_cache()
        # Reuse the lookup for the embed so it is not computed a second time
        # (and a failure already recorded in STATE is not raised again).
        # Anything that is not a list leaves create_embed to do its own lookup.
        if isinstance(locations, list):
            tz_list = locations

    embed = create_embed(message, tz_list=tz_list)
    data = {"embeds": [embed]}

    try:
        response = requests.post(WEBHOOK_URL, json=data, timeout=10)
    except requests.RequestException as e:
        logging.error(f"Error sending notification: {e}")
        _update_state(last_error=str(e))
        return False

    # Only HTTP 204 is treated as success here.
    if response.status_code == 204:
        logging.info(f"Notification sent: {message}")
        _update_state(
            last_success_at=datetime.now(),
            last_status_code=response.status_code,
        )
        return True

    logging.error(
        f"Failed to send notification: {response.status_code} - "
        f"{response.text}"
    )
    _update_state(last_status_code=response.status_code, last_error=response.text)
    return False


def _schedule_startup_test_cleanup(test_sent: bool) -> None:
    """Schedule one-time cleanup for the startup test notification.

    Nothing is scheduled when ``test_sent`` is False.
    """
    if not test_sent:
        return

    def cleanup_startup_test_message() -> type[schedule.CancelJob]:
        delete_old_messages(1, content_pattern=TEST_MESSAGE_DELETE_PATTERN)
        # Returning CancelJob removes this job after its first run.
        return schedule.CancelJob

    schedule.every(1).minutes.do(cleanup_startup_test_message)


def schedule_notification(interval: str, at: str, type: str) -> None:
    """Register ``send_notification(type)`` with the scheduler.

    Example: schedule.every().hour.at(":15").do(send_notification, "reminder")

    Args:
        interval: "hour" or "day"; anything else is logged as an error and
            nothing is scheduled.
        at: Time spec handed to ``schedule``'s ``.at()``.
        type: Notification type passed to ``send_notification``.
    """
    if interval == "hour":
        schedule.every().hour.at(at).do(send_notification, type)
    elif interval == "day":
        schedule.every().day.at(at).do(send_notification, type)
    else:
        logging.error(f"Unsupported interval: {interval}")


def start_dashboard() -> None:
    """Compatibility hook for tests and optional dashboard startup."""
    app = create_app(get_state=get_state, get_next_event=get_next_event)
    app.run(host="0.0.0.0", port=8420, debug=False, use_reloader=False)


def main() -> None:
    """Main function to run the scheduler.

    Starts the dashboard thread, registers the hourly notifications and the
    periodic message cleanup, sends one startup test notification, then runs
    pending jobs once per second until interrupted.
    """
    # Start the dashboard in a separate thread
    dashboard_thread = threading.Thread(target=start_dashboard, daemon=True)
    dashboard_thread.start()

    # Schedule notifications based on the defined SCHEDULED_NOTIFICATIONS
    for minute, msg_type in SCHEDULED_NOTIFICATIONS:
        schedule_notification("hour", f":{minute:02d}", msg_type)

    # Schedule deletion of old messages every 5 minutes
    schedule.every(5).minutes.do(delete_old_messages, 6)

    logging.info("Scheduler started.")

    # Send one startup test message and cleanup only if send succeeded.
    test_sent = send_notification("test")
    _schedule_startup_test_cleanup(test_sent)

    try:
        while True:
            schedule.run_pending()
            time.sleep(1)  # Check every second
    except KeyboardInterrupt:
        logging.info("Scheduler stopped by user")
    except Exception as e:
        # logging.exception keeps the traceback of whatever stopped the loop.
        logging.exception(f"Unexpected error: {e}")


if __name__ == "__main__":
    main()