mqtt.py
#!/usr/bin/env python3
"""MQTT client that forwards telemetry messages to an observer.

The broker host, port and credentials come from ``get_mqtt_config()``.
Every received message is JSON-decoded, its ``telemetry`` entry is handed to
the observer, and whatever the observer returns is published back.
"""

import json
import logging
import threading

import paho.mqtt.client as mqtt

from telemetry.utils import get_mqtt_config

_LOGGER = logging.getLogger(__name__)

# Prefix that ``run()`` puts in front of the topic in replay mode and that
# ``on_message()`` strips again before notifying the observer.
_REPLAY_PREFIX = "replay/"

(
    B4MAD_RACING_MQTT_HOST,
    B4MAD_RACING_MQTT_PORT,
    B4MAD_RACING_MQTT_USER,
    B4MAD_RACING_MQTT_PASSWORD,
) = get_mqtt_config()


class Mqtt:
    """Subscribe to one topic and relay its telemetry to an observer.

    Attributes:
        mqttc: the underlying paho client with this object's callbacks set.
        observer: object whose ``notify(topic, payload)`` is called for
            every decoded message.
        topic: the topic to subscribe to; ``run()`` prefixes it with
            ``replay/`` when ``replay`` is true.
        replay: whether the client works on the ``replay/`` topic tree.
        ready: set to True by ``on_connect`` once the broker accepted the
            connection.
        do_disconnect, debug: stored for callers; not read by this class.
    """

    def __init__(self, observer, topic, replay: bool = False, debug=False):
        """Create the paho client and wire up callbacks and credentials.

        Args:
            observer: receiver of the telemetry, must offer ``notify``.
            topic: topic to subscribe to when ``run()`` is called.
            replay: subscribe below ``replay/`` and strip that prefix from
                incoming topics.
            debug: stored on the instance as ``self.debug``.
        """
        mqttc = mqtt.Client()
        mqttc.on_message = self.on_message
        mqttc.on_connect = self.on_connect
        mqttc.on_publish = self.on_publish
        mqttc.on_subscribe = self.on_subscribe
        mqttc.username_pw_set(B4MAD_RACING_MQTT_USER, B4MAD_RACING_MQTT_PASSWORD)
        self.mqttc = mqttc
        self.do_disconnect = False
        self.replay = replay
        self.topic = topic
        self.observer = observer
        self._stop_event = threading.Event()
        self.ready = False
        self.debug = debug

    def disconnect(self):
        """Disconnect the underlying client from the broker."""
        self.mqttc.disconnect()

    def stop(self):
        """Request a stop; acted upon when the next message arrives."""
        self._stop_event.set()

    def stopped(self):
        """Return True once ``stop()`` has been called."""
        return self._stop_event.is_set()

    def on_message(self, mqttc, obj, msg):
        """Handle incoming messages, we are only interested in the telemetry.

        The ``telemetry`` entry of the JSON payload is passed to the
        observer. A truthy observer response is unpacked as
        ``(topic, payload)``; the payload (or each element, if it is a list)
        is published on that topic.

        Args:
            mqttc: the mqtt client
            obj: the userdata
            msg: the message received
        """
        # A stop request is honoured here; the message at hand is still
        # processed afterwards, exactly as before.
        if self.stopped():
            self.mqttc.disconnect()

        topic = msg.topic
        if self.replay and topic.startswith(_REPLAY_PREFIX):
            # remove replay/ prefix from session
            topic = topic[len(_REPLAY_PREFIX):]

        try:
            payload = json.loads(msg.payload.decode("utf-8")).get("telemetry")
        except Exception as e:
            # Deliberately broad: a single malformed message must not take
            # down the network loop.
            _LOGGER.error("Error decoding payload: %s", e)
            return

        response = self.observer.notify(topic, payload)
        if not response:
            return

        r_topic, r_payload = response
        payloads = r_payload if isinstance(r_payload, list) else [r_payload]

        # ``payload`` is None when the message carried no "telemetry" key,
        # so only look up the distance when there is a mapping to ask.
        meters = 0
        if isinstance(payload, dict):
            meters = payload.get("DistanceRoundTrack", 0)

        for item in payloads:
            _LOGGER.debug("r-->: %s: %s : %s", meters, r_topic, item)
            mqttc.publish(r_topic, item)

    def on_connect(self, mqttc, obj, flags, rc):
        """Mark the client as ready when ``rc`` signals success."""
        _LOGGER.debug("on_connect rc: %s", str(rc))
        if rc == mqtt.MQTT_ERR_SUCCESS:
            self.ready = True

    def on_publish(self, mqttc, obj, mid):
        """Publish callback; intentionally does nothing."""

    def on_subscribe(self, mqttc, obj, mid, granted_qos):
        """Log the message id and granted QoS of a subscription."""
        _LOGGER.debug("subscribed: mid='%s', granted_qos='%s'", str(mid), str(granted_qos))

    def on_log(self, mqttc, obj, level, string):
        """Log callback; intentionally does nothing."""

    def run(self):
        """Connect, subscribe to the topic and process messages.

        In replay mode ``self.topic`` is prefixed with ``replay/`` before
        subscribing. On success this call hands control to the client's
        ``loop_forever()``.

        Raises:
            SystemExit: with status 1 when the subscribe request fails.
        """
        self.mqttc.connect(B4MAD_RACING_MQTT_HOST, B4MAD_RACING_MQTT_PORT, 60)
        if self.replay:
            self.topic = f"{_REPLAY_PREFIX}{self.topic}"

        result = self.mqttc.subscribe(self.topic, 0)
        if result[0] != mqtt.MQTT_ERR_SUCCESS:
            _LOGGER.error("Failed to subscribe to %s", self.topic)
            # Same effect as the interactive ``exit(1)`` helper, without
            # depending on the ``site`` module having installed it.
            raise SystemExit(1)

        _LOGGER.info("Subscribed to %s", self.topic)
        self.mqttc.loop_forever()


# if __name__ == "__main__":
#     _LOGGER.info("Starting MQTT client")

#     history = History()
#     coach = Coach(history)

#     m = Mqtt(coach)
#     m.run()