# coach_watcher.py
import logging
import threading
import time

from telemetry.models import Coach, Driver

from .coach import Coach as PitCrewCoach
from .coach_app import CoachApp
from .coach_copilots import CoachCopilots
from .history import History
from .mqtt import Mqtt


class CoachWatcher:
    """Start and stop per-driver coaches based on the ``Coach`` model state.

    The watcher polls the drivers found in ``firehose.sessions``, looks up
    their ``Coach`` rows and makes sure that every enabled coach has a running
    History thread and MQTT thread, and that every disabled coach has none.

    Attributes:
        firehose: Object exposing a ``sessions`` mapping whose values carry a
            ``driver`` attribute.
        sleep_time: Seconds to wait between two polling cycles.
        active_coaches: Maps a driver name to ``[history, mqtt, threads]`` as
            stored by ``start_coach``.
        replay: Forwarded unchanged to every ``Mqtt`` instance created.
        ready: ``False`` until the first polling cycle (including its wait)
            has completed, ``True`` afterwards.
    """

    def __init__(self, firehose, replay=False):
        """Store the configuration; no threads are started here."""
        self.firehose = firehose
        self.sleep_time = 3
        self.active_coaches = {}
        self.replay = replay
        self.ready = False

        self._stop_event = threading.Event()

    def stop(self):
        """Ask the polling loop in ``watch_coaches`` to terminate."""
        self._stop_event.set()

    def stopped(self):
        """Return ``True`` once ``stop`` has been called."""
        return self._stop_event.is_set()

    def drivers(self):
        """Return the set of ``Driver`` objects attached to firehose sessions.

        Sessions whose ``driver`` attribute is not a ``Driver`` instance are
        ignored.
        """
        return {
            session.driver
            for session in self.firehose.sessions.values()
            if isinstance(session.driver, Driver)
        }

    def watch_coaches(self):
        """Poll until stopped, activating and deactivating coaches as needed."""
        while not self.stopped():
            self.check_active_coaches()
            if self.stopped():
                # check_active_coaches() found a dead thread and requested a
                # shutdown; do not re-activate the coach it just tore down.
                break

            coaches = Coach.objects.filter(driver__in=self.drivers())
            for coach in coaches:
                driver_name = coach.driver.name
                is_active = driver_name in self.active_coaches
                if coach.enabled and not is_active:
                    logging.debug(f"activating coach for {coach.driver}")
                    self.start_coach(driver_name, coach)
                elif not coach.enabled and is_active:
                    logging.debug(f"deactivating coach for {coach.driver}")
                    self.stop_coach(driver_name)

            # Sleep longer than save_sessions, to make sure all DB objects are
            # initialized. Waiting on the stop event instead of time.sleep()
            # lets stop() interrupt the pause immediately.
            self._stop_event.wait(self.sleep_time)
            self.ready = True

    def stop_coach(self, driver_name):
        """Disconnect and forget the coach registered for ``driver_name``.

        Does nothing when no coach is active for that driver.
        """
        entry = self.active_coaches.get(driver_name)
        if entry is None:
            return

        # Entries are stored as [history, mqtt, threads] by start_coach().
        history, mqtt, _threads = entry
        logging.info(f"disconnecting History thread for {driver_name}")
        history.disconnect()
        logging.info(f"disconnecting MQTT thread for {driver_name}")
        mqtt.disconnect()
        del self.active_coaches[driver_name]

    def start_coach(self, driver_name, coach_model, debug=False):
        """Create the coach for ``coach_model`` and start its two threads.

        Args:
            driver_name: Name used for the MQTT topic, the thread names and as
                the key in ``active_coaches``.
            coach_model: ``Coach`` model instance; its ``mode`` selects the
                coach implementation.
            debug: Forwarded to the coach and to ``Mqtt``.
        """
        history = History()
        if coach_model.mode in (Coach.MODE_TRACK_GUIDE_APP, Coach.MODE_DEBUG_APP):
            coach = CoachApp(history, coach_model, debug=debug)
        elif coach_model.mode == Coach.MODE_COPILOTS:
            coach = CoachCopilots(history, coach_model, debug=debug)
        else:
            coach = PitCrewCoach(history, coach_model, debug=debug)

        topic = f"crewchief/{driver_name}/#"
        mqtt = Mqtt(coach, topic, replay=self.replay, debug=debug)

        def history_thread():
            logging.info(f"History thread starting for {driver_name}")
            history.run()
            logging.info(f"History thread stopped for {driver_name}")

        def mqtt_thread():
            logging.info(f"MQTT thread starting for {driver_name}")
            mqtt.run()
            logging.info(f"MQTT thread stopped for {driver_name}")

        h = threading.Thread(target=history_thread, name=f"history-{driver_name}")
        c = threading.Thread(target=mqtt_thread, name=f"mqtt-{driver_name}")

        # The MQTT thread is started before the History thread.
        c.start()
        h.start()
        self.active_coaches[driver_name] = [history, mqtt, [h, c]]

    def check_active_coaches(self):
        """Stop the watcher and drop any coach whose thread has died.

        When a History or MQTT thread of an active coach is no longer alive,
        the watcher's stop event is set, the failure is logged and the coach
        is removed via ``stop_coach``.
        """
        dead_drivers = set()
        for driver_name, (_history, _mqtt, threads) in self.active_coaches.items():
            for t in threads:
                if not t.is_alive():
                    self.stop()
                    logging.error(f"Thread {t} died for {driver_name}")
                    dead_drivers.add(driver_name)
                    break

        # Removal happens after the scan so the dict is not mutated while it
        # is being iterated.
        for driver_name in dead_drivers:
            self.stop_coach(driver_name)

    def run(self):
        """Run the polling loop and stop every active coach on the way out.

        Raises:
            Exception: Any exception raised by ``watch_coaches`` is logged and
                re-raised after the active coaches have been stopped.
        """
        try:
            self.watch_coaches()
        except Exception as e:
            logging.exception(f"Exception in CoachWatcher: {e}")
            raise
        finally:
            # Snapshot the keys: stop_coach() deletes entries while we loop.
            for driver in list(self.active_coaches):
                self.stop_coach(driver)

            logging.info("CoachWatcher stopped")