Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f88f60a019 | |||
| 565c4078bb | |||
| 8f8c3655db | |||
| 584231b0df |
+17
-2
@@ -32,6 +32,17 @@ def get_html_template(content) -> str:
|
||||
return HTML_TEMPLATE.format(content=content)
|
||||
|
||||
|
||||
def _as_hex_color(value: int | str | None) -> str:
|
||||
if isinstance(value, int):
|
||||
return f"#{value:06X}"
|
||||
if isinstance(value, str):
|
||||
try:
|
||||
return f"#{parse_color(value):06X}"
|
||||
except ValueError:
|
||||
return "#000000"
|
||||
return "#000000"
|
||||
|
||||
|
||||
def create_app(
|
||||
*,
|
||||
get_state: Callable[[], dict],
|
||||
@@ -77,13 +88,14 @@ def create_app(
|
||||
blocks = []
|
||||
for key, tpl in templates.items():
|
||||
text = (tpl.get("text") or "").replace("'", "'")
|
||||
color = tpl.get("color")
|
||||
color_hex = _as_hex_color(tpl.get("color"))
|
||||
image_url = tpl.get("image_url") or ""
|
||||
blocks.append(
|
||||
"<fieldset style='margin-bottom:16px;'>"
|
||||
f"<legend><strong>{key}</strong></legend>"
|
||||
f"<label>Text<br><textarea name='{key}__text' rows='3' style='width:100%'>{text}</textarea></label><br>"
|
||||
f"<label>Color<br><input name='{key}__color' value='{color}' style='width:200px'></label><br>"
|
||||
f"<label>Color<br><input type='color' name='{key}__color_picker' value='{color_hex}' oninput=\"this.form['{key}__color'].value=this.value\"></label><br>"
|
||||
f"<label>Color value<br><input name='{key}__color' value='{color_hex}' style='width:200px'></label><br>"
|
||||
f"<label>Image URL (optional)<br><input name='{key}__image_url' value='{image_url}' style='width:100%'></label>"
|
||||
"</fieldset>"
|
||||
)
|
||||
@@ -108,6 +120,9 @@ def create_app(
|
||||
for key in current.keys():
|
||||
text = request.form.get(f"{key}__text", "").strip()
|
||||
color_raw = request.form.get(f"{key}__color", "").strip()
|
||||
if not color_raw:
|
||||
color_raw = request.form.get(
|
||||
f"{key}__color_picker", "").strip()
|
||||
image_url = request.form.get(f"{key}__image_url", "").strip()
|
||||
|
||||
if not text:
|
||||
|
||||
@@ -71,6 +71,45 @@ def get_state_snapshot() -> dict:
|
||||
return dict(STATE)
|
||||
|
||||
|
||||
def _check_420(tz_list: list[str] | None = None) -> str:
|
||||
cache = get_tzdb_cache()
|
||||
timezones = load_timezones() if cache is None else []
|
||||
countries = load_countries() if cache is None else []
|
||||
if tz_list is None:
|
||||
if cache is None:
|
||||
tz_list = where_is_it_420(timezones, countries)
|
||||
else:
|
||||
tz_list = where_is_it_420(
|
||||
timezones,
|
||||
countries,
|
||||
tz_names=cache.get("tz_names"),
|
||||
tz_to_country_code=cache.get("tz_to_country_code"),
|
||||
country_code_to_name=cache.get("country_code_to_name"),
|
||||
)
|
||||
if tz_list:
|
||||
tz_str = "\n".join(tz_list)
|
||||
return f"\nIt's 4:20 in:\n{tz_str}"
|
||||
return ""
|
||||
|
||||
|
||||
def _update_420_cache() -> None:
|
||||
try:
|
||||
cache = get_tzdb_cache()
|
||||
if cache is None:
|
||||
tz_list = where_is_it_420(load_timezones(), load_countries())
|
||||
else:
|
||||
tz_list = where_is_it_420(
|
||||
[],
|
||||
[],
|
||||
tz_names=cache.get("tz_names"),
|
||||
tz_to_country_code=cache.get("tz_to_country_code"),
|
||||
country_code_to_name=cache.get("country_code_to_name"),
|
||||
)
|
||||
_update_state(last_locations=tz_list or [])
|
||||
except Exception as e:
|
||||
_update_state(last_locations=[], last_error=str(e))
|
||||
|
||||
|
||||
def get_next_scheduled_event(now: datetime | None = None) -> dict:
|
||||
"""Return the next scheduled notification time/type based on known minute marks."""
|
||||
if now is None:
|
||||
@@ -98,9 +137,6 @@ def create_embed(type: str, tz_list: list[str] | None = None) -> dict:
|
||||
"""
|
||||
templates_path = os.getenv("TEMPLATES_PATH", "templates.json")
|
||||
messages = load_templates(templates_path)
|
||||
cache = get_tzdb_cache()
|
||||
timezones = load_timezones() if cache is None else []
|
||||
countries = load_countries() if cache is None else []
|
||||
if type in messages:
|
||||
msg = messages[type]
|
||||
image_url = msg.get("image_url")
|
||||
@@ -108,20 +144,7 @@ def create_embed(type: str, tz_list: list[str] | None = None) -> dict:
|
||||
msg["image"] = {"url": image_url}
|
||||
if type == "420":
|
||||
# Check where it's 4:20
|
||||
if tz_list is None:
|
||||
if cache is None:
|
||||
tz_list = where_is_it_420(timezones, countries)
|
||||
else:
|
||||
tz_list = where_is_it_420(
|
||||
timezones,
|
||||
countries,
|
||||
tz_names=cache.get("tz_names"),
|
||||
tz_to_country_code=cache.get("tz_to_country_code"),
|
||||
country_code_to_name=cache.get("country_code_to_name"),
|
||||
)
|
||||
if tz_list:
|
||||
tz_str = "\n".join(tz_list)
|
||||
msg["text"] += f"\nIt's 4:20 in:\n{tz_str}"
|
||||
msg["text"] += _check_420(tz_list)
|
||||
else:
|
||||
msg = {"text": "Unknown notification type", "color": 0xFF0000}
|
||||
embed = {
|
||||
@@ -159,21 +182,7 @@ def send_notification(message: str) -> None:
|
||||
|
||||
tz_list: list[str] | None = None
|
||||
if message == "420":
|
||||
try:
|
||||
cache = get_tzdb_cache()
|
||||
if cache is None:
|
||||
tz_list = where_is_it_420(load_timezones(), load_countries())
|
||||
else:
|
||||
tz_list = where_is_it_420(
|
||||
[],
|
||||
[],
|
||||
tz_names=cache.get("tz_names"),
|
||||
tz_to_country_code=cache.get("tz_to_country_code"),
|
||||
country_code_to_name=cache.get("country_code_to_name"),
|
||||
)
|
||||
_update_state(last_locations=tz_list or [])
|
||||
except Exception as e:
|
||||
_update_state(last_locations=[], last_error=str(e))
|
||||
_update_420_cache()
|
||||
|
||||
embed = create_embed(message, tz_list=tz_list)
|
||||
data = {"embeds": [embed]}
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
numpy
|
||||
pandas
|
||||
pytest
|
||||
python-dotenv
|
||||
pytz
|
||||
|
||||
+5
-5
@@ -1,24 +1,24 @@
|
||||
{
|
||||
"420": {
|
||||
"color": 3066993,
|
||||
"color": "#2ECC71",
|
||||
"image_url": "https://copyparty.allucanget.biz/img/weed.png",
|
||||
"text": "Blaze it!"
|
||||
},
|
||||
"halftime": {
|
||||
"color": 3066993,
|
||||
"color": "#2ECC71",
|
||||
"image_url": "https://copyparty.allucanget.biz/img/weed.png",
|
||||
"text": "Half-time!"
|
||||
},
|
||||
"reminder": {
|
||||
"color": 15105570,
|
||||
"color": "#E67E22",
|
||||
"text": "This is your 5 minute reminder to 420!"
|
||||
},
|
||||
"reminder_halftime": {
|
||||
"color": 15105570,
|
||||
"color": "#E67E22",
|
||||
"text": "Half-time in 5 minutes!"
|
||||
},
|
||||
"test": {
|
||||
"color": 3447003,
|
||||
"color": "#3498DB",
|
||||
"text": "This is a test notification."
|
||||
}
|
||||
}
|
||||
|
||||
+12
-1
@@ -49,6 +49,11 @@ def _normalize_templates(raw: dict) -> dict[str, dict]:
|
||||
color = incoming.get("color")
|
||||
if isinstance(color, int):
|
||||
out[key]["color"] = color
|
||||
elif isinstance(color, str):
|
||||
try:
|
||||
out[key]["color"] = parse_color(color)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
image_url = incoming.get("image_url")
|
||||
if isinstance(image_url, str) and image_url.strip():
|
||||
@@ -74,10 +79,16 @@ def load_templates(path: str | Path) -> dict[str, dict]:
|
||||
def save_templates(path: str | Path, templates: dict) -> None:
|
||||
p = Path(path)
|
||||
normalized = _normalize_templates(templates)
|
||||
serialized = deepcopy(normalized)
|
||||
|
||||
for tpl in serialized.values():
|
||||
color = tpl.get("color")
|
||||
if isinstance(color, int):
|
||||
tpl["color"] = f"#{color:06X}"
|
||||
|
||||
p.parent.mkdir(parents=True, exist_ok=True)
|
||||
tmp = p.with_suffix(p.suffix + ".tmp")
|
||||
tmp.write_text(json.dumps(normalized, indent=2,
|
||||
tmp.write_text(json.dumps(serialized, indent=2,
|
||||
sort_keys=True) + "\n", encoding="utf-8")
|
||||
tmp.replace(p)
|
||||
|
||||
|
||||
+52
-30
@@ -1,6 +1,6 @@
|
||||
import pytz
|
||||
from datetime import datetime
|
||||
import pandas as pd
|
||||
from csv import DictReader
|
||||
|
||||
|
||||
def get_tz_info(tz_name: str, timezones: list[dict]) -> dict | None:
|
||||
@@ -41,52 +41,74 @@ def load_tz_file():
|
||||
"abbreviation", "time_start", "gmt_offset", "dst"]
|
||||
# columns to load
|
||||
load_columns = ["zone_name", "country_code"]
|
||||
# read csv with pandas
|
||||
df = pd.read_csv(timezone_file, names=timezone_names)
|
||||
# read csv
|
||||
with open(timezone_file, newline='') as csvfile:
|
||||
reader = DictReader(csvfile, fieldnames=timezone_names)
|
||||
csv = [row for row in reader]
|
||||
# drop all columns except load_columns
|
||||
df = df[load_columns]
|
||||
csv = [{k: v for k, v in row.items() if k in load_columns} for row in csv]
|
||||
# distinct zone_names
|
||||
df = df.drop_duplicates(subset=["zone_name"])
|
||||
seen = set()
|
||||
unique_csv = []
|
||||
for row in csv:
|
||||
if row["zone_name"] not in seen:
|
||||
seen.add(row["zone_name"])
|
||||
unique_csv.append(row)
|
||||
csv = unique_csv
|
||||
|
||||
# reset index
|
||||
df = df.reset_index(drop=True)
|
||||
|
||||
return df
|
||||
return csv
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
# read csv with pandas
|
||||
df_file = load_tz_file()
|
||||
# read csv file and load timezones and countries
|
||||
csv = load_tz_file()
|
||||
|
||||
# split zone_name into components by "/"
|
||||
df_file[['region', 'city']] = df_file['zone_name'].str.split(
|
||||
'/', expand=True, n=1)
|
||||
for row in csv:
|
||||
parts = row["zone_name"].split("/", 1)
|
||||
row["region"] = parts[0]
|
||||
row["city"] = parts[1] if len(parts) > 1 else None
|
||||
# drop regions with no country_code (like Etc, GMT, etc)
|
||||
df_file = df_file[df_file['country_code'].notna()]
|
||||
csv = [row for row in csv if row["country_code"]]
|
||||
|
||||
# get all timezones from pytz and split into region and city
|
||||
|
||||
tz = [{"zone_name": tz} for tz in pytz.all_timezones]
|
||||
|
||||
df_tz = pd.DataFrame(pytz.all_timezones)
|
||||
# rename column to zone_name
|
||||
df_tz = df_tz.rename(columns={0: 'zone_name'})
|
||||
# split zone_name into components by "/"
|
||||
df_tz[['region', 'city']] = df_tz['zone_name'].str.split(
|
||||
'/', expand=True, n=1)
|
||||
for row in tz:
|
||||
parts = row["zone_name"].split("/", 1)
|
||||
row["region"] = parts[0]
|
||||
row["city"] = parts[1] if len(parts) > 1 else None
|
||||
# drop regions with no city (like UTC, GMT, etc)
|
||||
df_tz = df_tz[df_tz['city'].notna()]
|
||||
tz = [row for row in tz if row["city"]]
|
||||
# drop rows where region is 'Etc'
|
||||
df_tz = df_tz[df_tz['region'] != 'Etc']
|
||||
tz = [row for row in tz if row["region"] != "Etc"]
|
||||
|
||||
# join data on region and city
|
||||
timezones = []
|
||||
for tz_row in tz:
|
||||
for csv_row in csv:
|
||||
if tz_row["region"] == csv_row["region"] and tz_row["city"] == csv_row["city"]:
|
||||
timezones.append({
|
||||
"zone_name": tz_row["zone_name"],
|
||||
"country_code": csv_row["country_code"],
|
||||
"region": tz_row["region"],
|
||||
"city": tz_row["city"],
|
||||
})
|
||||
break
|
||||
|
||||
# join dataframes on region and city
|
||||
df_merged = pd.merge(df_file, df_tz, on=[
|
||||
'region', 'city'], how='inner', indicator=True)
|
||||
# reorder columns
|
||||
df_merged = df_merged[['region', 'city', 'country_code']]
|
||||
# print merged dataframe
|
||||
print(f"Merged timezones: {len(df_merged)}")
|
||||
print(df_merged.sample(20).to_string(index=False))
|
||||
regions = df_merged['region'].unique()
|
||||
timezones = [{k: row[k] for k in ['region', 'city', 'country_code']}
|
||||
for row in timezones]
|
||||
|
||||
# print merged data
|
||||
print(f"Merged timezones: {len(timezones)}")
|
||||
print(timezones[:20])
|
||||
regions = set(row['region'] for row in timezones)
|
||||
for region in regions:
|
||||
df_region = df_merged[df_merged['region'] == region]
|
||||
df_region = [row for row in timezones if row['region'] == region]
|
||||
print(f"{len(df_region)} merged in {region}")
|
||||
|
||||
|
||||
|
||||
@@ -72,6 +72,8 @@ def test_admin_get_renders_template_form(monkeypatch):
|
||||
assert "Admin: templates" in body
|
||||
assert "name='420__text'" in body
|
||||
assert "name='420__color'" in body
|
||||
assert "name='420__color_picker'" in body
|
||||
assert "type='color'" in body
|
||||
assert "name='420__image_url'" in body
|
||||
|
||||
|
||||
|
||||
@@ -20,6 +20,9 @@ def test_save_and_load_templates_roundtrip(tmp_path):
|
||||
}
|
||||
save_templates(path, data)
|
||||
|
||||
raw = path.read_text(encoding="utf-8")
|
||||
assert '"color": "#00007B"' in raw
|
||||
|
||||
loaded = load_templates(path)
|
||||
assert loaded["420"]["text"] == "Custom"
|
||||
assert loaded["420"]["color"] == 123
|
||||
|
||||
Reference in New Issue
Block a user