/ CrypticStatusBot / cryptic_client.py
cryptic_client.py
import json
from ssl import SSLCertVerificationError
from typing import Optional
from uuid import uuid4 as uuid

from websocket import WebSocket, create_connection, WebSocketException, WebSocketTimeoutException

from server import Server


class CrypticClient:
    """Websocket client that probes a server and its microservices.

    The connection is opened eagerly in ``__init__``. When it cannot be
    established, ``self.ws`` is ``None`` and every check reports failure
    instead of raising.
    """

    def __init__(self, server: Server):
        """Connect to ``server.socket`` and apply a 5 second socket timeout.

        Connection-level failures (refused/reset connection, websocket
        errors, certificate verification errors) are swallowed and leave
        ``self.ws`` set to ``None``.
        """
        self.server: Server = server
        self.ws: Optional[WebSocket] = None
        try:
            self.ws = create_connection(server.socket)
        except (ConnectionRefusedError, ConnectionResetError, WebSocketException, SSLCertVerificationError):
            # The server is unreachable; the check_* methods report False.
            return
        self.ws.settimeout(5)

    def request(self, data: dict) -> Optional[dict]:
        """Send ``data`` as JSON and return the decoded JSON object reply.

        The request is (re)sent up to three times; an attempt is retried
        only when it times out.

        Returns:
            The decoded response, or ``None`` when there is no connection,
            every attempt timed out, the connection failed, or the reply
            was not a JSON object.
        """
        if self.ws is None:
            return None

        payload = json.dumps(data)  # serialise once, reuse for every retry
        for _ in range(3):
            try:
                self.ws.send(payload)
                response = json.loads(self.ws.recv())
            except WebSocketTimeoutException:
                # Must precede WebSocketException (its base class): a
                # timeout is the only failure worth retrying.
                continue
            except (WebSocketException, OSError, json.JSONDecodeError):
                # Connection dropped or the peer sent something that is not
                # JSON (e.g. an empty frame); treat as "no response".
                return None
            # Callers rely on dict methods, so anything else is unusable.
            return response if isinstance(response, dict) else None
        return None

    def check_java_server(self) -> bool:
        """Log in with the configured credentials.

        Returns:
            ``True`` when the login response contains a ``"token"`` key,
            ``False`` otherwise (including when there is no connection).
        """
        if self.ws is None:
            return False
        response: Optional[dict] = self.request(
            {"action": "login", "name": self.server.username, "password": self.server.password}
        )
        return response is not None and "token" in response

    def check_microservice(self, ms: str, expected: str) -> bool:
        """Probe microservice ``ms`` with an empty endpoint and payload.

        NOTE(review): presumably a healthy microservice answers the empty
        endpoint with a known error string - confirm against the callers.

        Args:
            ms: Name of the microservice to address.
            expected: Value the ``error`` field of the response's ``data``
                object must equal for the check to pass.

        Returns:
            ``True`` when ``response["data"]["error"] == expected``; a
            missing ``data`` key is treated as an empty object. ``False``
            when there is no usable response.
        """
        response: Optional[dict] = self.request({"ms": ms, "endpoint": [], "data": {}, "tag": str(uuid())})
        if response is None:
            return False
        payload = response.get("data", {})
        if not isinstance(payload, dict):
            return False
        return payload.get("error") == expected

    def close(self):
        """Send a best-effort logout and close the websocket.

        Safe to call when the connection was never established and safe to
        call more than once.
        """
        ws, self.ws = self.ws, None
        if ws is None:
            return
        try:
            ws.send(json.dumps({"action": "logout"}))
        except (WebSocketException, OSError):
            # The peer may already be gone; logout is only a courtesy.
            pass
        finally:
            # Always release the socket, even if the logout could not be sent.
            ws.close()