Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f88f60a019 | |||
| 565c4078bb | |||
| 8f8c3655db | |||
| 584231b0df |
+17
-2
@@ -32,6 +32,17 @@ def get_html_template(content) -> str:
|
|||||||
return HTML_TEMPLATE.format(content=content)
|
return HTML_TEMPLATE.format(content=content)
|
||||||
|
|
||||||
|
|
||||||
|
def _as_hex_color(value: int | str | None) -> str:
|
||||||
|
if isinstance(value, int):
|
||||||
|
return f"#{value:06X}"
|
||||||
|
if isinstance(value, str):
|
||||||
|
try:
|
||||||
|
return f"#{parse_color(value):06X}"
|
||||||
|
except ValueError:
|
||||||
|
return "#000000"
|
||||||
|
return "#000000"
|
||||||
|
|
||||||
|
|
||||||
def create_app(
|
def create_app(
|
||||||
*,
|
*,
|
||||||
get_state: Callable[[], dict],
|
get_state: Callable[[], dict],
|
||||||
@@ -77,13 +88,14 @@ def create_app(
|
|||||||
blocks = []
|
blocks = []
|
||||||
for key, tpl in templates.items():
|
for key, tpl in templates.items():
|
||||||
text = (tpl.get("text") or "").replace("'", "'")
|
text = (tpl.get("text") or "").replace("'", "'")
|
||||||
color = tpl.get("color")
|
color_hex = _as_hex_color(tpl.get("color"))
|
||||||
image_url = tpl.get("image_url") or ""
|
image_url = tpl.get("image_url") or ""
|
||||||
blocks.append(
|
blocks.append(
|
||||||
"<fieldset style='margin-bottom:16px;'>"
|
"<fieldset style='margin-bottom:16px;'>"
|
||||||
f"<legend><strong>{key}</strong></legend>"
|
f"<legend><strong>{key}</strong></legend>"
|
||||||
f"<label>Text<br><textarea name='{key}__text' rows='3' style='width:100%'>{text}</textarea></label><br>"
|
f"<label>Text<br><textarea name='{key}__text' rows='3' style='width:100%'>{text}</textarea></label><br>"
|
||||||
f"<label>Color<br><input name='{key}__color' value='{color}' style='width:200px'></label><br>"
|
f"<label>Color<br><input type='color' name='{key}__color_picker' value='{color_hex}' oninput=\"this.form['{key}__color'].value=this.value\"></label><br>"
|
||||||
|
f"<label>Color value<br><input name='{key}__color' value='{color_hex}' style='width:200px'></label><br>"
|
||||||
f"<label>Image URL (optional)<br><input name='{key}__image_url' value='{image_url}' style='width:100%'></label>"
|
f"<label>Image URL (optional)<br><input name='{key}__image_url' value='{image_url}' style='width:100%'></label>"
|
||||||
"</fieldset>"
|
"</fieldset>"
|
||||||
)
|
)
|
||||||
@@ -108,6 +120,9 @@ def create_app(
|
|||||||
for key in current.keys():
|
for key in current.keys():
|
||||||
text = request.form.get(f"{key}__text", "").strip()
|
text = request.form.get(f"{key}__text", "").strip()
|
||||||
color_raw = request.form.get(f"{key}__color", "").strip()
|
color_raw = request.form.get(f"{key}__color", "").strip()
|
||||||
|
if not color_raw:
|
||||||
|
color_raw = request.form.get(
|
||||||
|
f"{key}__color_picker", "").strip()
|
||||||
image_url = request.form.get(f"{key}__image_url", "").strip()
|
image_url = request.form.get(f"{key}__image_url", "").strip()
|
||||||
|
|
||||||
if not text:
|
if not text:
|
||||||
|
|||||||
@@ -71,6 +71,45 @@ def get_state_snapshot() -> dict:
|
|||||||
return dict(STATE)
|
return dict(STATE)
|
||||||
|
|
||||||
|
|
||||||
|
def _check_420(tz_list: list[str] | None = None) -> str:
|
||||||
|
cache = get_tzdb_cache()
|
||||||
|
timezones = load_timezones() if cache is None else []
|
||||||
|
countries = load_countries() if cache is None else []
|
||||||
|
if tz_list is None:
|
||||||
|
if cache is None:
|
||||||
|
tz_list = where_is_it_420(timezones, countries)
|
||||||
|
else:
|
||||||
|
tz_list = where_is_it_420(
|
||||||
|
timezones,
|
||||||
|
countries,
|
||||||
|
tz_names=cache.get("tz_names"),
|
||||||
|
tz_to_country_code=cache.get("tz_to_country_code"),
|
||||||
|
country_code_to_name=cache.get("country_code_to_name"),
|
||||||
|
)
|
||||||
|
if tz_list:
|
||||||
|
tz_str = "\n".join(tz_list)
|
||||||
|
return f"\nIt's 4:20 in:\n{tz_str}"
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def _update_420_cache() -> None:
|
||||||
|
try:
|
||||||
|
cache = get_tzdb_cache()
|
||||||
|
if cache is None:
|
||||||
|
tz_list = where_is_it_420(load_timezones(), load_countries())
|
||||||
|
else:
|
||||||
|
tz_list = where_is_it_420(
|
||||||
|
[],
|
||||||
|
[],
|
||||||
|
tz_names=cache.get("tz_names"),
|
||||||
|
tz_to_country_code=cache.get("tz_to_country_code"),
|
||||||
|
country_code_to_name=cache.get("country_code_to_name"),
|
||||||
|
)
|
||||||
|
_update_state(last_locations=tz_list or [])
|
||||||
|
except Exception as e:
|
||||||
|
_update_state(last_locations=[], last_error=str(e))
|
||||||
|
|
||||||
|
|
||||||
def get_next_scheduled_event(now: datetime | None = None) -> dict:
|
def get_next_scheduled_event(now: datetime | None = None) -> dict:
|
||||||
"""Return the next scheduled notification time/type based on known minute marks."""
|
"""Return the next scheduled notification time/type based on known minute marks."""
|
||||||
if now is None:
|
if now is None:
|
||||||
@@ -98,9 +137,6 @@ def create_embed(type: str, tz_list: list[str] | None = None) -> dict:
|
|||||||
"""
|
"""
|
||||||
templates_path = os.getenv("TEMPLATES_PATH", "templates.json")
|
templates_path = os.getenv("TEMPLATES_PATH", "templates.json")
|
||||||
messages = load_templates(templates_path)
|
messages = load_templates(templates_path)
|
||||||
cache = get_tzdb_cache()
|
|
||||||
timezones = load_timezones() if cache is None else []
|
|
||||||
countries = load_countries() if cache is None else []
|
|
||||||
if type in messages:
|
if type in messages:
|
||||||
msg = messages[type]
|
msg = messages[type]
|
||||||
image_url = msg.get("image_url")
|
image_url = msg.get("image_url")
|
||||||
@@ -108,20 +144,7 @@ def create_embed(type: str, tz_list: list[str] | None = None) -> dict:
|
|||||||
msg["image"] = {"url": image_url}
|
msg["image"] = {"url": image_url}
|
||||||
if type == "420":
|
if type == "420":
|
||||||
# Check where it's 4:20
|
# Check where it's 4:20
|
||||||
if tz_list is None:
|
msg["text"] += _check_420(tz_list)
|
||||||
if cache is None:
|
|
||||||
tz_list = where_is_it_420(timezones, countries)
|
|
||||||
else:
|
|
||||||
tz_list = where_is_it_420(
|
|
||||||
timezones,
|
|
||||||
countries,
|
|
||||||
tz_names=cache.get("tz_names"),
|
|
||||||
tz_to_country_code=cache.get("tz_to_country_code"),
|
|
||||||
country_code_to_name=cache.get("country_code_to_name"),
|
|
||||||
)
|
|
||||||
if tz_list:
|
|
||||||
tz_str = "\n".join(tz_list)
|
|
||||||
msg["text"] += f"\nIt's 4:20 in:\n{tz_str}"
|
|
||||||
else:
|
else:
|
||||||
msg = {"text": "Unknown notification type", "color": 0xFF0000}
|
msg = {"text": "Unknown notification type", "color": 0xFF0000}
|
||||||
embed = {
|
embed = {
|
||||||
@@ -159,21 +182,7 @@ def send_notification(message: str) -> None:
|
|||||||
|
|
||||||
tz_list: list[str] | None = None
|
tz_list: list[str] | None = None
|
||||||
if message == "420":
|
if message == "420":
|
||||||
try:
|
_update_420_cache()
|
||||||
cache = get_tzdb_cache()
|
|
||||||
if cache is None:
|
|
||||||
tz_list = where_is_it_420(load_timezones(), load_countries())
|
|
||||||
else:
|
|
||||||
tz_list = where_is_it_420(
|
|
||||||
[],
|
|
||||||
[],
|
|
||||||
tz_names=cache.get("tz_names"),
|
|
||||||
tz_to_country_code=cache.get("tz_to_country_code"),
|
|
||||||
country_code_to_name=cache.get("country_code_to_name"),
|
|
||||||
)
|
|
||||||
_update_state(last_locations=tz_list or [])
|
|
||||||
except Exception as e:
|
|
||||||
_update_state(last_locations=[], last_error=str(e))
|
|
||||||
|
|
||||||
embed = create_embed(message, tz_list=tz_list)
|
embed = create_embed(message, tz_list=tz_list)
|
||||||
data = {"embeds": [embed]}
|
data = {"embeds": [embed]}
|
||||||
|
|||||||
@@ -1,5 +1,3 @@
|
|||||||
numpy
|
|
||||||
pandas
|
|
||||||
pytest
|
pytest
|
||||||
python-dotenv
|
python-dotenv
|
||||||
pytz
|
pytz
|
||||||
|
|||||||
+5
-5
@@ -1,24 +1,24 @@
|
|||||||
{
|
{
|
||||||
"420": {
|
"420": {
|
||||||
"color": 3066993,
|
"color": "#2ECC71",
|
||||||
"image_url": "https://copyparty.allucanget.biz/img/weed.png",
|
"image_url": "https://copyparty.allucanget.biz/img/weed.png",
|
||||||
"text": "Blaze it!"
|
"text": "Blaze it!"
|
||||||
},
|
},
|
||||||
"halftime": {
|
"halftime": {
|
||||||
"color": 3066993,
|
"color": "#2ECC71",
|
||||||
"image_url": "https://copyparty.allucanget.biz/img/weed.png",
|
"image_url": "https://copyparty.allucanget.biz/img/weed.png",
|
||||||
"text": "Half-time!"
|
"text": "Half-time!"
|
||||||
},
|
},
|
||||||
"reminder": {
|
"reminder": {
|
||||||
"color": 15105570,
|
"color": "#E67E22",
|
||||||
"text": "This is your 5 minute reminder to 420!"
|
"text": "This is your 5 minute reminder to 420!"
|
||||||
},
|
},
|
||||||
"reminder_halftime": {
|
"reminder_halftime": {
|
||||||
"color": 15105570,
|
"color": "#E67E22",
|
||||||
"text": "Half-time in 5 minutes!"
|
"text": "Half-time in 5 minutes!"
|
||||||
},
|
},
|
||||||
"test": {
|
"test": {
|
||||||
"color": 3447003,
|
"color": "#3498DB",
|
||||||
"text": "This is a test notification."
|
"text": "This is a test notification."
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+12
-1
@@ -49,6 +49,11 @@ def _normalize_templates(raw: dict) -> dict[str, dict]:
|
|||||||
color = incoming.get("color")
|
color = incoming.get("color")
|
||||||
if isinstance(color, int):
|
if isinstance(color, int):
|
||||||
out[key]["color"] = color
|
out[key]["color"] = color
|
||||||
|
elif isinstance(color, str):
|
||||||
|
try:
|
||||||
|
out[key]["color"] = parse_color(color)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
image_url = incoming.get("image_url")
|
image_url = incoming.get("image_url")
|
||||||
if isinstance(image_url, str) and image_url.strip():
|
if isinstance(image_url, str) and image_url.strip():
|
||||||
@@ -74,10 +79,16 @@ def load_templates(path: str | Path) -> dict[str, dict]:
|
|||||||
def save_templates(path: str | Path, templates: dict) -> None:
|
def save_templates(path: str | Path, templates: dict) -> None:
|
||||||
p = Path(path)
|
p = Path(path)
|
||||||
normalized = _normalize_templates(templates)
|
normalized = _normalize_templates(templates)
|
||||||
|
serialized = deepcopy(normalized)
|
||||||
|
|
||||||
|
for tpl in serialized.values():
|
||||||
|
color = tpl.get("color")
|
||||||
|
if isinstance(color, int):
|
||||||
|
tpl["color"] = f"#{color:06X}"
|
||||||
|
|
||||||
p.parent.mkdir(parents=True, exist_ok=True)
|
p.parent.mkdir(parents=True, exist_ok=True)
|
||||||
tmp = p.with_suffix(p.suffix + ".tmp")
|
tmp = p.with_suffix(p.suffix + ".tmp")
|
||||||
tmp.write_text(json.dumps(normalized, indent=2,
|
tmp.write_text(json.dumps(serialized, indent=2,
|
||||||
sort_keys=True) + "\n", encoding="utf-8")
|
sort_keys=True) + "\n", encoding="utf-8")
|
||||||
tmp.replace(p)
|
tmp.replace(p)
|
||||||
|
|
||||||
|
|||||||
+52
-30
@@ -1,6 +1,6 @@
|
|||||||
import pytz
|
import pytz
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import pandas as pd
|
from csv import DictReader
|
||||||
|
|
||||||
|
|
||||||
def get_tz_info(tz_name: str, timezones: list[dict]) -> dict | None:
|
def get_tz_info(tz_name: str, timezones: list[dict]) -> dict | None:
|
||||||
@@ -41,52 +41,74 @@ def load_tz_file():
|
|||||||
"abbreviation", "time_start", "gmt_offset", "dst"]
|
"abbreviation", "time_start", "gmt_offset", "dst"]
|
||||||
# columns to load
|
# columns to load
|
||||||
load_columns = ["zone_name", "country_code"]
|
load_columns = ["zone_name", "country_code"]
|
||||||
# read csv with pandas
|
# read csv
|
||||||
df = pd.read_csv(timezone_file, names=timezone_names)
|
with open(timezone_file, newline='') as csvfile:
|
||||||
|
reader = DictReader(csvfile, fieldnames=timezone_names)
|
||||||
|
csv = [row for row in reader]
|
||||||
# drop all columns except load_columns
|
# drop all columns except load_columns
|
||||||
df = df[load_columns]
|
csv = [{k: v for k, v in row.items() if k in load_columns} for row in csv]
|
||||||
# distinct zone_names
|
# distinct zone_names
|
||||||
df = df.drop_duplicates(subset=["zone_name"])
|
seen = set()
|
||||||
|
unique_csv = []
|
||||||
|
for row in csv:
|
||||||
|
if row["zone_name"] not in seen:
|
||||||
|
seen.add(row["zone_name"])
|
||||||
|
unique_csv.append(row)
|
||||||
|
csv = unique_csv
|
||||||
|
|
||||||
# reset index
|
return csv
|
||||||
df = df.reset_index(drop=True)
|
|
||||||
|
|
||||||
return df
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
|
||||||
# read csv with pandas
|
# read csv file and load timezones and countries
|
||||||
df_file = load_tz_file()
|
csv = load_tz_file()
|
||||||
|
|
||||||
# split zone_name into components by "/"
|
# split zone_name into components by "/"
|
||||||
df_file[['region', 'city']] = df_file['zone_name'].str.split(
|
for row in csv:
|
||||||
'/', expand=True, n=1)
|
parts = row["zone_name"].split("/", 1)
|
||||||
|
row["region"] = parts[0]
|
||||||
|
row["city"] = parts[1] if len(parts) > 1 else None
|
||||||
# drop regions with no country_code (like Etc, GMT, etc)
|
# drop regions with no country_code (like Etc, GMT, etc)
|
||||||
df_file = df_file[df_file['country_code'].notna()]
|
csv = [row for row in csv if row["country_code"]]
|
||||||
|
|
||||||
|
# get all timezones from pytz and split into region and city
|
||||||
|
|
||||||
|
tz = [{"zone_name": tz} for tz in pytz.all_timezones]
|
||||||
|
|
||||||
df_tz = pd.DataFrame(pytz.all_timezones)
|
|
||||||
# rename column to zone_name
|
|
||||||
df_tz = df_tz.rename(columns={0: 'zone_name'})
|
|
||||||
# split zone_name into components by "/"
|
# split zone_name into components by "/"
|
||||||
df_tz[['region', 'city']] = df_tz['zone_name'].str.split(
|
for row in tz:
|
||||||
'/', expand=True, n=1)
|
parts = row["zone_name"].split("/", 1)
|
||||||
|
row["region"] = parts[0]
|
||||||
|
row["city"] = parts[1] if len(parts) > 1 else None
|
||||||
# drop regions with no city (like UTC, GMT, etc)
|
# drop regions with no city (like UTC, GMT, etc)
|
||||||
df_tz = df_tz[df_tz['city'].notna()]
|
tz = [row for row in tz if row["city"]]
|
||||||
# drop rows where region is 'Etc'
|
# drop rows where region is 'Etc'
|
||||||
df_tz = df_tz[df_tz['region'] != 'Etc']
|
tz = [row for row in tz if row["region"] != "Etc"]
|
||||||
|
|
||||||
|
# join data on region and city
|
||||||
|
timezones = []
|
||||||
|
for tz_row in tz:
|
||||||
|
for csv_row in csv:
|
||||||
|
if tz_row["region"] == csv_row["region"] and tz_row["city"] == csv_row["city"]:
|
||||||
|
timezones.append({
|
||||||
|
"zone_name": tz_row["zone_name"],
|
||||||
|
"country_code": csv_row["country_code"],
|
||||||
|
"region": tz_row["region"],
|
||||||
|
"city": tz_row["city"],
|
||||||
|
})
|
||||||
|
break
|
||||||
|
|
||||||
# join dataframes on region and city
|
|
||||||
df_merged = pd.merge(df_file, df_tz, on=[
|
|
||||||
'region', 'city'], how='inner', indicator=True)
|
|
||||||
# reorder columns
|
# reorder columns
|
||||||
df_merged = df_merged[['region', 'city', 'country_code']]
|
timezones = [{k: row[k] for k in ['region', 'city', 'country_code']}
|
||||||
# print merged dataframe
|
for row in timezones]
|
||||||
print(f"Merged timezones: {len(df_merged)}")
|
|
||||||
print(df_merged.sample(20).to_string(index=False))
|
# print merged data
|
||||||
regions = df_merged['region'].unique()
|
print(f"Merged timezones: {len(timezones)}")
|
||||||
|
print(timezones[:20])
|
||||||
|
regions = set(row['region'] for row in timezones)
|
||||||
for region in regions:
|
for region in regions:
|
||||||
df_region = df_merged[df_merged['region'] == region]
|
df_region = [row for row in timezones if row['region'] == region]
|
||||||
print(f"{len(df_region)} merged in {region}")
|
print(f"{len(df_region)} merged in {region}")
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -72,6 +72,8 @@ def test_admin_get_renders_template_form(monkeypatch):
|
|||||||
assert "Admin: templates" in body
|
assert "Admin: templates" in body
|
||||||
assert "name='420__text'" in body
|
assert "name='420__text'" in body
|
||||||
assert "name='420__color'" in body
|
assert "name='420__color'" in body
|
||||||
|
assert "name='420__color_picker'" in body
|
||||||
|
assert "type='color'" in body
|
||||||
assert "name='420__image_url'" in body
|
assert "name='420__image_url'" in body
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -20,6 +20,9 @@ def test_save_and_load_templates_roundtrip(tmp_path):
|
|||||||
}
|
}
|
||||||
save_templates(path, data)
|
save_templates(path, data)
|
||||||
|
|
||||||
|
raw = path.read_text(encoding="utf-8")
|
||||||
|
assert '"color": "#00007B"' in raw
|
||||||
|
|
||||||
loaded = load_templates(path)
|
loaded = load_templates(path)
|
||||||
assert loaded["420"]["text"] == "Custom"
|
assert loaded["420"]["text"] == "Custom"
|
||||||
assert loaded["420"]["color"] == 123
|
assert loaded["420"]["color"] == 123
|
||||||
|
|||||||
Reference in New Issue
Block a user