/ components / paddock / telemetry / fast_lap_analyzer_v1.py
fast_lap_analyzer_v1.py
  1  import logging
  2  
  3  # import matplotlib.pyplot as plt
  4  # import seaborn as sns
  5  import numpy as np
  6  import pandas as pd
  7  import scipy
  8  
  9  from .influx import Influx
 10  
 11  
 12  class FastLapAnalyzerV1:
 13      def __init__(self, laps):
 14          self.influx = Influx()
 15          self.laps = laps
 16  
 17      def analyze(self):
 18          for lap in self.laps:
 19              logging.info(
 20                  f"\t{lap.session.session_id} - {lap.pk} : length: {lap.length} time: {lap.time} number: {lap.number} "
 21              )
 22  
 23          game = self.laps[0].session.game
 24          car = self.laps[0].car
 25  
 26          if game.name == "RaceRoom":
 27              logging.info("RaceRoom not supported, because no SteeringAngle")
 28              return
 29          if game.name == "Assetto Corsa Competizione":
 30              logging.info("Assetto Corsa Competizione not supported, because no SteeringAngle")
 31              return
 32          if car.name == "Unknown":
 33              logging.info(f"Car {car.name} not supported, skipping")
 34              return
 35  
 36          # Construct a dictionary that will store the data
 37          features = [
 38              "Brake",
 39              "Throttle",
 40              "SteeringAngle",
 41              "Gear",
 42              "DistanceRoundTrack",
 43              "SpeedMs",
 44              "CurrentLapTime",
 45          ]
 46          feature_values = {}
 47          for feature in features:
 48              feature_values[feature] = []
 49  
 50          # Loop over the SessionIds for the n fastest laps
 51          # for i,session in enumerate(np.unique(session_list)):
 52          for lap in self.laps:
 53              logging.info(f"Getting data for SessionId: {lap.session.session_id} / {lap.number}")
 54              # format datetime to flux format
 55              start = lap.start.strftime("%Y-%m-%dT%H:%M:%SZ")
 56              end = lap.end.strftime("%Y-%m-%dT%H:%M:%SZ")
 57              query = f"""
 58                  from(bucket: "racing")
 59                  |> range(start: {start}, stop: {end})
 60                  |> filter(fn: (r) => r["_measurement"] == "laps_cc")
 61                  |> filter(fn: (r) => r["SessionId"] == "{lap.session.session_id}")
 62                  |> filter(fn: (r) => r["GameName"] == "{lap.session.game.name}")
 63                  |> filter(fn: (r) => r["CarModel"] == "{lap.car.name}")
 64                  |> filter(fn: (r) => r["TrackCode"] == "{lap.track.name}")
 65                  |> filter(fn: (r) => r["CurrentLap"] == "{lap.number}")
 66                  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
 67                  |> sort(columns: ["_time"], desc: false)
 68              """
 69              # Open the data frame corresponding to session
 70              try:
 71                  # print(query)
 72                  df = self.influx.query_api.query_data_frame(query=query)
 73                  # print(df["SteeringAngle"].to_string())
 74                  for feature in features:
 75                      feature_values[feature].append(df[feature].values)
 76              except Exception as e:
 77                  logging.error(f"Could not get data for SessionId: {lap.session.session_id} / {lap.number} / {e}")
 78                  continue
 79  
 80          if len(feature_values["Brake"]) == 0:
 81              logging.error("No data found")
 82              return
 83  
 84          # Define the maximal size of all SteeringAngle arrays
 85          # print(feature_values["SteeringAngle"])
 86          max_number_of_datapoints = max([s.size for s in feature_values["SteeringAngle"]])
 87  
 88          # Define a dictionnary that will store the data
 89          # we need a dictionnary that contains arrays with all the same size,
 90          # rather than the previous dictionnary that was containing lists
 91          data_arrays = {}
 92          for feature in features:
 93              data_arrays[feature] = np.zeros((len(self.laps), max_number_of_datapoints))
 94  
 95          # Better to have ones for Throttle instead of zeros
 96          data_arrays["Throttle"] = np.ones((len(self.laps), max_number_of_datapoints))
 97  
 98          # Fill the dictionnary with data
 99          for i in range(len(self.laps)):
