bot.py
"""Telegram bot that posts district COVID-19 figures when new data appears.

Once a minute the script asks ``covid_api.get_district`` for the configured
district, and if the returned timestamp is newer than the one remembered in
Redis it sends a summary message (including the change since the previous
message) to a Telegram chat.

Configuration is read from the environment: ``TOKEN`` (Telegram bot token),
``REDIS_HOST``, ``CHANNEL`` (Telegram chat id) and ``DISTRICT``.
"""
import logging
import time
from os import environ

import requests
from redis import Redis

from covid_api import get_district

TOKEN = environ["TOKEN"]
REDIS_HOST = environ["REDIS_HOST"]
CHANNEL = environ["CHANNEL"]
DISTRICT = environ["DISTRICT"]

# Seconds to wait for Telegram before giving up; without a timeout a stalled
# connection would block the polling loop indefinitely.
REQUEST_TIMEOUT = 10
# Seconds between two polls of the data source.
POLL_INTERVAL = 60

logger = logging.getLogger(__name__)
redis = Redis(host=REDIS_HOST)


def send_message(text: str) -> None:
    """Send *text* (Markdown) to the configured Telegram chat.

    Failures are logged instead of raised so that a Telegram outage does not
    stop the bot.
    """
    try:
        response = requests.get(
            f"https://api.telegram.org/bot{TOKEN}/sendMessage",
            params={"chat_id": CHANNEL, "text": text, "parse_mode": "Markdown"},
            timeout=REQUEST_TIMEOUT,
        )
    except requests.RequestException as exc:
        # Log only the exception type: the exception text can embed the
        # request URL, which contains the bot token.
        logger.error("Telegram request failed: %s", type(exc).__name__)
        return

    if not response.ok:
        logger.error(
            "Telegram rejected the message: HTTP %s %s",
            response.status_code,
            response.text,
        )


def update(ts: float) -> bool:
    """Record *ts* as the latest seen timestamp if it is newer.

    Returns ``True`` when *ts* is strictly greater than the value stored under
    the Redis key ``last_update`` (a missing key counts as 0) and stores it;
    returns ``False`` otherwise without touching Redis.
    """
    if ts <= float(redis.get("last_update") or 0):
        return False
    redis.set("last_update", ts)
    return True


def update_value(district: dict, key: str, precision: int = 0) -> str:
    """Format ``district[key]`` and remember it for the next comparison.

    The raw value is stored in Redis under ``"last_val_" + key``. The returned
    string is the value rounded to *precision* decimals (``0`` rounds to an
    integer); if a different previous value exists, the signed difference is
    appended in parentheses, e.g. ``"12 (+2)"``.
    """
    ndigits = precision or None  # round(x, None) yields an int

    raw_last = redis.get("last_val_" + key)
    # A missing key (None) or an empty stored value both mean "no previous
    # value"; an empty value previously slipped through and broke the
    # subtraction below.
    last = round(float(raw_last.decode()), ndigits) if raw_last else None

    value = district[key]
    redis.set("last_val_" + key, value)
    value = round(value, ndigits)

    if last is not None and last != value:
        diff = round(value - last, ndigits)
        sign = "+" if value > last else ""  # negative diffs carry their own "-"
        return f"{value} ({sign}{diff})"

    return str(value)


def callback() -> None:
    """Fetch the district data and post a message if it is newer than the last one."""
    last_update, district = get_district(DISTRICT)
    # NOTE(review): the timestamp (here) and the per-key values (in
    # update_value) are persisted before the message is sent, so a failed
    # send is not retried on the next poll.
    if not update(last_update.timestamp()):
        return

    text = [
        f"*{district['name']}*\n",
        f"Fälle: {update_value(district, 'count')}",
        f"Fälle/100k EW: {update_value(district, 'casesPer100k', 2)}",
        f"Fälle letzte 7d/100k EW: {update_value(district, 'weekIncidence', 2)}",
        f"Todesfälle: {update_value(district, 'deaths')}",
        f"\nStand: {last_update.strftime('%d.%m.%Y, %H:%M Uhr')}",
    ]
    send_message(text="\n".join(text))


while True:
    try:
        callback()
    except Exception:
        # Top-level boundary: a transient failure (data API, Redis, ...) must
        # not terminate the bot; log it and try again on the next poll.
        logger.exception("Update cycle failed; retrying in %s s", POLL_INTERVAL)
    time.sleep(POLL_INTERVAL)