/ CrypticStatusBot / cryptic_status.py
cryptic_status.py
  1  import datetime
  2  import json
  3  import os
  4  import re
  5  import time
  6  import sentry_sdk
  7  from typing import Optional, List, Set
  8  
  9  from discord import Client, Message, TextChannel, Embed, Color
 10  from discord.ext import tasks
 11  from discord.ext.tasks import Loop
 12  from sentry_sdk.integrations.aiohttp import AioHttpIntegration
 13  
 14  from cryptic_client import CrypticClient
 15  from server import Server
 16  
 17  VERSION = "1.2"
 18  
 19  sentry_dsn = os.environ.get("SENTRY_DSN")
 20  if sentry_dsn:
 21      sentry_sdk.init(
 22          dsn=sentry_dsn,
 23          attach_stacktrace=True,
 24          shutdown_timeout=5,
 25          integrations=[AioHttpIntegration()],
 26          release=f"crypticstatusbot@{VERSION}",
 27      )
 28  
 29  RESULT = [":x: Down", ":white_check_mark: Running"]
 30  CHANNEL_CHAR = ["✘", "✔"]
 31  SPACE = "\u2009" * 2
 32  DOT = "\u00b7"
 33  
 34  config: dict = json.load(open("config.json"))
 35  servers: List[Server] = [Server.deserialize(server) for server in config["servers"]]
 36  
 37  
 38  def validate_config():
 39      occupied_channels: Set[int] = set()
 40      for server in servers:
 41          assert server.channel_id not in occupied_channels, "channel is already occupied"
 42          occupied_channels.add(server.channel_id)
 43  
 44  
 45  def space_channel_name(name: str) -> str:
 46      return name.replace(" ", SPACE)
 47  
 48  
 49  async def fetch_status_message(channel: TextChannel) -> Optional[Message]:
 50      out = None
 51      async for message in channel.history(limit=None):
 52          if out is None and message.author == bot.user:
 53              out = message
 54          else:
 55              await message.delete()
 56      return out
 57  
 58  
 59  validate_config()
 60  
 61  bot: Client = Client()
 62  
 63  
 64  async def microservice_status(server: Server, ms_running: bool, ms: str):
 65      if ms in server.ms_down:
 66          since, message_id = server.ms_down[ms]
 67          time_passed: float = time.time() - since
 68          channel: TextChannel = bot.get_channel(server.channel_id)
 69          if ms_running:
 70              server.ms_down.pop(ms)
 71              message: Optional[Message] = await channel.fetch_message(message_id)
 72              if message is not None:
 73                  await message.delete()
 74          elif time_passed > 120 and message_id is None:
 75              message = await channel.send(
 76                  f":warning: The {[ms + ' microservice', 'java server'][ms == 'server']} seems to be down!"
 77              )
 78              server.ms_down[ms] = since, message.id
 79      elif not ms_running:
 80          server.ms_down[ms] = time.time(), None
 81  
 82  
 83  @bot.event
 84  async def on_ready():
 85      print(f"Logged in as {bot.user}")
 86  
 87      try:
 88          main_loop.start()
 89      except RuntimeError:
 90          main_loop.restart()
 91  
 92  
 93  main_loop: Loop
 94  
 95  
 96  # noinspection PyCallingNonCallable
 97  @tasks.loop(seconds=config["refresh_interval"])
 98  async def main_loop():
 99      for server in servers:
100          # print(server)
101          embed: Embed = Embed(title=f"**{server.title} - Microservice Status**", description=f"Server: {server.socket}")
102          if server.frontend is not None:
103              embed.description += f"\nFrontend: {server.frontend}"
104  
105          channel: TextChannel = bot.get_channel(server.channel_id)
106          if channel is None:
107              continue
108  
109          client: CrypticClient = CrypticClient(server)
110  
111          server_running: bool = client.check_java_server()
112          embed.add_field(name="**Java Server**", value=RESULT[server_running], inline=False)
113          await microservice_status(server, server_running, "server")
114  
115          all_up: bool = server_running
116  
117          for expected, microservices in server.microservices.items():
118              for ms in microservices:
119                  ms_running: bool = server_running and client.check_microservice(ms, expected)
120                  embed.add_field(name=f"**cryptic-{ms}**", value=RESULT[ms_running], inline=False)
121                  all_up: bool = all_up and ms_running
122  
123                  if server_running:
124                      await microservice_status(server, ms_running, "cryptic-" + ms)
125  
126          online_count: Optional[int] = None
127          if server_running:
128              online_count: Optional[int] = client.request({"action": "info"}).get("online")
129              if online_count is not None:
130                  embed.description += f"\nOnline Players: {online_count - 1}"
131  
132          client.close()
133  
134          embed.colour = [Color(0xFF0000), Color(0xFFFF00), Color(0x008800)][server_running + all_up]
135          embed.set_footer(text=f"v{VERSION} - Bot by @Defelo#2022")
136          embed.timestamp = datetime.datetime.utcnow()
137  
138          status_message: Optional[Message] = await fetch_status_message(channel)
139          if status_message is None:
140              await channel.send(embed=embed)
141          else:
142              await status_message.edit(content="", embed=embed)
143  
144          old_channel_name: str = re.match(r"^.*?([a-z0-9]*)$", channel.name).group(1)
145          up_indicator: str = CHANNEL_CHAR[all_up]
146          new_channel_name: str = f"{up_indicator} {old_channel_name}"
147          new_channel_topic: str = f"Status of {server.title}"
148          if server_running and online_count is not None:
149              new_channel_topic += f" {DOT} Online Players: {online_count - 1}"
150  
151          await channel.edit(name=space_channel_name(new_channel_name), topic=new_channel_topic)
152  
153  
154  @bot.event
155  async def on_error(*_, **__):
156      if sentry_dsn:
157          sentry_sdk.capture_exception()
158      else:
159          raise
160  
161  
162  bot.run(os.environ["TOKEN"])