Files
contact.allucanget.biz/server/rate_limit.py
zwitschi 4cefd4e3ab
Some checks failed
CI / test (3.11) (push) Failing after 5m36s
CI / build-image (push) Has been skipped
v1
2025-10-22 16:48:55 +02:00

76 lines
2.4 KiB
Python

"""Rate limiting helpers with optional Redis support."""
from __future__ import annotations
import logging
import os
import time
from collections import defaultdict, deque
from typing import DefaultDict, Deque
from . import settings
try:
import redis
except Exception: # redis is optional
redis = None # type: ignore
_rate_tracker: DefaultDict[str, Deque[float]] = defaultdict(deque)
def allow_request(client_ip: str) -> bool:
"""Return True when the client is allowed to make a request."""
if settings.RATE_LIMIT_MAX <= 0:
return True
if settings.REDIS_URL and redis is not None:
try:
client = redis.from_url(settings.REDIS_URL, decode_responses=True)
key = f"rl:{client_ip}"
lua = (
"local key=KEYS[1]\n"
"local now=tonumber(ARGV[1])\n"
"local window=tonumber(ARGV[2])\n"
"local limit=tonumber(ARGV[3])\n"
"local member=ARGV[4]\n"
"redis.call('ZADD', key, now, member)\n"
"redis.call('ZREMRANGEBYSCORE', key, 0, now - window)\n"
"local cnt = redis.call('ZCARD', key)\n"
"redis.call('EXPIRE', key, window)\n"
"if cnt > limit then return 0 end\n"
"return cnt\n"
)
now_ts = int(time.time() * 1000)
member = f"{now_ts}-{os.getpid()}-{int(time.time_ns() % 1000000)}"
result = client.eval(
lua,
1,
key,
str(now_ts),
str(settings.RATE_LIMIT_WINDOW * 1000),
str(settings.RATE_LIMIT_MAX),
member,
)
try:
count = int(str(result))
except Exception:
logging.exception("Unexpected Redis eval result: %r", result)
return False
return count != 0
except Exception as exc:
logging.exception("Redis rate limiter error, falling back to memory: %s", exc)
now = time.time()
bucket = _rate_tracker[client_ip]
while bucket and now - bucket[0] > settings.RATE_LIMIT_WINDOW:
bucket.popleft()
if len(bucket) >= settings.RATE_LIMIT_MAX:
return False
bucket.append(now)
if len(bucket) > settings.RATE_LIMIT_MAX * 2:
while len(bucket) > settings.RATE_LIMIT_MAX:
bucket.popleft()
return True