mqtt.py
  1  #!/usr/bin/env python3
  2  
  3  import json
  4  import logging
  5  import threading
  6  
  7  import paho.mqtt.client as mqtt
  8  
  9  from telemetry.utils import get_mqtt_config
 10  
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Broker connection settings, read once when this module is imported.
# get_mqtt_config() must return exactly four values, in this order; the Mqtt
# class in this module passes them to username_pw_set() and connect().
(
    B4MAD_RACING_MQTT_HOST,
    B4MAD_RACING_MQTT_PORT,
    B4MAD_RACING_MQTT_USER,
    B4MAD_RACING_MQTT_PASSWORD,
) = get_mqtt_config()
 19  
 20  
 21  class Mqtt:
 22      def __init__(self, observer, topic, replay: bool = False, debug=False):
 23          mqttc = mqtt.Client()
 24          mqttc.on_message = self.on_message
 25          mqttc.on_connect = self.on_connect
 26          mqttc.on_publish = self.on_publish
 27          mqttc.on_subscribe = self.on_subscribe
 28          mqttc.username_pw_set(B4MAD_RACING_MQTT_USER, B4MAD_RACING_MQTT_PASSWORD)
 29          self.mqttc = mqttc
 30          self.do_disconnect = False
 31          self.replay = replay
 32          self.topic = topic
 33          self.observer = observer
 34          self._stop_event = threading.Event()
 35          self.ready = False
 36          self.debug = debug
 37  
 38      # def __del__(self):
 39      #     # disconnect from broker
 40  
 41      def disconnect(self):
 42          self.mqttc.disconnect()
 43  
 44      def stop(self):
 45          self._stop_event.set()
 46  
 47      def stopped(self):
 48          return self._stop_event.is_set()
 49  
 50      def on_message(self, mqttc, obj, msg):
 51          """Handle incoming messages, we are only interested in the telemetry.
 52  
 53          Args:
 54              mqttc (_type_): the mqtt client
 55              obj (_type_): the userdata
 56              msg (_type_): the message received
 57          """
 58          # _LOGGER.debug(
 59          #     "%s: qos='%s',payload='%s'", msg.topic, str(msg.qos), str(msg.payload)
 60          # )
 61  
 62          # if self.do_disconnect:
 63          #     _LOGGER.debug("stopping MQTT")
 64          #     mqttc.disconnect()
 65  
 66          if self.stopped():
 67              self.mqttc.disconnect()
 68  
 69          topic = msg.topic
 70          if self.replay:
 71              # remove replay/ prefix from session
 72              topic = topic[7:]
 73  
 74          try:
 75              payload = json.loads(msg.payload.decode("utf-8")).get("telemetry")
 76          except Exception as e:
 77              logging.error("Error decoding payload: %s", e)
 78              return
 79  
 80          response = self.observer.notify(topic, payload)
 81          if response:
 82              (r_topic, r_payload) = response
 83              payloads = r_payload
 84              if not isinstance(r_payload, list):
 85                  payloads = [r_payload]
 86  
 87              for r_payload in payloads:
 88                  meters = payload.get("DistanceRoundTrack", 0)
 89                  logging.debug("r-->: %s: %s : %s", meters, r_topic, r_payload)
 90                  mqttc.publish(r_topic, r_payload)
 91  
 92      def on_connect(self, mqttc, obj, flags, rc):
 93          _LOGGER.debug("on_connect rc: %s", str(rc))
 94          if rc == mqtt.MQTT_ERR_SUCCESS:
 95              self.ready = True
 96  
 97      def on_publish(self, mqttc, obj, mid):
 98          # _LOGGER.debug("mid: %s", str(mid))
 99          pass
100  
101      def on_subscribe(self, mqttc, obj, mid, granted_qos):
102          _LOGGER.debug("subscribed: mid='%s', granted_qos='%s'", str(mid), str(granted_qos))
103  
104      def on_log(self, mqttc, obj, level, string):
105          # print(string)
106          pass
107  
108      def run(self):
109          self.mqttc.connect(B4MAD_RACING_MQTT_HOST, B4MAD_RACING_MQTT_PORT, 60)
110          # topic = f"crewchief/{self.driver}/#"
111          if self.replay:
112              self.topic = f"replay/{self.topic}"
113  
114          s = self.mqttc.subscribe(self.topic, 0)
115          if s[0] == mqtt.MQTT_ERR_SUCCESS:
116              _LOGGER.info(f"Subscribed to {self.topic}")
117              self.mqttc.loop_forever()
118          else:
119              _LOGGER.error(f"Failed to subscribe to {self.topic}")
120              exit(1)
121  
122  
123  # if __name__ == "__main__":
124  #     _LOGGER.info("Starting MQTT client")
125  
126  #     history = History()
127  #     coach = Coach(history)
128  
129  #     m = Mqtt(coach)
130  #     m.run()