fast_lap_analyzer_v1.py
1 import logging 2 3 # import matplotlib.pyplot as plt 4 # import seaborn as sns 5 import numpy as np 6 import pandas as pd 7 import scipy 8 9 from .influx import Influx 10 11 12 class FastLapAnalyzerV1: 13 def __init__(self, laps): 14 self.influx = Influx() 15 self.laps = laps 16 17 def analyze(self): 18 for lap in self.laps: 19 logging.info( 20 f"\t{lap.session.session_id} - {lap.pk} : length: {lap.length} time: {lap.time} number: {lap.number} " 21 ) 22 23 game = self.laps[0].session.game 24 car = self.laps[0].car 25 26 if game.name == "RaceRoom": 27 logging.info("RaceRoom not supported, because no SteeringAngle") 28 return 29 if game.name == "Assetto Corsa Competizione": 30 logging.info("Assetto Corsa Competizione not supported, because no SteeringAngle") 31 return 32 if car.name == "Unknown": 33 logging.info(f"Car {car.name} not supported, skipping") 34 return 35 36 # Construct a dictionary that will store the data 37 features = [ 38 "Brake", 39 "Throttle", 40 "SteeringAngle", 41 "Gear", 42 "DistanceRoundTrack", 43 "SpeedMs", 44 "CurrentLapTime", 45 ] 46 feature_values = {} 47 for feature in features: 48 feature_values[feature] = [] 49 50 # Loop over the SessionIds for the n fastest laps 51 # for i,session in enumerate(np.unique(session_list)): 52 for lap in self.laps: 53 logging.info(f"Getting data for SessionId: {lap.session.session_id} / {lap.number}") 54 # format datetime to flux format 55 start = lap.start.strftime("%Y-%m-%dT%H:%M:%SZ") 56 end = lap.end.strftime("%Y-%m-%dT%H:%M:%SZ") 57 query = f""" 58 from(bucket: "racing") 59 |> range(start: {start}, stop: {end}) 60 |> filter(fn: (r) => r["_measurement"] == "laps_cc") 61 |> filter(fn: (r) => r["SessionId"] == "{lap.session.session_id}") 62 |> filter(fn: (r) => r["GameName"] == "{lap.session.game.name}") 63 |> filter(fn: (r) => r["CarModel"] == "{lap.car.name}") 64 |> filter(fn: (r) => r["TrackCode"] == "{lap.track.name}") 65 |> filter(fn: (r) => r["CurrentLap"] == "{lap.number}") 66 |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") 67 |> sort(columns: ["_time"], desc: false) 68 """ 69 # Open the data frame corresponding to session 70 try: 71 # print(query) 72 df = self.influx.query_api.query_data_frame(query=query) 73 # print(df["SteeringAngle"].to_string()) 74 for feature in features: 75 feature_values[feature].append(df[feature].values) 76 except Exception as e: 77 logging.error(f"Could not get data for SessionId: {lap.session.session_id} / {lap.number} / {e}") 78 continue 79 80 if len(feature_values["Brake"]) == 0: 81 logging.error("No data found") 82 return 83 84 # Define the maximal size of all SteeringAngle arrays 85 # print(feature_values["SteeringAngle"]) 86 max_number_of_datapoints = max([s.size for s in feature_values["SteeringAngle"]]) 87 88 # Define a dictionnary that will store the data 89 # we need a dictionnary that contains arrays with all the same size, 90 # rather than the previous dictionnary that was containing lists 91 data_arrays = {} 92 for feature in features: 93 data_arrays[feature] = np.zeros((len(self.laps), max_number_of_datapoints)) 94 95 # Better to have ones for Throttle instead of zeros 96 data_arrays["Throttle"] = np.ones((len(self.laps), max_number_of_datapoints)) 97 98 # Fill the dictionnary with data 99 for i in range(len(self.laps)): 100 for feature in features: 101 data_arrays[feature][i][: feature_values[feature][i].size] = feature_values[feature][i] 102 103 # Do the average of all data in the dictionnary 104 # over the n fastest laps 105 # and store it in a new dictionnary 106 # FIXME: not sure if this is the best way to find the average signal 107 108 av_data_arrays = {} 109 for feature in features: 110 av_data_arrays[feature] = np.mean(data_arrays[feature], axis=0) 111 112 # for readibility, define a variable that store the average SteeringAngle array 113 steer = av_data_arrays["SteeringAngle"] 114 115 # normalize the SteeringAngle array between -1 and 1 116 steer = steer / np.max(np.abs(steer)) 117 118 # smooth the data 119 window_length = 60 # 60 points corresponds to 1 seconds 120 steer_smooth = scipy.signal.savgol_filter(steer, window_length, 2) 121 # plt.plot(steer_smooth, label="SteeringAngle") 122 steer = steer_smooth 123 124 # Define threshold to select major turns, which means abs(SteeringAngle) > threshold 125 threshold = 0.2 126 # Find extrama in SteeringAngle 127 # Mask 1 gives the indices where SteerinAngle is above thresold : each major turn 128 msk1 = np.where(np.abs(steer) > threshold) 129 130 # Mask 2 gives the indices of the end of a major turn 131 # which means when the indices giving the major turns are separeted by 10 or more 132 idx = np.arange(steer.size)[msk1] 133 idx_diff = idx[1:] - idx[:-1] 134 msk2 = np.concatenate(([0], np.where(idx_diff > 10)[0], [idx_diff.size - 1])) 135 136 # Define the list that will store the extrama indices 137 extrema_idx = [] 138 139 # Mask 3 gives the indices of the center of each major turn 140 for i, m in enumerate(msk2[:-1]): 141 # select SteeringAngle for the i turn 142 steer_tmp = steer[idx[msk2][i] : idx[msk2][i + 1]] 143 if steer_tmp.size: 144 # find index of the extrema of SteeringAngle 145 extrema_idx_tmp = np.argmax(np.abs(steer_tmp)) 146 # add idx offset to find the index in the full SteeringAngle array 147 extrema_idx_truth = extrema_idx_tmp + idx[msk2][i] 148 # add to list 149 extrema_idx.append(extrema_idx_truth) 150 151 extrema_idx = np.array(extrema_idx) 152 153 # Find the begining and end of each segment : center between two major turns 154 center_idx = ((extrema_idx[1:] + extrema_idx[:-1]) / 2).astype("int32") 155 center_idx = np.concatenate(([0], center_idx, [steer.size - 1])) 156 157 # # Plot the Averaged SteeringAngle, and show the identification of major turns 158 # plt.rcParams["figure.figsize"] = (25, 10) 159 # plt.plot(steer, label="SteeringAngle") 160 # plt.plot(idx, steer[msk1], "x", label="Major turns") 161 # plt.plot(idx[msk2], steer[msk1][msk2], "o", label="End of major turns", markersize="10") 162 # plt.plot( 163 # np.arange(steer.size)[extrema_idx], 164 # steer[extrema_idx], 165 # "o", 166 # label="Center of major turn", 167 # markersize="15", 168 # ) 169 # plt.plot( 170 # np.arange(steer.size)[center_idx], 171 # steer[center_idx], 172 # "o", 173 # label="Center between two major turns", 174 # markersize="15", 175 # ) 176 # plt.legend(fontsize=18) 177 # plt.show() 178 print(f"Number of major turns: {len(extrema_idx)}") 179 180 # Build the segment dictionnary that contains a list for each data entry 181 # these lists gather the data for each turn 182 track_segment_count = center_idx.size - 1 183 184 # initialize dictionnary 185 segment_data = {} 186 for feature in features: 187 segment_data[feature] = [] 188 189 # fill the dictionnary 190 for i in range(track_segment_count): 191 for feature in features: 192 segment_data[feature].append(av_data_arrays[feature][center_idx[i] : center_idx[i + 1]]) 193 194 track_info = [] 195 for i in range(track_segment_count): 196 track_info.append( 197 { 198 "start": segment_data["DistanceRoundTrack"][i][0], 199 "end": segment_data["DistanceRoundTrack"][i][-1], 200 } 201 ) 202 203 # Average DistanceRoundTrack when brake is pressed the first time 204 for i in range(track_segment_count): 205 seg = np.where(segment_data["Brake"][i] > 0) 206 if seg[0].size > 0: 207 j = seg[0][0] 208 track_info[i]["brake"] = segment_data["DistanceRoundTrack"][i][j] 209 else: 210 track_info[i]["brake"] = 0 211 212 # Average brake force during the turn 213 for i in range(track_segment_count): 214 track_info[i]["force"] = segment_data["Brake"][i].max() 215 216 # Average value of gear at the middle of the turn 217 for i in range(track_segment_count): 218 track_info[i]["gear"] = av_data_arrays["Gear"][extrema_idx][i] 219 220 # Average lowest value of speed during the turn 221 for i in range(track_segment_count): 222 track_info[i]["speed"] = av_data_arrays["SpeedMs"][extrema_idx][i] 223 224 # Average value of distance when the brake force is at maximum 225 # FIXME: this is not the right way to do it, we want the distance when the brake force starts to decrease 226 for i in range(track_segment_count): 227 j = np.argmax(segment_data["Brake"][i]) 228 track_info[i]["stop"] = segment_data["DistanceRoundTrack"][i][j] 229 230 # Average DistanceRoundTrack when the throttle is pressed again during the turn 231 for i in range(track_segment_count): 232 seg = np.where(segment_data["Throttle"][i] == segment_data["Throttle"][i].min()) 233 if seg[0].size > 0: 234 j = seg[0][-1] 235 track_info[i]["accelerate"] = segment_data["DistanceRoundTrack"][i][j] 236 237 # convert track_info, which is an array of dict, to a pandas dataframe 238 df = pd.DataFrame(track_info) 239 logging.info(df.style.format(precision=1).to_string()) 240 return track_info