/ components / paddock / telemetry / pitcrew / coach_watcher.py
coach_watcher.py
  1  import logging
  2  import threading
  3  import time
  4  
  5  from telemetry.models import Coach, Driver
  6  
  7  from .coach import Coach as PitCrewCoach
  8  from .coach_app import CoachApp
  9  from .coach_copilots import CoachCopilots
 10  from .history import History
 11  from .mqtt import Mqtt
 12  
 13  
 14  class CoachWatcher:
 15      def __init__(self, firehose, replay=False):
 16          self.firehose = firehose
 17          self.sleep_time = 3
 18          self.active_coaches = {}
 19          self.replay = replay
 20          self.ready = False
 21  
 22          self._stop_event = threading.Event()
 23  
 24      def stop(self):
 25          self._stop_event.set()
 26  
 27      def stopped(self):
 28          return self._stop_event.is_set()
 29  
 30      def drivers(self):
 31          drivers = set()
 32          for session in self.firehose.sessions.values():
 33              # check if session.driver is a Driver object
 34              if isinstance(session.driver, Driver):
 35                  drivers.add(session.driver)
 36          return drivers
 37  
 38      def watch_coaches(self):
 39          while True and not self.stopped():
 40              self.check_active_coaches()
 41              # sleep longer than save_sessions, to make sure all DB objects are initialized
 42              drivers = self.drivers()
 43              # collect all driver names
 44              # driver_names = [driver.name for driver in drivers]
 45              # logging.info("checking coaches for drivers: %s", ", ".join(driver_names))
 46              coaches = Coach.objects.filter(driver__in=drivers)
 47              for coach in coaches:
 48                  # logging.info(f"{coach.driver} coach enabled: {coach.enabled}")
 49                  if coach.enabled:
 50                      if coach.driver.name not in self.active_coaches.keys():
 51                          logging.debug(f"activating coach for {coach.driver}")
 52                          self.start_coach(coach.driver.name, coach)
 53                  else:
 54                      if coach.driver.name in self.active_coaches.keys():
 55                          logging.debug(f"deactivating coach for {coach.driver}")
 56                          self.stop_coach(coach.driver.name)
 57              time.sleep(self.sleep_time)
 58              self.ready = True
 59  
 60      def stop_coach(self, driver_name):
 61          if driver_name not in self.active_coaches.keys():
 62              return
 63          logging.info(f"disconnectin MQTT thread for {driver_name}")
 64          self.active_coaches[driver_name][0].disconnect()
 65          logging.info(f"disconnecting History thread for {driver_name}")
 66          self.active_coaches[driver_name][1].disconnect()
 67          del self.active_coaches[driver_name]
 68  
 69      def start_coach(self, driver_name, coach_model, debug=False):
 70          history = History()
 71          if coach_model.mode == Coach.MODE_TRACK_GUIDE_APP or coach_model.mode == Coach.MODE_DEBUG_APP:
 72              coach = CoachApp(history, coach_model, debug=debug)
 73          elif coach_model.mode == Coach.MODE_COPILOTS:
 74              coach = CoachCopilots(history, coach_model, debug=debug)
 75          else:
 76              coach = PitCrewCoach(history, coach_model, debug=debug)
 77  
 78          topic = f"crewchief/{driver_name}/#"
 79          mqtt = Mqtt(coach, topic, replay=self.replay, debug=debug)
 80  
 81          def history_thread():
 82              logging.info(f"History thread starting for {driver_name}")
 83              history.run()
 84              logging.info(f"History thread stopped for {driver_name}")
 85  
 86          h = threading.Thread(target=history_thread)
 87          h.name = f"history-{driver_name}"
 88  
 89          def mqtt_thread():
 90              logging.info(f"MQTT thread starting for {driver_name}")
 91              mqtt.run()
 92              logging.info(f"MQTT thread stopped for {driver_name}")
 93  
 94          c = threading.Thread(target=mqtt_thread)
 95          c.name = f"mqtt-{driver_name}"
 96  
 97          threads = list()
 98          threads.append(h)
 99          threads.append(c)
100          c.start()
101          h.start()
102          self.active_coaches[driver_name] = [history, mqtt, threads]
103  
104      def check_active_coaches(self):
105          dead_drivers = set()
106          for driver_name in self.active_coaches.keys():
107              # history = self.active_coaches[driver_name][0]
108              # mqtt = self.active_coaches[driver_name][1]
109              threads = self.active_coaches[driver_name][2]
110  
111              for t in threads:
112                  if not t.is_alive():
113                      self.stop()
114                      logging.error(f"Thread {t} died for {driver_name}")
115                      dead_drivers.add(driver_name)
116                      break
117  
118          for driver_name in dead_drivers:
119              self.stop_coach(driver_name)
120  
121      def run(self):
122          try:
123              self.watch_coaches()
124          except Exception as e:
125              logging.exception(f"Exception in CoachWatcher: {e}")
126              raise e
127          finally:
128              # stop all coaches
129              coaches = list(self.active_coaches.keys())
130              for driver in coaches:
131                  self.stop_coach(driver)
132  
133              logging.info("CoachWatcher stopped")