fast_lap_analyzer.py
1 import logging 2 import statistics 3 4 import numpy as np 5 6 from .analyzer import Analyzer 7 from .influx import Influx 8 from .models import FastLap 9 from .pitcrew.segment import Segment 10 11 12 class FastLapAnalyzer: 13 def __init__(self, laps=[], bucket="fast_laps"): 14 self.analyzer = Analyzer() 15 self.laps = laps 16 self.bucket = bucket 17 self.influx_client = None 18 self.same_sectors = False 19 self.columns = ["Brake", "SpeedMs", "Throttle", "Gear", "CurrentLapTime", "SteeringAngle", "Time"] 20 21 def influx(self): 22 if not self.influx_client: 23 self.influx_client = Influx() 24 return self.influx_client 25 26 def assert_can_analyze(self): 27 # maybe check for game type 28 return True 29 30 def fetch_lap_telemetry(self, max_laps=None): 31 laps_with_telemetry = [] 32 lap_telemetry = [] 33 counter = 0 34 for lap in self.laps: 35 laps = self.influx().telemetry_for_laps([lap], measurement="fast_laps", bucket=self.bucket) 36 if len(laps) == 0: 37 logging.info("No data found for lap in fast_laps bucket, trying in default bucket") 38 laps = self.influx().telemetry_for_laps([lap]) 39 if len(laps) == 0: 40 logging.info("No data found for lap, continuing") 41 continue 42 laps_with_telemetry.append(lap) 43 df = self.preprocess(laps[0]) 44 lap_telemetry.append(df) 45 counter += 1 46 if max_laps and counter >= max_laps: 47 break 48 49 return lap_telemetry, laps_with_telemetry 50 51 def current_fast_lap_sectors(self): 52 lap = self.laps[0] 53 car = lap.car 54 track = lap.track 55 game = lap.session.game 56 fast_lap = FastLap.objects.filter(game=game, car=car, track=track, driver=None).first() 57 sectors = [] 58 if fast_lap and fast_lap.data: 59 for segment in fast_lap.data.get("segments", []): 60 sectors.append( 61 { 62 "start": segment.start, 63 "end": segment.end, 64 } 65 ) 66 return sectors 67 68 def similar_sectors(self, sectors_a, sectors_b): 69 if len(sectors_a) != len(sectors_b): 70 return False 71 72 if len(sectors_a) == 0: 73 return False 74 75 start_diffs = [] 76 for i in range(len(sectors_a)): 77 start_diffs.append(abs(sectors_a[i]["start"] - sectors_b[i]["start"])) 78 79 # med = statistics.median(start_diffs) 80 med = statistics.mean(start_diffs) 81 logging.debug(f"start_diffs: {start_diffs} med: {med}") 82 if med > 40: 83 return False 84 85 return True 86 87 def extract_sectors(self, lap_data): 88 df_max = self.analyzer.combine_max_throttle(lap_data) 89 if df_max is None: 90 return None, None 91 sector_start_end = self.analyzer.split_sectors( 92 df_max, min_distance_between_sectors=35, min_length_throttle_below_threshold=20 93 ) 94 return sector_start_end, df_max 95 96 def fastest_sector(self, data_frames, start, end): 97 fast_sector = None 98 fast_sector_time = 10_000_000_000_000 99 fast_sector_idx = -1 100 101 # logging.debug(f"start: {start}, end: {end}") 102 for i, df in enumerate(data_frames): 103 sector = self.analyzer.section_df(df, start, end) 104 min_distance = sector["DistanceRoundTrack"].min() 105 max_distance = sector["DistanceRoundTrack"].max() 106 if start > end: 107 tmp = start 108 start = end 109 end = tmp 110 min_threshold = 10 111 # be a bit more generous with the min_threshold if its the first sector 112 # since those often dont start at 0 113 if min_distance < 20: 114 min_threshold = 20 115 if abs(min_distance - start) > min_threshold: 116 logging.debug(f"get sector for lap {i}: min: {min_distance} != start: {start}") 117 continue 118 if abs(max_distance - end) > 10: 119 logging.debug(f"get sector for lap {i}: max: {max_distance} != end: {end}") 120 continue 121 # logging.debug(f"min_distance: {min_distance}, max_distance: {max_distance}") 122 # continue if the sector is empty 123 # check why this happens 124 # pipenv run ./manage.py analyze \ 125 # --game 'Automobilista 2' --track 'Snetterton:Snetterton_300' --car 'Ginetta G58' 126 if sector.empty: 127 logging.error(f"sector {i} is empty") 128 continue 129 130 # start_idx = -1 131 # end_idx = 0 132 # if start > end: 133 # logging.debug(f"sector {i} wrapped start: {start} > end: {end}") 134 # start_idx = 0 135 # end_idx = -1 136 137 # sector_time = self.analyzer.sector_lap_time(sector) 138 sector_time = self.analyzer.sector_time(sector) 139 140 if sector_time < fast_sector_time: 141 fast_sector = sector 142 fast_sector_time = sector_time 143 fast_sector_idx = i 144 145 # logging.debug(f"fast_sector_idx: {fast_sector_idx} fast_sector_time: {fast_sector_time}") 146 return fast_sector, fast_sector_idx 147 148 def analyze(self, min_laps=1, max_laps=10): 149 if not self.assert_can_analyze(): 150 logging.info("Can't analyze") 151 return 152 153 lap_telemetry, laps_with_telemetry = self.fetch_lap_telemetry(max_laps) 154 155 if len(lap_telemetry) < min_laps: 156 logging.info(f"Found {len(lap_telemetry)} laps, need {min_laps}") 157 return 158 159 sector_start_end, df_max = self.extract_sectors(lap_telemetry) 160 if sector_start_end is None: 161 return None 162 163 current_sectors = self.current_fast_lap_sectors() 164 if self.similar_sectors(sector_start_end, current_sectors): 165 logging.info("Sectors are similar to current fast lap, using current fast lap") 166 self.same_sectors = True 167 sector_start_end = current_sectors 168 169 segments, used_laps = self.extract_segments(sector_start_end, lap_telemetry, laps_with_telemetry, df_max) 170 171 distance_time = self.analyzer.distance_speed_lookup_table(lap_telemetry[0]) 172 data = { 173 "distance_time": distance_time, 174 "segments": segments, 175 } 176 177 return data, list(used_laps) 178 179 def extract_segments(self, sector_start_end, lap_telemetry, laps_with_telemetry, df_max): 180 segments = [] 181 used_laps = set() 182 track_length = df_max["DistanceRoundTrack"].max() 183 for i in range(len(sector_start_end)): 184 start = sector_start_end[i]["start"] 185 end = sector_start_end[i]["end"] 186 logging.debug(f"extract_segments for sector {i} start: {start} end: {end}") 187 sector, lap_index = self.fastest_sector(lap_telemetry, start, end) 188 if sector is None: 189 logging.error(f"Could not find fastest sector for {start} - {end}") 190 continue 191 # merge Throttle input 192 # sector['Throttle'] = df_max['Throttle'] 193 194 used_laps.add(laps_with_telemetry[lap_index]) 195 196 segment = self.extract_segment(sector) 197 segment.start = start 198 segment.end = end 199 segment.turn = i + 1 200 segment.track_length = track_length 201 segment.time = self.analyzer.sector_lap_time(sector) 202 segments.append(segment) 203 return segments, used_laps 204 205 def brake_features(self, df): 206 brake_feature_args = { 207 "brake_threshold": 0.1, 208 } 209 return self.analyzer.brake_features(df, **brake_feature_args) 210 211 def throttle_features(self, df): 212 throttle_features_args = { 213 # "throttle_threshold": 0.98, 214 } 215 return self.analyzer.throttle_features(df, **throttle_features_args) 216 217 def gear_features(self, df): 218 gear = df["Gear"].min() 219 # get all gear changes 220 gear_changes = df[df["Gear"] != df["Gear"].shift(1)] 221 # now get the value of DistanceRoundTrack for each gear change 222 distances = gear_changes["DistanceRoundTrack"].tolist() 223 # round the values to integer 224 distances = [int(round(x)) for x in distances] 225 gears = gear_changes["Gear"].tolist() 226 gears = [int(x) for x in gears] 227 # create a dict which maps the distance to the gear 228 distance_gear = dict(zip(distances, gears)) 229 return { 230 "gear": int(gear) if not np.isnan(gear) else 0, 231 "distance_gear": distance_gear, 232 } 233 234 def extract_segment(self, sector): 235 analyzer = self.analyzer 236 throttle_or_brake = analyzer.sector_type(sector) 237 brake_features = self.brake_features(sector) 238 throttle_features = self.throttle_features(sector) 239 gear_features = self.gear_features(sector) 240 other_features = {} 241 242 segment = Segment() 243 segment.type = throttle_or_brake 244 245 if brake_features: 246 segment.add_features(brake_features, type="brake") 247 if throttle_features: 248 segment.add_features(throttle_features, type="throttle") 249 if gear_features: 250 segment.add_features(gear_features, type="gear") 251 if other_features: 252 segment.add_features(other_features, type="other") 253 254 segment.telemetry = sector 255 return segment 256 257 def preprocess(self, df): 258 # # Check if the value is increasing compared to the previous value 259 # is_increasing = df["DistanceRoundTrack"] > df["DistanceRoundTrack"].shift(1) 260 261 # # Get the indices where the value is not increasing 262 # not_increasing_indices = is_increasing[is_increasing == False].index.tolist() # noqa: E712 263 264 # if len(not_increasing_indices) > 1: 265 # logging.debug(f"Found {len(not_increasing_indices)} not increasing indices") 266 267 # df = self.analyzer.drop_decreasing(df) 268 # # dataframes with less than 100 points are not reliable 269 # if len(df) < 100: 270 # logging.error(f"Dataframe has less than 100 points: {len(df)}") 271 # return 272 df = df[df["Gear"] != 0].copy() 273 # convert _time Timestamp column to int64 274 df.loc[:, "Time"] = df["_time"].astype("int64") 275 df = self.analyzer.resample( 276 df, 277 freq=1, 278 columns=self.columns, 279 ) 280 return df