100              for feature in features:
101                  data_arrays[feature][i][: feature_values[feature][i].size] = feature_values[feature][i]
102  
103          # Do the average of all data in the dictionnary
104          # over the n fastest laps
105          # and store it in a new dictionnary
106          # FIXME: not sure if this is the best way to find the average signal
107  
108          av_data_arrays = {}
109          for feature in features:
110              av_data_arrays[feature] = np.mean(data_arrays[feature], axis=0)
111  
112          # for readibility, define a variable that store the average SteeringAngle array
113          steer = av_data_arrays["SteeringAngle"]
114  
115          # normalize the SteeringAngle array between -1 and 1
116          steer = steer / np.max(np.abs(steer))
117  
118          # smooth the data
119          window_length = 60  # 60 points corresponds to 1 seconds
120          steer_smooth = scipy.signal.savgol_filter(steer, window_length, 2)
121          # plt.plot(steer_smooth, label="SteeringAngle")
122          steer = steer_smooth
123  
124          # Define threshold to select major turns, which means abs(SteeringAngle) > threshold
125          threshold = 0.2
126          # Find extrama in SteeringAngle
127          # Mask 1 gives the indices where SteerinAngle is above thresold : each major turn
128          msk1 = np.where(np.abs(steer) > threshold)
129  
130          # Mask 2 gives the indices of the end of a major turn
131          # which means when the indices giving the major turns are separeted by 10 or more
132          idx = np.arange(steer.size)[msk1]
133          idx_diff = idx[1:] - idx[:-1]
134          msk2 = np.concatenate(([0], np.where(idx_diff > 10)[0], [idx_diff.size - 1]))
135  
136          # Define the list that will store the extrama indices
137          extrema_idx = []
138  
139          # Mask 3 gives the indices of the center of each major turn
140          for i, m in enumerate(msk2[:-1]):
141              # select SteeringAngle for the i turn
142              steer_tmp = steer[idx[msk2][i] : idx[msk2][i + 1]]
143              if steer_tmp.size:
144                  # find index of the extrema of SteeringAngle
145                  extrema_idx_tmp = np.argmax(np.abs(steer_tmp))
146                  # add idx offset to find the index in the full SteeringAngle array
147                  extrema_idx_truth = extrema_idx_tmp + idx[msk2][i]
148                  # add to list
149                  extrema_idx.append(extrema_idx_truth)
150  
151          extrema_idx = np.array(extrema_idx)
152  
153          # Find the begining and end of each segment : center between two major turns
154          center_idx = ((extrema_idx[1:] + extrema_idx[:-1]) / 2).astype("int32")
155          center_idx = np.concatenate(([0], center_idx, [steer.size - 1]))
156  
157          # # Plot the Averaged SteeringAngle, and show the identification of major turns
158          # plt.rcParams["figure.figsize"] = (25, 10)
159          # plt.plot(steer, label="SteeringAngle")
160          # plt.plot(idx, steer[msk1], "x", label="Major turns")
161          # plt.plot(idx[msk2], steer[msk1][msk2], "o", label="End of major turns", markersize="10")
162          # plt.plot(
163          #     np.arange(steer.size)[extrema_idx],
164          #     steer[extrema_idx],
165          #     "o",
166          #     label="Center of major turn",
167          #     markersize="15",
168          # )
169          # plt.plot(
170          #     np.arange(steer.size)[center_idx],
171          #     steer[center_idx],
172          #     "o",
173          #     label="Center between two major turns",
174          #     markersize="15",
175          # )
176          # plt.legend(fontsize=18)
177          # plt.show()
178          print(f"Number of major turns: {len(extrema_idx)}")
179  
180          # Build the segment dictionnary that contains a list for each data entry
181          # these lists gather the data for each turn
182          track_segment_count = center_idx.size - 1
183  
184          # initialize dictionnary
185          segment_data = {}
186          for feature in features:
187              segment_data[feature] = []
188  
189          # fill the dictionnary
190          for i in range(track_segment_count):
191              for feature in features:
192                  segment_data[feature].append(av_data_arrays[feature][center_idx[i] : center_idx[i + 1]])
193  
194          track_info = []
195          for i in range(track_segment_count):
196              track_info.append(
197                  {
198                      "start": segment_data["DistanceRoundTrack"][i][0],
199                      "end": segment_data["DistanceRoundTrack"][i][-1],
200                  }
201              )
202  
203          # Average DistanceRoundTrack when brake is pressed the first time
204          for i in range(track_segment_count):
205              seg = np.where(segment_data["Brake"][i] > 0)
206              if seg[0].size > 0:
207                  j = seg[0][0]
208                  track_info[i]["brake"] = segment_data["DistanceRoundTrack"][i][j]
209              else:
210                  track_info[i]["brake"] = 0
211  
212          # Average brake force during the turn
213          for i in range(track_segment_count):
214              track_info[i]["force"] = segment_data["Brake"][i].max()
215  
216          # Average value of gear at the middle of the turn
217          for i in range(track_segment_count):
218              track_info[i]["gear"] = av_data_arrays["Gear"][extrema_idx][i]
219  
220          # Average lowest value of speed during the turn
221          for i in range(track_segment_count):
222              track_info[i]["speed"] = av_data_arrays["SpeedMs"][extrema_idx][i]
223  
224          # Average value of distance when the brake force is at maximum
225          # FIXME: this is not the right way to do it, we want the distance when the brake force starts to decrease
226          for i in range(track_segment_count):
227              j = np.argmax(segment_data["Brake"][i])
228              track_info[i]["stop"] = segment_data["DistanceRoundTrack"][i][j]
229  
230          # Average DistanceRoundTrack when the throttle is pressed again during the turn
231          for i in range(track_segment_count):
232              seg = np.where(segment_data["Throttle"][i] == segment_data["Throttle"][i].min())
233              if seg[0].size > 0:
234                  j = seg[0][-1]
235                  track_info[i]["accelerate"] = segment_data["DistanceRoundTrack"][i][j]
236  
237          # convert track_info, which is an array of dict, to a pandas dataframe
238          df = pd.DataFrame(track_info)
239          logging.info(df.style.format(precision=1).to_string())
240          return track_info