Compare commits

...

8 Commits

Author SHA1 Message Date
zwitschi 3412a5ccaa feat: enhance message handling with content pattern matching and update environment configuration
Build and Deploy Docker Container / test (push) Successful in 14s
Build and Deploy Docker Container / build-and-deploy (push) Successful in 29s
2026-05-10 14:24:07 +02:00
zwitschi 915c55d7ed feat: remove pip installation command from nixpacks configuration
Build and Deploy Docker Container / test (push) Successful in 17s
Build and Deploy Docker Container / build-and-deploy (push) Successful in 49s
2026-05-10 13:44:39 +02:00
zwitschi 788f3ea6b7 feat: remove deployment step from Docker workflow and add nixpacks configuration
Build and Deploy Docker Container / test (push) Successful in 14s
Build and Deploy Docker Container / build-and-deploy (push) Successful in 29s
2026-05-10 13:31:30 +02:00
zwitschi 0952c21c7b feat: update dashboard port from 8080 to 8420 in configuration and documentation 2026-05-10 13:24:49 +02:00
zwitschi f88f60a019 feat: add functions to check and update 4:20 timezone cache
Build and Deploy Docker Container / test (push) Successful in 14s
Build and Deploy Docker Container / build-and-deploy (push) Failing after 30s
2026-05-10 13:06:31 +02:00
zwitschi 565c4078bb feat: enhance color handling in templates and dashboard with hex support 2026-05-10 12:55:39 +02:00
zwitschi 8f8c3655db refactor: replace pandas with csv module for timezone data loading 2026-05-10 12:43:11 +02:00
zwitschi 584231b0df chore: remove unused numpy and pandas dependencies from requirements 2026-05-10 12:37:31 +02:00
17 changed files with 370 additions and 99 deletions
+9
View File
@@ -1,2 +1,11 @@
# Replace with your actual Discord webhook URL # Replace with your actual Discord webhook URL
DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/<your_webhook_id>/<your_webhook_token> DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/<your_webhook_id>/<your_webhook_token>
# Replace with your Discord bot token
DISCORD_BOT_TOKEN=<your_bot_token>
# Replace with your Discord channel ID
DISCORD_CHANNEL_ID=<your_channel_id>
# Replace with your Discord guild/server ID
DISCORD_GUILD_ID=<your_guild_id>
-12
View File
@@ -53,15 +53,3 @@ jobs:
context: . context: .
push: true push: true
tags: git.allucanget.biz/${{ secrets.REGISTRY_USERNAME }}/thc-webhook:latest tags: git.allucanget.biz/${{ secrets.REGISTRY_USERNAME }}/thc-webhook:latest
- name: Deploy to Portainer
uses: appleboy/ssh-action@v0.1.7
with:
host: ${{ secrets.SERVER_HOST }}
username: ${{ secrets.SERVER_USER }}
key: ${{ secrets.SERVER_SSH_KEY }}
script: |
docker stop thc-webhook || true
docker rm thc-webhook || true
docker pull git.allucanget.biz/${{ secrets.REGISTRY_USERNAME }}/thc-webhook:latest
docker run -d --name thc-webhook -e DISCORD_WEBHOOK_URL=${{ secrets.DISCORD_WEBHOOK_URL }} -p 8080:8080 git.allucanget.biz/${{ secrets.REGISTRY_USERNAME }}/thc-webhook:latest
+1 -1
View File
@@ -12,7 +12,7 @@ RUN pip install --no-cache-dir -r requirements.txt
COPY . . COPY . .
# Dashboard (Flask) port # Dashboard (Flask) port
EXPOSE 8080 EXPOSE 8420
# Create a non-root user # Create a non-root user
RUN addgroup -S appgroup && adduser -S appuser -G appgroup RUN addgroup -S appgroup && adduser -S appuser -G appgroup
+2 -2
View File
@@ -38,13 +38,13 @@ The app will run continuously and send notifications at the scheduled times.
### Dashboard ### Dashboard
By default, a minimal dashboard is available at `http://localhost:8080/`. By default, a minimal dashboard is available at `http://localhost:8420/`.
You can disable it by setting `DASHBOARD_ENABLED=0`. You can disable it by setting `DASHBOARD_ENABLED=0`.
### Admin ### Admin
You can edit the embed message templates at `http://localhost:8080/admin`. You can edit the embed message templates at `http://localhost:8420/admin`.
- Templates are saved to `templates.json` by default. - Templates are saved to `templates.json` by default.
- Override the location with `TEMPLATES_PATH=/path/to/templates.json`. - Override the location with `TEMPLATES_PATH=/path/to/templates.json`.
+17 -2
View File
@@ -32,6 +32,17 @@ def get_html_template(content) -> str:
return HTML_TEMPLATE.format(content=content) return HTML_TEMPLATE.format(content=content)
def _as_hex_color(value: int | str | None) -> str:
if isinstance(value, int):
return f"#{value:06X}"
if isinstance(value, str):
try:
return f"#{parse_color(value):06X}"
except ValueError:
return "#000000"
return "#000000"
def create_app( def create_app(
*, *,
get_state: Callable[[], dict], get_state: Callable[[], dict],
@@ -77,13 +88,14 @@ def create_app(
blocks = [] blocks = []
for key, tpl in templates.items(): for key, tpl in templates.items():
text = (tpl.get("text") or "").replace("'", "&#39;") text = (tpl.get("text") or "").replace("'", "&#39;")
color = tpl.get("color") color_hex = _as_hex_color(tpl.get("color"))
image_url = tpl.get("image_url") or "" image_url = tpl.get("image_url") or ""
blocks.append( blocks.append(
"<fieldset style='margin-bottom:16px;'>" "<fieldset style='margin-bottom:16px;'>"
f"<legend><strong>{key}</strong></legend>" f"<legend><strong>{key}</strong></legend>"
f"<label>Text<br><textarea name='{key}__text' rows='3' style='width:100%'>{text}</textarea></label><br>" f"<label>Text<br><textarea name='{key}__text' rows='3' style='width:100%'>{text}</textarea></label><br>"
f"<label>Color<br><input name='{key}__color' value='{color}' style='width:200px'></label><br>" f"<label>Color<br><input type='color' name='{key}__color_picker' value='{color_hex}' oninput=\"this.form['{key}__color'].value=this.value\"></label><br>"
f"<label>Color value<br><input name='{key}__color' value='{color_hex}' style='width:200px'></label><br>"
f"<label>Image URL (optional)<br><input name='{key}__image_url' value='{image_url}' style='width:100%'></label>" f"<label>Image URL (optional)<br><input name='{key}__image_url' value='{image_url}' style='width:100%'></label>"
"</fieldset>" "</fieldset>"
) )
@@ -108,6 +120,9 @@ def create_app(
for key in current.keys(): for key in current.keys():
text = request.form.get(f"{key}__text", "").strip() text = request.form.get(f"{key}__text", "").strip()
color_raw = request.form.get(f"{key}__color", "").strip() color_raw = request.form.get(f"{key}__color", "").strip()
if not color_raw:
color_raw = request.form.get(
f"{key}__color_picker", "").strip()
image_url = request.form.get(f"{key}__image_url", "").strip() image_url = request.form.get(f"{key}__image_url", "").strip()
if not text: if not text:
+2 -2
View File
@@ -4,10 +4,10 @@ services:
container_name: thc-webhook-app container_name: thc-webhook-app
environment: environment:
- DISCORD_WEBHOOK_URL=${DISCORD_WEBHOOK_URL} - DISCORD_WEBHOOK_URL=${DISCORD_WEBHOOK_URL}
- DASHBOARD_PORT=8080 - DASHBOARD_PORT=8420
restart: unless-stopped restart: unless-stopped
ports: ports:
- "8080:8080" - "8420:8420"
logging: logging:
driver: json-file driver: json-file
options: options:
+66 -39
View File
@@ -50,6 +50,7 @@ logging.basicConfig(
load_dotenv() load_dotenv()
WEBHOOK_URL = os.getenv('DISCORD_WEBHOOK_URL') WEBHOOK_URL = os.getenv('DISCORD_WEBHOOK_URL')
TEST_MESSAGE_DELETE_PATTERN = r"test notification"
def get_state() -> dict: def get_state() -> dict:
@@ -71,6 +72,45 @@ def get_state_snapshot() -> dict:
return dict(STATE) return dict(STATE)
def _check_420(tz_list: list[str] | None = None) -> str:
cache = get_tzdb_cache()
timezones = load_timezones() if cache is None else []
countries = load_countries() if cache is None else []
if tz_list is None:
if cache is None:
tz_list = where_is_it_420(timezones, countries)
else:
tz_list = where_is_it_420(
timezones,
countries,
tz_names=cache.get("tz_names"),
tz_to_country_code=cache.get("tz_to_country_code"),
country_code_to_name=cache.get("country_code_to_name"),
)
if tz_list:
tz_str = "\n".join(tz_list)
return f"\nIt's 4:20 in:\n{tz_str}"
return ""
def _update_420_cache() -> None:
try:
cache = get_tzdb_cache()
if cache is None:
tz_list = where_is_it_420(load_timezones(), load_countries())
else:
tz_list = where_is_it_420(
[],
[],
tz_names=cache.get("tz_names"),
tz_to_country_code=cache.get("tz_to_country_code"),
country_code_to_name=cache.get("country_code_to_name"),
)
_update_state(last_locations=tz_list or [])
except Exception as e:
_update_state(last_locations=[], last_error=str(e))
def get_next_scheduled_event(now: datetime | None = None) -> dict: def get_next_scheduled_event(now: datetime | None = None) -> dict:
"""Return the next scheduled notification time/type based on known minute marks.""" """Return the next scheduled notification time/type based on known minute marks."""
if now is None: if now is None:
@@ -98,9 +138,6 @@ def create_embed(type: str, tz_list: list[str] | None = None) -> dict:
""" """
templates_path = os.getenv("TEMPLATES_PATH", "templates.json") templates_path = os.getenv("TEMPLATES_PATH", "templates.json")
messages = load_templates(templates_path) messages = load_templates(templates_path)
cache = get_tzdb_cache()
timezones = load_timezones() if cache is None else []
countries = load_countries() if cache is None else []
if type in messages: if type in messages:
msg = messages[type] msg = messages[type]
image_url = msg.get("image_url") image_url = msg.get("image_url")
@@ -108,20 +145,7 @@ def create_embed(type: str, tz_list: list[str] | None = None) -> dict:
msg["image"] = {"url": image_url} msg["image"] = {"url": image_url}
if type == "420": if type == "420":
# Check where it's 4:20 # Check where it's 4:20
if tz_list is None: msg["text"] += _check_420(tz_list)
if cache is None:
tz_list = where_is_it_420(timezones, countries)
else:
tz_list = where_is_it_420(
timezones,
countries,
tz_names=cache.get("tz_names"),
tz_to_country_code=cache.get("tz_to_country_code"),
country_code_to_name=cache.get("country_code_to_name"),
)
if tz_list:
tz_str = "\n".join(tz_list)
msg["text"] += f"\nIt's 4:20 in:\n{tz_str}"
else: else:
msg = {"text": "Unknown notification type", "color": 0xFF0000} msg = {"text": "Unknown notification type", "color": 0xFF0000}
embed = { embed = {
@@ -135,13 +159,16 @@ def create_embed(type: str, tz_list: list[str] | None = None) -> dict:
return embed return embed
def send_notification(message: str) -> None: def send_notification(message: str) -> bool:
""" """
Send a notification to the Discord webhook. Send a notification to the Discord webhook.
Returns:
bool: True when the webhook accepted the notification, False otherwise.
""" """
if not WEBHOOK_URL: if not WEBHOOK_URL:
logging.error("WEBHOOK_URL not set") logging.error("WEBHOOK_URL not set")
return return False
_update_state( _update_state(
last_type=message, last_type=message,
@@ -159,21 +186,7 @@ def send_notification(message: str) -> None:
tz_list: list[str] | None = None tz_list: list[str] | None = None
if message == "420": if message == "420":
try: _update_420_cache()
cache = get_tzdb_cache()
if cache is None:
tz_list = where_is_it_420(load_timezones(), load_countries())
else:
tz_list = where_is_it_420(
[],
[],
tz_names=cache.get("tz_names"),
tz_to_country_code=cache.get("tz_to_country_code"),
country_code_to_name=cache.get("country_code_to_name"),
)
_update_state(last_locations=tz_list or [])
except Exception as e:
_update_state(last_locations=[], last_error=str(e))
embed = create_embed(message, tz_list=tz_list) embed = create_embed(message, tz_list=tz_list)
data = {"embeds": [embed]} data = {"embeds": [embed]}
@@ -183,6 +196,7 @@ def send_notification(message: str) -> None:
logging.info(f"Notification sent: {message}") logging.info(f"Notification sent: {message}")
_update_state(last_success_at=datetime.now(), _update_state(last_success_at=datetime.now(),
last_status_code=response.status_code) last_status_code=response.status_code)
return True
else: else:
logging.error( logging.error(
f"Failed to send notification: {response.status_code} - " f"Failed to send notification: {response.status_code} - "
@@ -190,9 +204,23 @@ def send_notification(message: str) -> None:
) )
_update_state(last_status_code=response.status_code, _update_state(last_status_code=response.status_code,
last_error=response.text) last_error=response.text)
return False
except requests.RequestException as e: except requests.RequestException as e:
logging.error(f"Error sending notification: {e}") logging.error(f"Error sending notification: {e}")
_update_state(last_error=str(e)) _update_state(last_error=str(e))
return False
def _schedule_startup_test_cleanup(test_sent: bool) -> None:
"""Schedule one-time cleanup for the startup test notification."""
if not test_sent:
return
def cleanup_startup_test_message() -> schedule.CancelJob:
delete_old_messages(1, content_pattern=TEST_MESSAGE_DELETE_PATTERN)
return schedule.CancelJob
schedule.every(1).minutes.do(cleanup_startup_test_message)
def schedule_notification(interval: str, at: str, type: str) -> None: def schedule_notification(interval: str, at: str, type: str) -> None:
@@ -208,7 +236,7 @@ def schedule_notification(interval: str, at: str, type: str) -> None:
def start_dashboard() -> None: def start_dashboard() -> None:
"""Compatibility hook for tests and optional dashboard startup.""" """Compatibility hook for tests and optional dashboard startup."""
app = create_app(get_state=get_state, get_next_event=get_next_event) app = create_app(get_state=get_state, get_next_event=get_next_event)
app.run(host="0.0.0.0", port=8080, debug=False, use_reloader=False) app.run(host="0.0.0.0", port=8420, debug=False, use_reloader=False)
def main() -> None: def main() -> None:
@@ -228,10 +256,9 @@ def main() -> None:
logging.info("Scheduler started.") logging.info("Scheduler started.")
# Test the notification on startup # Send one startup test message and cleanup only if send succeeded.
send_notification("test") test_sent = send_notification("test")
# delete the test message after a short delay to keep the channel clean _schedule_startup_test_cleanup(test_sent)
schedule.every(1).minutes.do(delete_old_messages, 1)
# delete old messages on startup to clean up any previous notifications # delete old messages on startup to clean up any previous notifications
# delete_old_messages(6) # delete_old_messages(6)
+52 -3
View File
@@ -1,5 +1,6 @@
import logging import logging
import os import os
import re
import time import time
from datetime import datetime, timedelta from datetime import datetime, timedelta
@@ -42,15 +43,55 @@ def should_delete_message(
webhook_id: str, webhook_id: str,
author_id: str, author_id: str,
cutoff: int, cutoff: int,
content_pattern: str | None = None,
) -> bool: ) -> bool:
message_timestamp = int(parse_message_timestamp(message).timestamp()) message_timestamp = int(parse_message_timestamp(message).timestamp())
return ( return (
message_timestamp <= cutoff message_timestamp <= cutoff
and message.get("webhook_id") == webhook_id and message.get("webhook_id") == webhook_id
and message.get("author", {}).get("id") == author_id and message.get("author", {}).get("id") == author_id
and message_matches_pattern(message, content_pattern)
) )
def message_matches_pattern(message: dict, content_pattern: str | None = None) -> bool:
"""Return True when message content/embed text matches the optional pattern."""
if not content_pattern:
return True
text_chunks: list[str] = []
content = message.get("content")
if isinstance(content, str) and content:
text_chunks.append(content)
embeds = message.get("embeds")
if isinstance(embeds, list):
for embed in embeds:
if not isinstance(embed, dict):
continue
title = embed.get("title")
description = embed.get("description")
if isinstance(title, str) and title:
text_chunks.append(title)
if isinstance(description, str) and description:
text_chunks.append(description)
footer = embed.get("footer")
if isinstance(footer, dict):
footer_text = footer.get("text")
if isinstance(footer_text, str) and footer_text:
text_chunks.append(footer_text)
if not text_chunks:
return False
searchable_text = "\n".join(text_chunks)
try:
return re.search(content_pattern, searchable_text, flags=re.IGNORECASE) is not None
except re.error:
return content_pattern.lower() in searchable_text.lower()
def get_rate_limit_retry_after(response: requests.Response) -> float | None: def get_rate_limit_retry_after(response: requests.Response) -> float | None:
header_retry_after = parse_float(response.headers.get("Retry-After")) header_retry_after = parse_float(response.headers.get("Retry-After"))
if header_retry_after is not None: if header_retry_after is not None:
@@ -137,7 +178,7 @@ def find_last_message_by_author(
return None return None
def fetch_messages_to_delete(headers: dict, channel_id: str, webhook_id: str, author_id: str, cutoff: int, last_message_id: str | None = None) -> tuple[list[dict], str | None]: def fetch_messages_to_delete(headers: dict, channel_id: str, webhook_id: str, author_id: str, cutoff: int, last_message_id: str | None = None, content_pattern: str | None = None) -> tuple[list[dict], str | None]:
""" """
Fetch messages from the channel that are older than the cutoff timestamp and sent by the webhook. Fetch messages from the channel that are older than the cutoff timestamp and sent by the webhook.
Uses pagination with the 'before' parameter to resume from the last processed message. Uses pagination with the 'before' parameter to resume from the last processed message.
@@ -180,6 +221,7 @@ def fetch_messages_to_delete(headers: dict, channel_id: str, webhook_id: str, au
webhook_id, webhook_id,
author_id, author_id,
cutoff, cutoff,
content_pattern,
): ):
delete_list.append(build_delete_entry(message)) delete_list.append(build_delete_entry(message))
@@ -240,7 +282,7 @@ def delete_message(headers: dict, channel_id: str, message_id: str) -> tuple[boo
return False, None, False return False, None, False
def delete_old_messages(minutes: int = 6) -> None: def delete_old_messages(minutes: int = 6, content_pattern: str | None = None) -> None:
""" """
Delete all messages sent by the webhook in the last `minutes` minutes. Delete all messages sent by the webhook in the last `minutes` minutes.
Uses a dynamic slowdown to avoid hitting Discord API rate limits and pagination to fetch all messages. Uses a dynamic slowdown to avoid hitting Discord API rate limits and pagination to fetch all messages.
@@ -283,6 +325,7 @@ def delete_old_messages(minutes: int = 6) -> None:
webhook_id, webhook_id,
author_id, author_id,
cutoff, cutoff,
content_pattern,
): ):
anchor_message = build_delete_entry(last_author_message) anchor_message = build_delete_entry(last_author_message)
deleted, wait_seconds, abort_batch = delete_message( deleted, wait_seconds, abort_batch = delete_message(
@@ -303,7 +346,13 @@ def delete_old_messages(minutes: int = 6) -> None:
while True: while True:
delete_list, next_last_message_id = fetch_messages_to_delete( delete_list, next_last_message_id = fetch_messages_to_delete(
headers, discord_channel_id, webhook_id, author_id, cutoff, last_message_id headers,
discord_channel_id,
webhook_id,
author_id,
cutoff,
last_message_id,
content_pattern,
) )
if not delete_list: if not delete_list:
+11
View File
@@ -0,0 +1,11 @@
[phases.install]
[phases.test]
dependsOn = ["install"]
cmds = ["pytest --maxfail=1 --disable-warnings -q"]
[start]
cmd = "python main.py"
[variables]
PORT = "8420"
-2
View File
@@ -1,5 +1,3 @@
numpy
pandas
pytest pytest
python-dotenv python-dotenv
pytz pytz
+5 -5
View File
@@ -1,24 +1,24 @@
{ {
"420": { "420": {
"color": 3066993, "color": "#2ECC71",
"image_url": "https://copyparty.allucanget.biz/img/weed.png", "image_url": "https://copyparty.allucanget.biz/img/weed.png",
"text": "Blaze it!" "text": "Blaze it!"
}, },
"halftime": { "halftime": {
"color": 3066993, "color": "#2ECC71",
"image_url": "https://copyparty.allucanget.biz/img/weed.png", "image_url": "https://copyparty.allucanget.biz/img/weed.png",
"text": "Half-time!" "text": "Half-time!"
}, },
"reminder": { "reminder": {
"color": 15105570, "color": "#E67E22",
"text": "This is your 5 minute reminder to 420!" "text": "This is your 5 minute reminder to 420!"
}, },
"reminder_halftime": { "reminder_halftime": {
"color": 15105570, "color": "#E67E22",
"text": "Half-time in 5 minutes!" "text": "Half-time in 5 minutes!"
}, },
"test": { "test": {
"color": 3447003, "color": "#3498DB",
"text": "This is a test notification." "text": "This is a test notification."
} }
} }
+12 -1
View File
@@ -49,6 +49,11 @@ def _normalize_templates(raw: dict) -> dict[str, dict]:
color = incoming.get("color") color = incoming.get("color")
if isinstance(color, int): if isinstance(color, int):
out[key]["color"] = color out[key]["color"] = color
elif isinstance(color, str):
try:
out[key]["color"] = parse_color(color)
except ValueError:
pass
image_url = incoming.get("image_url") image_url = incoming.get("image_url")
if isinstance(image_url, str) and image_url.strip(): if isinstance(image_url, str) and image_url.strip():
@@ -74,10 +79,16 @@ def load_templates(path: str | Path) -> dict[str, dict]:
def save_templates(path: str | Path, templates: dict) -> None: def save_templates(path: str | Path, templates: dict) -> None:
p = Path(path) p = Path(path)
normalized = _normalize_templates(templates) normalized = _normalize_templates(templates)
serialized = deepcopy(normalized)
for tpl in serialized.values():
color = tpl.get("color")
if isinstance(color, int):
tpl["color"] = f"#{color:06X}"
p.parent.mkdir(parents=True, exist_ok=True) p.parent.mkdir(parents=True, exist_ok=True)
tmp = p.with_suffix(p.suffix + ".tmp") tmp = p.with_suffix(p.suffix + ".tmp")
tmp.write_text(json.dumps(normalized, indent=2, tmp.write_text(json.dumps(serialized, indent=2,
sort_keys=True) + "\n", encoding="utf-8") sort_keys=True) + "\n", encoding="utf-8")
tmp.replace(p) tmp.replace(p)
+52 -30
View File
@@ -1,6 +1,6 @@
import pytz import pytz
from datetime import datetime from datetime import datetime
import pandas as pd from csv import DictReader
def get_tz_info(tz_name: str, timezones: list[dict]) -> dict | None: def get_tz_info(tz_name: str, timezones: list[dict]) -> dict | None:
@@ -41,52 +41,74 @@ def load_tz_file():
"abbreviation", "time_start", "gmt_offset", "dst"] "abbreviation", "time_start", "gmt_offset", "dst"]
# columns to load # columns to load
load_columns = ["zone_name", "country_code"] load_columns = ["zone_name", "country_code"]
# read csv with pandas # read csv
df = pd.read_csv(timezone_file, names=timezone_names) with open(timezone_file, newline='') as csvfile:
reader = DictReader(csvfile, fieldnames=timezone_names)
csv = [row for row in reader]
# drop all columns except load_columns # drop all columns except load_columns
df = df[load_columns] csv = [{k: v for k, v in row.items() if k in load_columns} for row in csv]
# distinct zone_names # distinct zone_names
df = df.drop_duplicates(subset=["zone_name"]) seen = set()
unique_csv = []
for row in csv:
if row["zone_name"] not in seen:
seen.add(row["zone_name"])
unique_csv.append(row)
csv = unique_csv
# reset index return csv
df = df.reset_index(drop=True)
return df
def main(): def main():
# read csv with pandas # read csv file and load timezones and countries
df_file = load_tz_file() csv = load_tz_file()
# split zone_name into components by "/" # split zone_name into components by "/"
df_file[['region', 'city']] = df_file['zone_name'].str.split( for row in csv:
'/', expand=True, n=1) parts = row["zone_name"].split("/", 1)
row["region"] = parts[0]
row["city"] = parts[1] if len(parts) > 1 else None
# drop regions with no country_code (like Etc, GMT, etc) # drop regions with no country_code (like Etc, GMT, etc)
df_file = df_file[df_file['country_code'].notna()] csv = [row for row in csv if row["country_code"]]
# get all timezones from pytz and split into region and city
tz = [{"zone_name": tz} for tz in pytz.all_timezones]
df_tz = pd.DataFrame(pytz.all_timezones)
# rename column to zone_name
df_tz = df_tz.rename(columns={0: 'zone_name'})
# split zone_name into components by "/" # split zone_name into components by "/"
df_tz[['region', 'city']] = df_tz['zone_name'].str.split( for row in tz:
'/', expand=True, n=1) parts = row["zone_name"].split("/", 1)
row["region"] = parts[0]
row["city"] = parts[1] if len(parts) > 1 else None
# drop regions with no city (like UTC, GMT, etc) # drop regions with no city (like UTC, GMT, etc)
df_tz = df_tz[df_tz['city'].notna()] tz = [row for row in tz if row["city"]]
# drop rows where region is 'Etc' # drop rows where region is 'Etc'
df_tz = df_tz[df_tz['region'] != 'Etc'] tz = [row for row in tz if row["region"] != "Etc"]
# join data on region and city
timezones = []
for tz_row in tz:
for csv_row in csv:
if tz_row["region"] == csv_row["region"] and tz_row["city"] == csv_row["city"]:
timezones.append({
"zone_name": tz_row["zone_name"],
"country_code": csv_row["country_code"],
"region": tz_row["region"],
"city": tz_row["city"],
})
break
# join dataframes on region and city
df_merged = pd.merge(df_file, df_tz, on=[
'region', 'city'], how='inner', indicator=True)
# reorder columns # reorder columns
df_merged = df_merged[['region', 'city', 'country_code']] timezones = [{k: row[k] for k in ['region', 'city', 'country_code']}
# print merged dataframe for row in timezones]
print(f"Merged timezones: {len(df_merged)}")
print(df_merged.sample(20).to_string(index=False)) # print merged data
regions = df_merged['region'].unique() print(f"Merged timezones: {len(timezones)}")
print(timezones[:20])
regions = set(row['region'] for row in timezones)
for region in regions: for region in regions:
df_region = df_merged[df_merged['region'] == region] df_region = [row for row in timezones if row['region'] == region]
print(f"{len(df_region)} merged in {region}") print(f"{len(df_region)} merged in {region}")
+2
View File
@@ -72,6 +72,8 @@ def test_admin_get_renders_template_form(monkeypatch):
assert "Admin: templates" in body assert "Admin: templates" in body
assert "name='420__text'" in body assert "name='420__text'" in body
assert "name='420__color'" in body assert "name='420__color'" in body
assert "name='420__color_picker'" in body
assert "type='color'" in body
assert "name='420__image_url'" in body assert "name='420__image_url'" in body
+98
View File
@@ -23,3 +23,101 @@ def test_get_next_scheduled_event():
nxt = main.get_next_scheduled_event(now) nxt = main.get_next_scheduled_event(now)
assert nxt["type"] == "reminder" assert nxt["type"] == "reminder"
assert nxt["at"].hour == 11 and nxt["at"].minute == 15 assert nxt["at"].hour == 11 and nxt["at"].minute == 15
def test_schedule_startup_test_cleanup_when_sent(monkeypatch):
captured: dict[str, object] = {}
delete_calls: list[tuple[int, str | None]] = []
class FakeEvery:
@property
def minutes(self):
return self
def do(self, fn, *args, **kwargs):
captured["job"] = lambda: fn(*args, **kwargs)
return object()
monkeypatch.setattr(main.schedule, "every", lambda n: FakeEvery())
monkeypatch.setattr(
main,
"delete_old_messages",
lambda minutes, content_pattern=None: delete_calls.append(
(minutes, content_pattern)),
)
main._schedule_startup_test_cleanup(True)
assert "job" in captured
result = captured["job"]()
assert result == main.schedule.CancelJob
assert delete_calls == [(1, main.TEST_MESSAGE_DELETE_PATTERN)]
def test_schedule_startup_test_cleanup_skips_when_not_sent(monkeypatch):
called = {"value": False}
class FakeEvery:
@property
def minutes(self):
return self
def do(self, fn, *args, **kwargs):
called["value"] = True
return object()
monkeypatch.setattr(main.schedule, "every", lambda n: FakeEvery())
main._schedule_startup_test_cleanup(False)
assert called["value"] is False
def test_main_sends_startup_test_and_deletes_it(monkeypatch):
send_calls: list[str] = []
delete_calls: list[tuple[int, str | None]] = []
scheduled_jobs: dict[int, list[object]] = {1: [], 5: []}
monkeypatch.setattr(main, "start_dashboard", lambda: None)
monkeypatch.setattr(main, "schedule_notification",
lambda interval, at, type: None)
class FakeEvery:
def __init__(self, minutes_value: int):
self.minutes_value = minutes_value
@property
def minutes(self):
return self
def do(self, fn, *args, **kwargs):
scheduled_jobs.setdefault(self.minutes_value, []).append(
lambda: fn(*args, **kwargs)
)
return object()
monkeypatch.setattr(main.schedule, "every", lambda n: FakeEvery(n))
def fake_send_notification(message: str) -> bool:
send_calls.append(message)
return True
def fake_delete_old_messages(minutes: int = 6, content_pattern: str | None = None):
delete_calls.append((minutes, content_pattern))
def fake_run_pending():
for job in scheduled_jobs.get(1, []):
job()
raise KeyboardInterrupt()
monkeypatch.setattr(main, "send_notification", fake_send_notification)
monkeypatch.setattr(main, "delete_old_messages", fake_delete_old_messages)
monkeypatch.setattr(main.schedule, "run_pending", fake_run_pending)
monkeypatch.setattr(main.time, "sleep", lambda s: None)
monkeypatch.setenv("DISCORD_WEBHOOK_URL", "http://example.com/webhook")
main.WEBHOOK_URL = "http://example.com/webhook"
main.main()
assert send_calls == ["test"]
assert delete_calls == [(1, main.TEST_MESSAGE_DELETE_PATTERN)]
+38
View File
@@ -53,6 +53,44 @@ def test_should_delete_message():
) )
def test_message_matches_pattern_content_and_embeds():
message = {
"content": "This is a smoke test payload",
"embeds": [
{"title": "Reminder", "description": "Half-time in 5 minutes"}
],
}
assert maintenance.message_matches_pattern(message, r"smoke test")
assert maintenance.message_matches_pattern(message, r"half-time")
assert not maintenance.message_matches_pattern(message, r"does-not-match")
def test_should_delete_message_with_content_pattern():
ts = int(datetime(2026, 1, 1, 10, 0, 0, tzinfo=timezone.utc).timestamp())
message = {
"timestamp": "2026-01-01T10:00:00Z",
"webhook_id": "w",
"author": {"id": "a"},
"embeds": [{"description": "This is a test notification."}],
}
assert maintenance.should_delete_message(
message,
webhook_id="w",
author_id="a",
cutoff=ts,
content_pattern=r"test notification",
)
assert not maintenance.should_delete_message(
message,
webhook_id="w",
author_id="a",
cutoff=ts,
content_pattern=r"production-only",
)
def test_get_rate_limit_retry_after_header_priority(): def test_get_rate_limit_retry_after_header_priority():
response = DummyResponse( response = DummyResponse(
headers={ headers={
+3
View File
@@ -20,6 +20,9 @@ def test_save_and_load_templates_roundtrip(tmp_path):
} }
save_templates(path, data) save_templates(path, data)
raw = path.read_text(encoding="utf-8")
assert '"color": "#00007B"' in raw
loaded = load_templates(path) loaded = load_templates(path)
assert loaded["420"]["text"] == "Custom" assert loaded["420"]["text"] == "Custom"
assert loaded["420"]["color"] == 123 assert loaded["420"]["color"] == 123