bot.py
 1  import time
 2  from os import environ
 3  
 4  import requests
 5  from redis import Redis
 6  
 7  from covid_api import get_district
 8  
 9  TOKEN = environ["TOKEN"]
10  REDIS_HOST = environ["REDIS_HOST"]
11  CHANNEL = environ["CHANNEL"]
12  DISTRICT = environ["DISTRICT"]
13  
14  redis = Redis(host=REDIS_HOST)
15  
16  
17  def send_message(text: str):
18      requests.get(
19          f"https://api.telegram.org/bot{TOKEN}/sendMessage",
20          params={"chat_id": CHANNEL, "text": text, "parse_mode": "Markdown"},
21      )
22  
23  
24  def update(ts: float) -> bool:
25      if ts <= float(redis.get("last_update") or 0):
26          return False
27      redis.set("last_update", ts)
28      return True
29  
30  
31  def update_value(district: dict, key: str, precision: int = 0):
32      last = redis.get("last_val_" + key)
33      if last:
34          last = round(float(last.decode()), precision or None)
35  
36      value = district[key]
37      redis.set("last_val_" + key, value)
38      value = round(value, precision or None)
39  
40      if last is not None and last != value:
41          diff = round(value - last, precision or None)
42          return f"{value} ({'+' * (value > last)}{diff})"
43  
44      return str(value)
45  
46  
47  def callback():
48      last_update, district = get_district(DISTRICT)
49      if not update(last_update.timestamp()):
50          return
51  
52      text = [
53          f"*{district['name']}*\n",
54          f"Fälle: {update_value(district, 'count')}",
55          f"Fälle/100k EW: {update_value(district, 'casesPer100k', 2)}",
56          f"Fälle letzte 7d/100k EW: {update_value(district, 'weekIncidence', 2)}",
57          f"Todesfälle: {update_value(district, 'deaths')}",
58          f"\nStand: {last_update.strftime('%d.%m.%Y, %H:%M Uhr')}",
59      ]
60      send_message(text="\n".join(text))
61  
62  
63  while True:
64      callback()
65      time.sleep(60)