/ components / paddock / telemetry / fast_lap_analyzer.py
fast_lap_analyzer.py
  1  import logging
  2  import statistics
  3  
  4  import numpy as np
  5  
  6  from .analyzer import Analyzer
  7  from .influx import Influx
  8  from .models import FastLap
  9  from .pitcrew.segment import Segment
 10  
 11  
 12  class FastLapAnalyzer:
 13      def __init__(self, laps=[], bucket="fast_laps"):
 14          self.analyzer = Analyzer()
 15          self.laps = laps
 16          self.bucket = bucket
 17          self.influx_client = None
 18          self.same_sectors = False
 19          self.columns = ["Brake", "SpeedMs", "Throttle", "Gear", "CurrentLapTime", "SteeringAngle", "Time"]
 20  
 21      def influx(self):
 22          if not self.influx_client:
 23              self.influx_client = Influx()
 24          return self.influx_client
 25  
 26      def assert_can_analyze(self):
 27          # maybe check for game type
 28          return True
 29  
 30      def fetch_lap_telemetry(self, max_laps=None):
 31          laps_with_telemetry = []
 32          lap_telemetry = []
 33          counter = 0
 34          for lap in self.laps:
 35              laps = self.influx().telemetry_for_laps([lap], measurement="fast_laps", bucket=self.bucket)
 36              if len(laps) == 0:
 37                  logging.info("No data found for lap in fast_laps bucket, trying in default bucket")
 38                  laps = self.influx().telemetry_for_laps([lap])
 39                  if len(laps) == 0:
 40                      logging.info("No data found for lap, continuing")
 41                      continue
 42              laps_with_telemetry.append(lap)
 43              df = self.preprocess(laps[0])
 44              lap_telemetry.append(df)
 45              counter += 1
 46              if max_laps and counter >= max_laps:
 47                  break
 48  
 49          return lap_telemetry, laps_with_telemetry
 50  
 51      def current_fast_lap_sectors(self):
 52          lap = self.laps[0]
 53          car = lap.car
 54          track = lap.track
 55          game = lap.session.game
 56          fast_lap = FastLap.objects.filter(game=game, car=car, track=track, driver=None).first()
 57          sectors = []
 58          if fast_lap and fast_lap.data:
 59              for segment in fast_lap.data.get("segments", []):
 60                  sectors.append(
 61                      {
 62                          "start": segment.start,
 63                          "end": segment.end,
 64                      }
 65                  )
 66          return sectors
 67  
 68      def similar_sectors(self, sectors_a, sectors_b):
 69          if len(sectors_a) != len(sectors_b):
 70              return False
 71  
 72          if len(sectors_a) == 0:
 73              return False
 74  
 75          start_diffs = []
 76          for i in range(len(sectors_a)):
 77              start_diffs.append(abs(sectors_a[i]["start"] - sectors_b[i]["start"]))
 78  
 79          # med = statistics.median(start_diffs)
 80          med = statistics.mean(start_diffs)
 81          logging.debug(f"start_diffs: {start_diffs} med: {med}")
 82          if med > 40:
 83              return False
 84  
 85          return True
 86  
 87      def extract_sectors(self, lap_data):
 88          df_max = self.analyzer.combine_max_throttle(lap_data)
 89          if df_max is None:
 90              return None, None
 91          sector_start_end = self.analyzer.split_sectors(
 92              df_max, min_distance_between_sectors=35, min_length_throttle_below_threshold=20
 93          )
 94          return sector_start_end, df_max
 95  
 96      def fastest_sector(self, data_frames, start, end):
 97          fast_sector = None
 98          fast_sector_time = 10_000_000_000_000
 99          fast_sector_idx = -1
