import logging
import os
import time
from datetime import datetime, timedelta, timezone

import requests

WEBHOOK_AUTHOR_ID = "1413817504194760766"

# Upper bound on attempts for one logical request that keeps being deferred
# (HTTP 429 rate limits, or HTTP 202 "index not ready" for guild search).
_MAX_REQUEST_ATTEMPTS = 3


def get_discord_headers() -> dict[str, str]:
    """Build the HTTP headers used for Discord bot API calls.

    The token is read from the ``DISCORD_BOT_TOKEN`` environment variable.
    No validation happens here: if the variable is unset the header value
    becomes the literal ``"Bot None"``, so callers must check it beforehand.
    """
    token = os.getenv("DISCORD_BOT_TOKEN")
    return {
        "Authorization": f"Bot {token}",
        "Content-Type": "application/json",
    }


def parse_message_timestamp(message: dict) -> datetime:
    """Parse ``message["timestamp"]`` (ISO 8601) into a timezone-aware datetime.

    A trailing ``Z`` is mapped to ``+00:00`` rather than stripped: stripping it
    yields a naive datetime, and ``datetime.timestamp()`` interprets naive
    values as *local* time, which skews the cutoff comparison on any host
    whose timezone is not UTC.

    Raises:
        KeyError: if the message has no ``"timestamp"`` key.
        ValueError: if the value is not a valid ISO 8601 string.
    """
    raw = message["timestamp"]
    if raw.endswith("Z"):
        raw = raw[:-1] + "+00:00"
    parsed = datetime.fromisoformat(raw)
    if parsed.tzinfo is None:
        # NOTE(review): offset-less timestamps are assumed to be UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def build_delete_entry(message: dict) -> dict:
    """Reduce a message to the ``id`` and parsed ``timestamp`` needed to delete it."""
    return {
        "id": message.get("id"),
        "timestamp": parse_message_timestamp(message),
    }


