Compare commits

..

8 Commits

Author SHA1 Message Date
zwitschi 3412a5ccaa feat: enhance message handling with content pattern matching and update environment configuration
Build and Deploy Docker Container / test (push) Successful in 14s
Build and Deploy Docker Container / build-and-deploy (push) Successful in 29s
2026-05-10 14:24:07 +02:00
zwitschi 915c55d7ed feat: remove pip installation command from nixpacks configuration
Build and Deploy Docker Container / test (push) Successful in 17s
Build and Deploy Docker Container / build-and-deploy (push) Successful in 49s
2026-05-10 13:44:39 +02:00
zwitschi 788f3ea6b7 feat: remove deployment step from Docker workflow and add nixpacks configuration
Build and Deploy Docker Container / test (push) Successful in 14s
Build and Deploy Docker Container / build-and-deploy (push) Successful in 29s
2026-05-10 13:31:30 +02:00
zwitschi 0952c21c7b feat: update dashboard port from 8080 to 8420 in configuration and documentation 2026-05-10 13:24:49 +02:00
zwitschi f88f60a019 feat: add functions to check and update 4:20 timezone cache
Build and Deploy Docker Container / test (push) Successful in 14s
Build and Deploy Docker Container / build-and-deploy (push) Failing after 30s
2026-05-10 13:06:31 +02:00
zwitschi 565c4078bb feat: enhance color handling in templates and dashboard with hex support 2026-05-10 12:55:39 +02:00
zwitschi 8f8c3655db refactor: replace pandas with csv module for timezone data loading 2026-05-10 12:43:11 +02:00
zwitschi 584231b0df chore: remove unused numpy and pandas dependencies from requirements 2026-05-10 12:37:31 +02:00
17 changed files with 370 additions and 99 deletions
+9
View File
@@ -1,2 +1,11 @@
# Replace with your actual Discord webhook URL
DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/<your_webhook_id>/<your_webhook_token>
# Replace with your Discord bot token
DISCORD_BOT_TOKEN=<your_bot_token>
# Replace with your Discord channel ID
DISCORD_CHANNEL_ID=<your_channel_id>
# Replace with your Discord guild/server ID
DISCORD_GUILD_ID=<your_guild_id>
-12
View File
@@ -53,15 +53,3 @@ jobs:
context: .
push: true
tags: git.allucanget.biz/${{ secrets.REGISTRY_USERNAME }}/thc-webhook:latest
- name: Deploy to Portainer
uses: appleboy/ssh-action@v0.1.7
with:
host: ${{ secrets.SERVER_HOST }}
username: ${{ secrets.SERVER_USER }}
key: ${{ secrets.SERVER_SSH_KEY }}
script: |
docker stop thc-webhook || true
docker rm thc-webhook || true
docker pull git.allucanget.biz/${{ secrets.REGISTRY_USERNAME }}/thc-webhook:latest
docker run -d --name thc-webhook -e DISCORD_WEBHOOK_URL=${{ secrets.DISCORD_WEBHOOK_URL }} -p 8080:8080 git.allucanget.biz/${{ secrets.REGISTRY_USERNAME }}/thc-webhook:latest
+1 -1
View File
@@ -12,7 +12,7 @@ RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Dashboard (Flask) port
EXPOSE 8080
EXPOSE 8420
# Create a non-root user
RUN addgroup -S appgroup && adduser -S appuser -G appgroup
+2 -2
View File
@@ -38,13 +38,13 @@ The app will run continuously and send notifications at the scheduled times.
### Dashboard
By default, a minimal dashboard is available at `http://localhost:8080/`.
By default, a minimal dashboard is available at `http://localhost:8420/`.
You can disable it by setting `DASHBOARD_ENABLED=0`.
### Admin
You can edit the embed message templates at `http://localhost:8080/admin`.
You can edit the embed message templates at `http://localhost:8420/admin`.
- Templates are saved to `templates.json` by default.
- Override the location with `TEMPLATES_PATH=/path/to/templates.json`.
+17 -2
View File
@@ -32,6 +32,17 @@ def get_html_template(content) -> str:
return HTML_TEMPLATE.format(content=content)
def _as_hex_color(value: int | str | None) -> str:
if isinstance(value, int):
return f"#{value:06X}"
if isinstance(value, str):
try:
return f"#{parse_color(value):06X}"
except ValueError:
return "#000000"
return "#000000"
def create_app(
*,
get_state: Callable[[], dict],
@@ -77,13 +88,14 @@ def create_app(
blocks = []
for key, tpl in templates.items():
text = (tpl.get("text") or "").replace("'", "&#39;")
color = tpl.get("color")
color_hex = _as_hex_color(tpl.get("color"))
image_url = tpl.get("image_url") or ""
blocks.append(
"<fieldset style='margin-bottom:16px;'>"
f"<legend><strong>{key}</strong></legend>"
f"<label>Text<br><textarea name='{key}__text' rows='3' style='width:100%'>{text}</textarea></label><br>"
f"<label>Color<br><input name='{key}__color' value='{color}' style='width:200px'></label><br>"
f"<label>Color<br><input type='color' name='{key}__color_picker' value='{color_hex}' oninput=\"this.form['{key}__color'].value=this.value\"></label><br>"
f"<label>Color value<br><input name='{key}__color' value='{color_hex}' style='width:200px'></label><br>"
f"<label>Image URL (optional)<br><input name='{key}__image_url' value='{image_url}' style='width:100%'></label>"
"</fieldset>"
)
@@ -108,6 +120,9 @@ def create_app(
for key in current.keys():
text = request.form.get(f"{key}__text", "").strip()
color_raw = request.form.get(f"{key}__color", "").strip()
if not color_raw:
color_raw = request.form.get(
f"{key}__color_picker", "").strip()
image_url = request.form.get(f"{key}__image_url", "").strip()
if not text:
+2 -2
View File
@@ -4,10 +4,10 @@ services:
container_name: thc-webhook-app
environment:
- DISCORD_WEBHOOK_URL=${DISCORD_WEBHOOK_URL}
- DASHBOARD_PORT=8080
- DASHBOARD_PORT=8420
restart: unless-stopped
ports:
- "8080:8080"
- "8420:8420"
logging:
driver: json-file
options:
+66 -39
View File
@@ -50,6 +50,7 @@ logging.basicConfig(
load_dotenv()
WEBHOOK_URL = os.getenv('DISCORD_WEBHOOK_URL')
TEST_MESSAGE_DELETE_PATTERN = r"test notification"
def get_state() -> dict:
@@ -71,6 +72,45 @@ def get_state_snapshot() -> dict:
return dict(STATE)
def _check_420(tz_list: list[str] | None = None) -> str:
cache = get_tzdb_cache()
timezones = load_timezones() if cache is None else []
countries = load_countries() if cache is None else []
if tz_list is None:
if cache is None:
tz_list = where_is_it_420(timezones, countries)
else:
tz_list = where_is_it_420(
timezones,
countries,
tz_names=cache.get("tz_names"),
tz_to_country_code=cache.get("tz_to_country_code"),
country_code_to_name=cache.get("country_code_to_name"),
)
if tz_list:
tz_str = "\n".join(tz_list)
return f"\nIt's 4:20 in:\n{tz_str}"
return ""
def _update_420_cache() -> None:
try:
cache = get_tzdb_cache()
if cache is None:
tz_list = where_is_it_420(load_timezones(), load_countries())
else:
tz_list = where_is_it_420(
[],
[],
tz_names=cache.get("tz_names"),
tz_to_country_code=cache.get("tz_to_country_code"),
country_code_to_name=cache.get("country_code_to_name"),
)
_update_state(last_locations=tz_list or [])
except Exception as e:
_update_state(last_locations=[], last_error=str(e))
def get_next_scheduled_event(now: datetime | None = None) -> dict:
"""Return the next scheduled notification time/type based on known minute marks."""
if now is None:
@@ -98,9 +138,6 @@ def create_embed(type: str, tz_list: list[str] | None = None) -> dict:
"""
templates_path = os.getenv("TEMPLATES_PATH", "templates.json")
messages = load_templates(templates_path)
cache = get_tzdb_cache()
timezones = load_timezones() if cache is None else []
countries = load_countries() if cache is None else []
if type in messages:
msg = messages[type]
image_url = msg.get("image_url")
@@ -108,20 +145,7 @@ def create_embed(type: str, tz_list: list[str] | None = None) -> dict:
msg["image"] = {"url": image_url}
if type == "420":
# Check where it's 4:20
if tz_list is None:
if cache is None:
tz_list = where_is_it_420(timezones, countries)
else:
tz_list = where_is_it_420(
timezones,
countries,
tz_names=cache.get("tz_names"),
tz_to_country_code=cache.get("tz_to_country_code"),
country_code_to_name=cache.get("country_code_to_name"),
)
if tz_list:
tz_str = "\n".join(tz_list)
msg["text"] += f"\nIt's 4:20 in:\n{tz_str}"
msg["text"] += _check_420(tz_list)
else:
msg = {"text": "Unknown notification type", "color": 0xFF0000}
embed = {
@@ -135,13 +159,16 @@ def create_embed(type: str, tz_list: list[str] | None = None) -> dict:
return embed
def send_notification(message: str) -> None:
def send_notification(message: str) -> bool:
"""
Send a notification to the Discord webhook.
Returns:
bool: True when the webhook accepted the notification, False otherwise.
"""
if not WEBHOOK_URL:
logging.error("WEBHOOK_URL not set")
return
return False
_update_state(
last_type=message,
@@ -159,21 +186,7 @@ def send_notification(message: str) -> None:
tz_list: list[str] | None = None
if message == "420":
try:
cache = get_tzdb_cache()
if cache is None:
tz_list = where_is_it_420(load_timezones(), load_countries())
else:
tz_list = where_is_it_420(
[],
[],
tz_names=cache.get("tz_names"),
tz_to_country_code=cache.get("tz_to_country_code"),
country_code_to_name=cache.get("country_code_to_name"),
)
_update_state(last_locations=tz_list or [])
except Exception as e:
_update_state(last_locations=[], last_error=str(e))
_update_420_cache()
embed = create_embed(message, tz_list=tz_list)
data = {"embeds": [embed]}
@@ -183,6 +196,7 @@ def send_notification(message: str) -> None:
logging.info(f"Notification sent: {message}")
_update_state(last_success_at=datetime.now(),
last_status_code=response.status_code)
return True
else:
logging.error(
f"Failed to send notification: {response.status_code} - "
@@ -190,9 +204,23 @@ def send_notification(message: str) -> None:
)
_update_state(last_status_code=response.status_code,
last_error=response.text)
return False
except requests.RequestException as e:
logging.error(f"Error sending notification: {e}")
_update_state(last_error=str(e))
return False
def _schedule_startup_test_cleanup(test_sent: bool) -> None:
"""Schedule one-time cleanup for the startup test notification."""
if not test_sent:
return
def cleanup_startup_test_message() -> schedule.CancelJob:
delete_old_messages(1, content_pattern=TEST_MESSAGE_DELETE_PATTERN)
return schedule.CancelJob
schedule.every(1).minutes.do(cleanup_startup_test_message)
def schedule_notification(interval: str, at: str, type: str) -> None:
@@ -208,7 +236,7 @@ def schedule_notification(interval: str, at: str, type: str) -> None:
def start_dashboard() -> None:
"""Compatibility hook for tests and optional dashboard startup."""
app = create_app(get_state=get_state, get_next_event=get_next_event)
app.run(host="0.0.0.0", port=8080, debug=False, use_reloader=False)
app.run(host="0.0.0.0", port=8420, debug=False, use_reloader=False)
def main() -> None:
@@ -228,10 +256,9 @@ def main() -> None:
logging.info("Scheduler started.")
# Test the notification on startup
send_notification("test")
# delete the test message after a short delay to keep the channel clean
schedule.every(1).minutes.do(delete_old_messages, 1)
# Send one startup test message and cleanup only if send succeeded.
test_sent = send_notification("test")
_schedule_startup_test_cleanup(test_sent)
# delete old messages on startup to clean up any previous notifications
# delete_old_messages(6)
+52 -3
View File
@@ -1,5 +1,6 @@
import logging
import os
import re
import time
from datetime import datetime, timedelta
@@ -42,15 +43,55 @@ def should_delete_message(
webhook_id: str,
author_id: str,
cutoff: int,
content_pattern: str | None = None,
) -> bool:
message_timestamp = int(parse_message_timestamp(message).timestamp())
return (
message_timestamp <= cutoff
and message.get("webhook_id") == webhook_id
and message.get("author", {}).get("id") == author_id
and message_matches_pattern(message, content_pattern)
)
def message_matches_pattern(message: dict, content_pattern: str | None = None) -> bool:
"""Return True when message content/embed text matches the optional pattern."""
if not content_pattern:
return True
text_chunks: list[str] = []
content = message.get("content")
if isinstance(content, str) and content:
text_chunks.append(content)
embeds = message.get("embeds")
if isinstance(embeds, list):
for embed in embeds:
if not isinstance(embed, dict):
continue
title = embed.get("title")
description = embed.get("description")
if isinstance(title, str) and title:
text_chunks.append(title)
if isinstance(description, str) and description:
text_chunks.append(description)
footer = embed.get("footer")
if isinstance(footer, dict):
footer_text = footer.get("text")
if isinstance(footer_text, str) and footer_text:
text_chunks.append(footer_text)
if not text_chunks:
return False
searchable_text = "\n".join(text_chunks)
try:
return re.search(content_pattern, searchable_text, flags=re.IGNORECASE) is not None
except re.error:
return content_pattern.lower() in searchable_text.lower()
def get_rate_limit_retry_after(response: requests.Response) -> float | None:
header_retry_after = parse_float(response.headers.get("Retry-After"))
if header_retry_after is not None:
@@ -137,7 +178,7 @@ def find_last_message_by_author(
return None
def fetch_messages_to_delete(headers: dict, channel_id: str, webhook_id: str, author_id: str, cutoff: int, last_message_id: str | None = None) -> tuple[list[dict], str | None]:
def fetch_messages_to_delete(headers: dict, channel_id: str, webhook_id: str, author_id: str, cutoff: int, last_message_id: str | None = None, content_pattern: str | None = None) -> tuple[list[dict], str | None]:
"""
Fetch messages from the channel that are older than the cutoff timestamp and sent by the webhook.
Uses pagination with the 'before' parameter to resume from the last processed message.
@@ -180,6 +221,7 @@ def fetch_messages_to_delete(headers: dict, channel_id: str, webhook_id: str, au
webhook_id,
author_id,
cutoff,
content_pattern,
):
delete_list.append(build_delete_entry(message))
@@ -240,7 +282,7 @@ def delete_message(headers: dict, channel_id: str, message_id: str) -> tuple[boo
return False, None, False
def delete_old_messages(minutes: int = 6) -> None:
def delete_old_messages(minutes: int = 6, content_pattern: str | None = None) -> None:
"""
Delete all messages sent by the webhook in the last `minutes` minutes.
Uses a dynamic slowdown to avoid hitting Discord API rate limits and pagination to fetch all messages.
@@ -283,6 +325,7 @@ def delete_old_messages(minutes: int = 6) -> None:
webhook_id,
author_id,
cutoff,
content_pattern,
):
anchor_message = build_delete_entry(last_author_message)
deleted, wait_seconds, abort_batch = delete_message(
@@ -303,7 +346,13 @@ def delete_old_messages(minutes: int = 6) -> None:
while True:
delete_list, next_last_message_id = fetch_messages_to_delete(
headers, discord_channel_id, webhook_id, author_id, cutoff, last_message_id
headers,
discord_channel_id,
webhook_id,
author_id,
cutoff,
last_message_id,
content_pattern,
)
if not delete_list:
+11
View File
@@ -0,0 +1,11 @@
[phases.install]
[phases.test]
dependsOn = ["install"]
cmds = ["pytest --maxfail=1 --disable-warnings -q"]
[start]
cmd = "python main.py"
[variables]
PORT = "8420"
-2
View File
@@ -1,5 +1,3 @@
numpy
pandas
pytest
python-dotenv
pytz
+5 -5
View File
@@ -1,24 +1,24 @@
{
"420": {
"color": 3066993,
"color": "#2ECC71",
"image_url": "https://copyparty.allucanget.biz/img/weed.png",
"text": "Blaze it!"
},
"halftime": {
"color": 3066993,
"color": "#2ECC71",
"image_url": "https://copyparty.allucanget.biz/img/weed.png",
"text": "Half-time!"
},
"reminder": {
"color": 15105570,
"color": "#E67E22",
"text": "This is your 5 minute reminder to 420!"
},
"reminder_halftime": {
"color": 15105570,
"color": "#E67E22",
"text": "Half-time in 5 minutes!"
},
"test": {
"color": 3447003,
"color": "#3498DB",
"text": "This is a test notification."
}
}
+12 -1
View File
@@ -49,6 +49,11 @@ def _normalize_templates(raw: dict) -> dict[str, dict]:
color = incoming.get("color")
if isinstance(color, int):
out[key]["color"] = color
elif isinstance(color, str):
try:
out[key]["color"] = parse_color(color)
except ValueError:
pass
image_url = incoming.get("image_url")
if isinstance(image_url, str) and image_url.strip():
@@ -74,10 +79,16 @@ def load_templates(path: str | Path) -> dict[str, dict]:
def save_templates(path: str | Path, templates: dict) -> None:
p = Path(path)
normalized = _normalize_templates(templates)
serialized = deepcopy(normalized)
for tpl in serialized.values():
color = tpl.get("color")
if isinstance(color, int):
tpl["color"] = f"#{color:06X}"
p.parent.mkdir(parents=True, exist_ok=True)
tmp = p.with_suffix(p.suffix + ".tmp")
tmp.write_text(json.dumps(normalized, indent=2,
tmp.write_text(json.dumps(serialized, indent=2,
sort_keys=True) + "\n", encoding="utf-8")
tmp.replace(p)
+52 -30
View File
@@ -1,6 +1,6 @@
import pytz
from datetime import datetime
import pandas as pd
from csv import DictReader
def get_tz_info(tz_name: str, timezones: list[dict]) -> dict | None:
@@ -41,52 +41,74 @@ def load_tz_file():
"abbreviation", "time_start", "gmt_offset", "dst"]
# columns to load
load_columns = ["zone_name", "country_code"]
# read csv with pandas
df = pd.read_csv(timezone_file, names=timezone_names)
# read csv
with open(timezone_file, newline='') as csvfile:
reader = DictReader(csvfile, fieldnames=timezone_names)
csv = [row for row in reader]
# drop all columns except load_columns
df = df[load_columns]
csv = [{k: v for k, v in row.items() if k in load_columns} for row in csv]
# distinct zone_names
df = df.drop_duplicates(subset=["zone_name"])
seen = set()
unique_csv = []
for row in csv:
if row["zone_name"] not in seen:
seen.add(row["zone_name"])
unique_csv.append(row)
csv = unique_csv
# reset index
df = df.reset_index(drop=True)
return df
return csv
def main():
# read csv with pandas
df_file = load_tz_file()
# read csv file and load timezones and countries
csv = load_tz_file()
# split zone_name into components by "/"
df_file[['region', 'city']] = df_file['zone_name'].str.split(
'/', expand=True, n=1)
for row in csv:
parts = row["zone_name"].split("/", 1)
row["region"] = parts[0]
row["city"] = parts[1] if len(parts) > 1 else None
# drop regions with no country_code (like Etc, GMT, etc)
df_file = df_file[df_file['country_code'].notna()]
csv = [row for row in csv if row["country_code"]]
# get all timezones from pytz and split into region and city
tz = [{"zone_name": tz} for tz in pytz.all_timezones]
df_tz = pd.DataFrame(pytz.all_timezones)
# rename column to zone_name
df_tz = df_tz.rename(columns={0: 'zone_name'})
# split zone_name into components by "/"
df_tz[['region', 'city']] = df_tz['zone_name'].str.split(
'/', expand=True, n=1)
for row in tz:
parts = row["zone_name"].split("/", 1)
row["region"] = parts[0]
row["city"] = parts[1] if len(parts) > 1 else None
# drop regions with no city (like UTC, GMT, etc)
df_tz = df_tz[df_tz['city'].notna()]
tz = [row for row in tz if row["city"]]
# drop rows where region is 'Etc'
df_tz = df_tz[df_tz['region'] != 'Etc']
tz = [row for row in tz if row["region"] != "Etc"]
# join data on region and city
timezones = []
for tz_row in tz:
for csv_row in csv:
if tz_row["region"] == csv_row["region"] and tz_row["city"] == csv_row["city"]:
timezones.append({
"zone_name": tz_row["zone_name"],
"country_code": csv_row["country_code"],
"region": tz_row["region"],
"city": tz_row["city"],
})
break
# join dataframes on region and city
df_merged = pd.merge(df_file, df_tz, on=[
'region', 'city'], how='inner', indicator=True)
# reorder columns
df_merged = df_merged[['region', 'city', 'country_code']]
# print merged dataframe
print(f"Merged timezones: {len(df_merged)}")
print(df_merged.sample(20).to_string(index=False))
regions = df_merged['region'].unique()
timezones = [{k: row[k] for k in ['region', 'city', 'country_code']}
for row in timezones]
# print merged data
print(f"Merged timezones: {len(timezones)}")
print(timezones[:20])
regions = set(row['region'] for row in timezones)
for region in regions:
df_region = df_merged[df_merged['region'] == region]
df_region = [row for row in timezones if row['region'] == region]
print(f"{len(df_region)} merged in {region}")
+2
View File
@@ -72,6 +72,8 @@ def test_admin_get_renders_template_form(monkeypatch):
assert "Admin: templates" in body
assert "name='420__text'" in body
assert "name='420__color'" in body
assert "name='420__color_picker'" in body
assert "type='color'" in body
assert "name='420__image_url'" in body
+98
View File
@@ -23,3 +23,101 @@ def test_get_next_scheduled_event():
nxt = main.get_next_scheduled_event(now)
assert nxt["type"] == "reminder"
assert nxt["at"].hour == 11 and nxt["at"].minute == 15
def test_schedule_startup_test_cleanup_when_sent(monkeypatch):
captured: dict[str, object] = {}
delete_calls: list[tuple[int, str | None]] = []
class FakeEvery:
@property
def minutes(self):
return self
def do(self, fn, *args, **kwargs):
captured["job"] = lambda: fn(*args, **kwargs)
return object()
monkeypatch.setattr(main.schedule, "every", lambda n: FakeEvery())
monkeypatch.setattr(
main,
"delete_old_messages",
lambda minutes, content_pattern=None: delete_calls.append(
(minutes, content_pattern)),
)
main._schedule_startup_test_cleanup(True)
assert "job" in captured
result = captured["job"]()
assert result == main.schedule.CancelJob
assert delete_calls == [(1, main.TEST_MESSAGE_DELETE_PATTERN)]
def test_schedule_startup_test_cleanup_skips_when_not_sent(monkeypatch):
called = {"value": False}
class FakeEvery:
@property
def minutes(self):
return self
def do(self, fn, *args, **kwargs):
called["value"] = True
return object()
monkeypatch.setattr(main.schedule, "every", lambda n: FakeEvery())
main._schedule_startup_test_cleanup(False)
assert called["value"] is False
def test_main_sends_startup_test_and_deletes_it(monkeypatch):
send_calls: list[str] = []
delete_calls: list[tuple[int, str | None]] = []
scheduled_jobs: dict[int, list[object]] = {1: [], 5: []}
monkeypatch.setattr(main, "start_dashboard", lambda: None)
monkeypatch.setattr(main, "schedule_notification",
lambda interval, at, type: None)
class FakeEvery:
def __init__(self, minutes_value: int):
self.minutes_value = minutes_value
@property
def minutes(self):
return self
def do(self, fn, *args, **kwargs):
scheduled_jobs.setdefault(self.minutes_value, []).append(
lambda: fn(*args, **kwargs)
)
return object()
monkeypatch.setattr(main.schedule, "every", lambda n: FakeEvery(n))
def fake_send_notification(message: str) -> bool:
send_calls.append(message)
return True
def fake_delete_old_messages(minutes: int = 6, content_pattern: str | None = None):
delete_calls.append((minutes, content_pattern))
def fake_run_pending():
for job in scheduled_jobs.get(1, []):
job()
raise KeyboardInterrupt()
monkeypatch.setattr(main, "send_notification", fake_send_notification)
monkeypatch.setattr(main, "delete_old_messages", fake_delete_old_messages)
monkeypatch.setattr(main.schedule, "run_pending", fake_run_pending)
monkeypatch.setattr(main.time, "sleep", lambda s: None)
monkeypatch.setenv("DISCORD_WEBHOOK_URL", "http://example.com/webhook")
main.WEBHOOK_URL = "http://example.com/webhook"
main.main()
assert send_calls == ["test"]
assert delete_calls == [(1, main.TEST_MESSAGE_DELETE_PATTERN)]
+38
View File
@@ -53,6 +53,44 @@ def test_should_delete_message():
)
def test_message_matches_pattern_content_and_embeds():
message = {
"content": "This is a smoke test payload",
"embeds": [
{"title": "Reminder", "description": "Half-time in 5 minutes"}
],
}
assert maintenance.message_matches_pattern(message, r"smoke test")
assert maintenance.message_matches_pattern(message, r"half-time")
assert not maintenance.message_matches_pattern(message, r"does-not-match")
def test_should_delete_message_with_content_pattern():
ts = int(datetime(2026, 1, 1, 10, 0, 0, tzinfo=timezone.utc).timestamp())
message = {
"timestamp": "2026-01-01T10:00:00Z",
"webhook_id": "w",
"author": {"id": "a"},
"embeds": [{"description": "This is a test notification."}],
}
assert maintenance.should_delete_message(
message,
webhook_id="w",
author_id="a",
cutoff=ts,
content_pattern=r"test notification",
)
assert not maintenance.should_delete_message(
message,
webhook_id="w",
author_id="a",
cutoff=ts,
content_pattern=r"production-only",
)
def test_get_rate_limit_retry_after_header_priority():
response = DummyResponse(
headers={
+3
View File
@@ -20,6 +20,9 @@ def test_save_and_load_templates_roundtrip(tmp_path):
}
save_templates(path, data)
raw = path.read_text(encoding="utf-8")
assert '"color": "#00007B"' in raw
loaded = load_templates(path)
assert loaded["420"]["text"] == "Custom"
assert loaded["420"]["color"] == 123