/ CrypticStatusBot / cryptic_client.py
cryptic_client.py
 1  import json
 2  from ssl import SSLCertVerificationError
 3  from typing import Optional
 4  from uuid import uuid4 as uuid
 5  
 6  from websocket import WebSocket, create_connection, WebSocketException, WebSocketTimeoutException
 7  
 8  from server import Server
 9  
10  
11  class CrypticClient:
12      def __init__(self, server: Server):
13          self.server: Server = server
14          try:
15              self.ws: WebSocket = create_connection(server.socket)
16          except (ConnectionRefusedError, ConnectionResetError, WebSocketException, SSLCertVerificationError):
17              self.ws: Optional[WebSocket] = None
18          else:
19              self.ws.settimeout(5)
20  
21      def request(self, data: dict) -> Optional[dict]:
22          for _ in range(3):
23              self.ws.send(json.dumps(data))
24              try:
25                  return json.loads(self.ws.recv())
26              except WebSocketTimeoutException:
27                  pass
28  
29      def check_java_server(self) -> bool:
30          if self.ws is None:
31              return False
32          response: Optional[dict] = self.request(
33              {"action": "login", "name": self.server.username, "password": self.server.password}
34          )
35          return response is not None and "token" in response
36  
37      def check_microservice(self, ms: str, expected: str) -> bool:
38          response: Optional[dict] = self.request({"ms": ms, "endpoint": [], "data": {}, "tag": str(uuid())})
39          # print(ms, expected, response)
40          return response is not None and response.get("data", {}).get("error") == expected
41  
42      def close(self):
43          if self.ws is not None:
44              self.ws.send(json.dumps({"action": "logout"}))
45              self.ws.close()