def parse_float(value: str | int | float | None) -> float | None:
    """Convert ``value`` to ``float``; return ``None`` if missing or unparsable."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def should_delete_message(
    message: dict,
    webhook_id: str,
    author_id: str,
    cutoff: int,
) -> bool:
    """Return True if the message is at or before ``cutoff`` and from the webhook.

    Args:
        message: Message dict with ``timestamp``, ``webhook_id`` and ``author``.
        webhook_id: Value the message's ``webhook_id`` must equal.
        author_id: Value the message's ``author.id`` must equal.
        cutoff: Unix timestamp in whole seconds; newer messages are kept.
    """
    message_timestamp = int(parse_message_timestamp(message).timestamp())
    # ``or {}`` also covers an explicit ``"author": null`` in the payload.
    author = message.get("author") or {}
    return (
        message_timestamp <= cutoff
        and message.get("webhook_id") == webhook_id
        and author.get("id") == author_id
    )


def _json_object(response: requests.Response) -> dict:
    """Return the response body as a dict, or ``{}`` if it is not a JSON object."""
    try:
        payload = response.json()
    except ValueError:
        return {}
    return payload if isinstance(payload, dict) else {}


def get_rate_limit_retry_after(response: requests.Response) -> float | None:
    """Extract the rate-limit wait time, in seconds, from a response.

    Sources are tried in order: the ``Retry-After`` header, the
    ``X-RateLimit-Reset-After`` header, then ``retry_after`` in the JSON body.
    Returns ``None`` when none of them holds a parsable number.
    """
    header_retry_after = parse_float(response.headers.get("Retry-After"))
    if header_retry_after is not None:
        return header_retry_after

    reset_after = parse_float(response.headers.get("X-RateLimit-Reset-After"))
    if reset_after is not None:
        return reset_after

    return parse_float(_json_object(response).get("retry_after"))


def get_bucket_exhausted_delay(response: requests.Response) -> float | None:
    """Return the reset delay if this response used up its rate-limit bucket.

    Returns ``None`` unless ``X-RateLimit-Remaining`` is exactly ``"0"``;
    otherwise returns the parsed ``X-RateLimit-Reset-After`` header.
    """
    remaining = response.headers.get("X-RateLimit-Remaining")
    if remaining != "0":
        return None
    return parse_float(response.headers.get("X-RateLimit-Reset-After"))


def sleep_for_rate_limit(delay_seconds: float, reason: str) -> None:
    """Log and sleep for ``delay_seconds``; do nothing for non-positive delays."""
    if delay_seconds <= 0:
        return
    logging.info(
        f"Waiting {delay_seconds:.3f}s for Discord rate limit reset ({reason}).")
    time.sleep(delay_seconds)


def find_last_message_by_author(
    headers: dict[str, str],
    guild_id: str,
    channel_id: str,
    author_id: str,
) -> dict | None:
    """Find the newest indexed message for the author in the target guild channel.

    Queries the guild message search endpoint sorted by timestamp descending
    and returns the first message of the first result group. HTTP 202 (index
    not ready) and HTTP 429 are retried after waiting, within a shared attempt
    budget. Returns ``None`` on request errors, other non-200 statuses, empty
    results, or when the attempts run out.
    """
    url = f"https://discord.com/api/v10/guilds/{guild_id}/messages/search"
    params = [
        ("author_id", author_id),
        ("author_type", "webhook"),
        ("channel_id", channel_id),
        ("sort_by", "timestamp"),
        ("sort_order", "desc"),
        ("limit", "10"),
    ]

    for _ in range(_MAX_REQUEST_ATTEMPTS):
        try:
            response = requests.get(
                url, headers=headers, params=params, timeout=10)
        except requests.RequestException as e:
            logging.error(f"Error searching guild messages: {e}")
            return None

        if response.status_code == 202:
            # A missing, zero or unparsable ``retry_after`` falls back to 1s.
            retry_after = parse_float(
                _json_object(response).get("retry_after")) or 1.0
            logging.info(
                f"Guild search index not ready. Retrying after {retry_after} seconds."
            )
            time.sleep(retry_after)
            continue

        if response.status_code == 429:
            retry_after = get_rate_limit_retry_after(response) or 1.0
            sleep_for_rate_limit(retry_after, "guild search")
            continue

        if response.status_code != 200:
            logging.error(
                f"Failed to search guild messages: {response.status_code} - {response.text}"
            )
            return None

        message_groups = _json_object(response).get("messages") or []
        if not message_groups or not message_groups[0]:
            return None
        return message_groups[0][0]

    logging.error("Guild search index did not become available in time.")
    return None


def fetch_messages_to_delete(headers: dict, channel_id: str, webhook_id: str, author_id: str, cutoff: int, last_message_id: str | None = None) -> tuple[list[dict], str | None]:
    """
    Fetch messages from the channel that are older than the cutoff timestamp and sent by the webhook.
    Uses pagination with the 'before' parameter to resume from the last processed message.

    Returns a tuple of (list of messages to delete, last message ID for pagination).
    On any failure the list is empty and the cursor is the unchanged
    ``last_message_id``; for an empty page the cursor is ``None``.
    """
    url = f"https://discord.com/api/v10/channels/{channel_id}/messages"
    params: dict[str, str | int] = {
        "limit": 100,
    }
    if last_message_id:
        params["before"] = last_message_id

    for _ in range(_MAX_REQUEST_ATTEMPTS):
        try:
            response = requests.get(
                url, headers=headers, params=params, timeout=10)
        except requests.RequestException as e:
            logging.error(f"Error fetching messages: {e}")
            return [], last_message_id
        if response.status_code == 429:
            retry_after = get_rate_limit_retry_after(response) or 1.0
            sleep_for_rate_limit(retry_after, "channel message fetch")
            continue
        break
    else:
        # Every attempt was rate limited.
        logging.error(
            "Failed to fetch messages after repeated rate limits.")
        return [], last_message_id

    if response.status_code != 200:
        logging.error(
            f"Failed to fetch messages: {response.status_code} - {response.text}")
        return [], last_message_id

    try:
        messages = response.json()
    except ValueError as e:
        logging.error(f"Error fetching messages: {e}")
        return [], last_message_id

    delete_list = []
    new_last_message_id = None
    for message in messages:
        # Track the cursor for every message seen, not only the matches.
        new_last_message_id = message.get("id")
        if should_delete_message(
            message,
            webhook_id,
            author_id,
            cutoff,
        ):
            delete_list.append(build_delete_entry(message))
            if len(delete_list) >= 100:
                break
    return delete_list, new_last_message_id


def delete_message(headers: dict, channel_id: str, message_id: str) -> tuple[bool, float | None, bool]:
    """
    Delete a single message from the channel.

    Network-level failures are logged and reported as an ordinary failed
    delete instead of propagating, matching the fetch/search helpers.

    Returns:
        tuple[bool, float | None, bool]:
            - whether the delete succeeded
            - how long to wait before the next request, if any
            - whether to abort the batch because further requests would be invalid
    """
    delete_url = f"https://discord.com/api/v10/channels/{channel_id}/messages/{message_id}"
    try:
        delete_response = requests.delete(
            delete_url, headers=headers, timeout=10)
    except requests.RequestException as e:
        logging.error(f"Error deleting message {message_id}: {e}")
        return False, None, False

    if delete_response.status_code == 204:
        return True, get_bucket_exhausted_delay(delete_response), False

    if delete_response.status_code == 429:
        retry_after = get_rate_limit_retry_after(delete_response) or 1.0
        scope = delete_response.headers.get("X-RateLimit-Scope", "unknown")
        is_global = delete_response.headers.get(
            "X-RateLimit-Global", "false").lower() == "true"
        logging.warning(
            "Discord rate limit hit while deleting message %s: scope=%s global=%s retry_after=%.3fs",
            message_id,
            scope,
            is_global,
            retry_after,
        )
        return False, retry_after, False

    if delete_response.status_code in {401, 403}:
        logging.error(
            "Failed to delete message %s: %s - %s. Stopping deletes to avoid invalid request spam.",
            message_id,
            delete_response.status_code,
            delete_response.text,
        )
        return False, None, True

    logging.error(
        f"Failed to delete message {message_id}: {delete_response.status_code} - {delete_response.text}")
    return False, None, False


def _delete_entry(headers: dict, channel_id: str, entry: dict) -> tuple[bool, bool]:
    """Delete one entry, retrying on rate limits; return ``(deleted, abort_batch)``.

    ``entry`` is a dict as produced by ``build_delete_entry``. Any wait that
    ``delete_message`` asks for is honoured before the next request.
    """
    message_id = entry["id"]
    for _ in range(_MAX_REQUEST_ATTEMPTS):
        deleted, wait_seconds, abort_batch = delete_message(
            headers,
            channel_id,
            message_id,
        )
        if deleted:
            logging.info(
                f"Deleted message {message_id} from {entry['timestamp'].isoformat()}")
        elif abort_batch:
            logging.info(
                "Stopping delete batch after an invalid Discord response.")
            return False, True

        if wait_seconds is not None:
            sleep_for_rate_limit(wait_seconds, "delete bucket")

        # A failed delete that carries a wait time is a 429: try again.
        # Any other outcome (success or hard failure) is final.
        if deleted or wait_seconds is None:
            return deleted, False

    logging.error(
        f"Giving up on message {message_id} after repeated rate limits.")
    return False, False


def delete_old_messages(minutes: int = 6) -> None:
    """
    Delete messages sent by the webhook that are older than `minutes` minutes.

    The newest indexed webhook message is located through guild search and
    used as the pagination anchor. Because ``before=<id>`` excludes the anchor
    itself, the anchor is checked and deleted separately before paging.

    Requires the DISCORD_BOT_TOKEN, DISCORD_CHANNEL_ID and DISCORD_GUILD_ID
    environment variables; logs an error and returns if any is missing.

    NOTE(review): paging stops at the first page that yields nothing to
    delete, so matching messages behind a full page of non-matching ones are
    not reached in the same run.
    """
    discord_bot_token = os.getenv("DISCORD_BOT_TOKEN")
    discord_channel_id = os.getenv("DISCORD_CHANNEL_ID")
    guild_id = os.getenv("DISCORD_GUILD_ID")

    if not discord_bot_token or not discord_channel_id or not guild_id:
        logging.error(
            "DISCORD_BOT_TOKEN, DISCORD_CHANNEL_ID, or DISCORD_GUILD_ID not set")
        return

    headers = get_discord_headers()
    cutoff_timestamp = datetime.now(timezone.utc) - timedelta(minutes=minutes)
    cutoff = int(cutoff_timestamp.timestamp())
    webhook_id = WEBHOOK_AUTHOR_ID
    author_id = WEBHOOK_AUTHOR_ID

    last_author_message = find_last_message_by_author(
        headers,
        guild_id,
        discord_channel_id,
        author_id,
    )
    if last_author_message is None:
        logging.info("No indexed messages found for the target author.")
        return

    last_message_id = last_author_message.get("id")
    if not last_message_id:
        logging.info("Search result did not contain a message id.")
        return

    deleted_count = 0

    if should_delete_message(
        last_author_message,
        webhook_id,
        author_id,
        cutoff,
    ):
        deleted, abort_batch = _delete_entry(
            headers,
            discord_channel_id,
            build_delete_entry(last_author_message),
        )
        if abort_batch:
            return
        if deleted:
            deleted_count += 1

    while True:
        delete_list, next_last_message_id = fetch_messages_to_delete(
            headers, discord_channel_id, webhook_id, author_id, cutoff, last_message_id
        )

        if not delete_list:
            if deleted_count == 0:
                logging.info("No messages to delete.")
            else:
                logging.info("No more messages to delete.")
            break

        for entry in delete_list:
            deleted, abort_batch = _delete_entry(
                headers,
                discord_channel_id,
                entry,
            )
            if abort_batch:
                return
            if deleted:
                deleted_count += 1

        # An unchanged or missing cursor means there is nothing further to page.
        if next_last_message_id is None or next_last_message_id == last_message_id:
            break
        last_message_id = next_last_message_id

    logging.info(
        f"Deleted {deleted_count} messages older than {minutes} minutes sent by the webhook.")