100  
101          # logging.debug(f"start: {start}, end: {end}")
102          for i, df in enumerate(data_frames):
103              sector = self.analyzer.section_df(df, start, end)
104              min_distance = sector["DistanceRoundTrack"].min()
105              max_distance = sector["DistanceRoundTrack"].max()
106              if start > end:
107                  tmp = start
108                  start = end
109                  end = tmp
110              min_threshold = 10
111              # be a bit more generous with the min_threshold if its the first sector
112              # since those often dont start at 0
113              if min_distance < 20:
114                  min_threshold = 20
115              if abs(min_distance - start) > min_threshold:
116                  logging.debug(f"get sector for lap {i}: min: {min_distance} != start: {start}")
117                  continue
118              if abs(max_distance - end) > 10:
119                  logging.debug(f"get sector for lap {i}: max: {max_distance} != end: {end}")
120                  continue
121              # logging.debug(f"min_distance: {min_distance}, max_distance: {max_distance}")
122              # continue if the sector is empty
123              # check why this happens
124              # pipenv run ./manage.py analyze \
125              #   --game 'Automobilista 2' --track 'Snetterton:Snetterton_300' --car 'Ginetta G58'
126              if sector.empty:
127                  logging.error(f"sector {i} is empty")
128                  continue
129  
130              # start_idx = -1
131              # end_idx = 0
132              # if start > end:
133              #     logging.debug(f"sector {i} wrapped start: {start} > end: {end}")
134              #     start_idx = 0
135              #     end_idx = -1
136  
137              # sector_time = self.analyzer.sector_lap_time(sector)
138              sector_time = self.analyzer.sector_time(sector)
139  
140              if sector_time < fast_sector_time:
141                  fast_sector = sector
142                  fast_sector_time = sector_time
143                  fast_sector_idx = i
144  
145          # logging.debug(f"fast_sector_idx: {fast_sector_idx} fast_sector_time: {fast_sector_time}")
146          return fast_sector, fast_sector_idx
147  
148      def analyze(self, min_laps=1, max_laps=10):
149          if not self.assert_can_analyze():
150              logging.info("Can't analyze")
151              return
152  
153          lap_telemetry, laps_with_telemetry = self.fetch_lap_telemetry(max_laps)
154  
155          if len(lap_telemetry) < min_laps:
156              logging.info(f"Found {len(lap_telemetry)} laps, need {min_laps}")
157              return
158  
159          sector_start_end, df_max = self.extract_sectors(lap_telemetry)
160          if sector_start_end is None:
161              return None
162  
163          current_sectors = self.current_fast_lap_sectors()
164          if self.similar_sectors(sector_start_end, current_sectors):
165              logging.info("Sectors are similar to current fast lap, using current fast lap")
166              self.same_sectors = True
167              sector_start_end = current_sectors
168  
169          segments, used_laps = self.extract_segments(sector_start_end, lap_telemetry, laps_with_telemetry, df_max)
170  
171          distance_time = self.analyzer.distance_speed_lookup_table(lap_telemetry[0])
172          data = {
173              "distance_time": distance_time,
174              "segments": segments,
175          }
176  
177          return data, list(used_laps)
178  
179      def extract_segments(self, sector_start_end, lap_telemetry, laps_with_telemetry, df_max):
180          segments = []
181          used_laps = set()
182          track_length = df_max["DistanceRoundTrack"].max()
183          for i in range(len(sector_start_end)):
184              start = sector_start_end[i]["start"]
185              end = sector_start_end[i]["end"]
186              logging.debug(f"extract_segments for sector {i} start: {start} end: {end}")
187              sector, lap_index = self.fastest_sector(lap_telemetry, start, end)
188              if sector is None:
189                  logging.error(f"Could not find fastest sector for {start} - {end}")
190                  continue
191              # merge Throttle input
192              # sector['Throttle'] = df_max['Throttle']
193  
194              used_laps.add(laps_with_telemetry[lap_index])
195  
196              segment = self.extract_segment(sector)
197              segment.start = start
198              segment.end = end
199              segment.turn = i + 1
200              segment.track_length = track_length
201              segment.time = self.analyzer.sector_lap_time(sector)
202              segments.append(segment)
203          return segments, used_laps
204  
205      def brake_features(self, df):
206          brake_feature_args = {
207              "brake_threshold": 0.1,
208          }
209          return self.analyzer.brake_features(df, **brake_feature_args)
210  
211      def throttle_features(self, df):
212          throttle_features_args = {
213              # "throttle_threshold": 0.98,
214          }
215          return self.analyzer.throttle_features(df, **throttle_features_args)
216  
217      def gear_features(self, df):
218          gear = df["Gear"].min()
219          # get all gear changes
220          gear_changes = df[df["Gear"] != df["Gear"].shift(1)]
221          # now get the value of DistanceRoundTrack for each gear change
222          distances = gear_changes["DistanceRoundTrack"].tolist()
223          # round the values to integer
224          distances = [int(round(x)) for x in distances]
225          gears = gear_changes["Gear"].tolist()
226          gears = [int(x) for x in gears]
227          # create a dict which maps the distance to the gear
228          distance_gear = dict(zip(distances, gears))
229          return {
230              "gear": int(gear) if not np.isnan(gear) else 0,
231              "distance_gear": distance_gear,
232          }
233  
234      def extract_segment(self, sector):
235          analyzer = self.analyzer
236          throttle_or_brake = analyzer.sector_type(sector)
237          brake_features = self.brake_features(sector)
238          throttle_features = self.throttle_features(sector)
239          gear_features = self.gear_features(sector)
240          other_features = {}
241  
242          segment = Segment()
243          segment.type = throttle_or_brake
244  
245          if brake_features:
246              segment.add_features(brake_features, type="brake")
247          if throttle_features:
248              segment.add_features(throttle_features, type="throttle")
249          if gear_features:
250              segment.add_features(gear_features, type="gear")
251          if other_features:
252              segment.add_features(other_features, type="other")
253  
254          segment.telemetry = sector
255          return segment
256  
257      def preprocess(self, df):
258          # # Check if the value is increasing compared to the previous value
259          # is_increasing = df["DistanceRoundTrack"] > df["DistanceRoundTrack"].shift(1)
260  
261          # # Get the indices where the value is not increasing
262          # not_increasing_indices = is_increasing[is_increasing == False].index.tolist()  # noqa: E712
263  
264          # if len(not_increasing_indices) > 1:
265          #     logging.debug(f"Found {len(not_increasing_indices)} not increasing indices")
266  
267          # df = self.analyzer.drop_decreasing(df)
268          # # dataframes with less than 100 points are not reliable
269          # if len(df) < 100:
270          #     logging.error(f"Dataframe has less than 100 points: {len(df)}")
271          #     return
272          df = df[df["Gear"] != 0].copy()
273          # convert _time Timestamp column to int64
274          df.loc[:, "Time"] = df["_time"].astype("int64")
275          df = self.analyzer.resample(
276              df,
277              freq=1,
278              columns=self.columns,
279          )
280          return df