telemetry_loader.py
import os
import tempfile

import numpy as np
import pandas as pd

from telemetry.analyzer import Analyzer
from telemetry.influx import Influx
from telemetry.models import Lap


class TelemetryLoader:
    """Fetch telemetry through ``Influx`` and return it as per-lap resampled frames.

    When ``caching`` is enabled, raw session frames are stored as gzip
    compressed CSV files inside ``temp_dir`` and reused on later calls.
    """

    # Evaluated once, when the class body runs: a single temporary directory
    # shared by every instance and used as the session cache location.
    temp_dir = tempfile.mkdtemp()

    def __init__(self, caching: bool = False) -> None:
        """Store whether session frames should be cached on disk."""
        self.caching = caching

    def read_dataframe(self, file_path: str) -> pd.DataFrame:
        """Read a gzip compressed CSV, parsing the ``_time`` column as dates."""
        return pd.read_csv(file_path, compression="gzip", parse_dates=["_time"])

    def save_dataframe(self, df: pd.DataFrame, file_path: str) -> None:
        """Write ``df`` to ``file_path`` as gzip compressed CSV without the index."""
        df.to_csv(file_path, compression="gzip", index=False)

    def process_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """Reduce a raw telemetry frame to the known channels, resampled per lap.

        The frame is sorted by ``_time``, restricted to the channel columns
        listed below (plus position and orientation columns when present),
        split on ``CurrentLap``, resampled lap by lap via
        ``Analyzer.resample_channels`` and concatenated again.

        Returns an empty ``DataFrame`` when ``df`` is empty.
        """
        # Nothing to sort, select or concatenate: an empty frame would
        # otherwise fail on the missing columns or inside pd.concat([]).
        if df.empty:
            return pd.DataFrame()

        df = df.sort_values(by="_time")
        # NOTE(review): presumably so missing values serialize as null for
        # the frontend - confirm against the consumer of this frame.
        df = df.replace(np.nan, None)

        # channels that are always selected
        columns = [
            "SpeedMs",
            "Throttle",
            "Brake",
            "DistanceRoundTrack",
            "CurrentLap",
            "Gear",
            "SteeringAngle",
            "CurrentLapTime",
        ]

        # position data is only kept when the session contains it
        if "WorldPosition_x" in df.columns:
            columns += ["WorldPosition_x", "WorldPosition_y", "WorldPosition_z"]

        # orientation channels are optional and checked one by one
        columns.extend(field for field in ("Yaw", "Pitch", "Roll") if field in df.columns)

        df = df[columns]

        analyzer = Analyzer()
        # split on CurrentLap and resample every lap on its own
        laps = [
            analyzer.resample_channels(df[df["CurrentLap"] == lap], columns=columns, freq=1)
            for lap in df["CurrentLap"].unique()
        ]

        # merge the laps back into a single dataframe
        df = pd.concat(laps)

        # change CurrentLap to int
        # otherwise the session.js frontend will not be able to parse the JSON
        df["CurrentLap"] = df["CurrentLap"].astype(int)

        return df

    def get_lap_df(self, lap_id, measurement="laps_cc", bucket="racing"):
        """Return the processed telemetry frame for a single lap.

        The ``fast_laps`` measurement/bucket is queried first. When it yields
        nothing, the query is repeated with the ``measurement`` and ``bucket``
        arguments (``laps_cc`` / ``racing`` by default).

        Returns an empty ``DataFrame`` when neither query returns data.
        Errors raised by ``Lap.objects.get`` for an unknown id propagate.
        """
        # make sure the lap_id is an integer, then fetch the lap from the database
        lap = Lap.objects.get(id=int(lap_id))

        influx = Influx()

        lap_dfs = influx.telemetry_for_laps([lap], measurement="fast_laps", bucket="fast_laps")
        if len(lap_dfs) == 0:
            # fall back to the source selected by the caller
            lap_dfs = influx.telemetry_for_laps([lap], measurement=measurement, bucket=bucket)

        if len(lap_dfs) == 0:
            return pd.DataFrame()
        return self.process_dataframe(lap_dfs[0])

    def get_session_df(self, session_id, measurement="laps_cc", bucket="racing"):
        """Return the processed telemetry frame for a whole session.

        With caching enabled, a previously stored frame for ``session_id`` is
        read from ``temp_dir`` instead of querying again; freshly fetched,
        non-empty frames are stored there.

        Returns an empty ``DataFrame`` when the session has no telemetry.
        """
        # make sure the session_id is an integer
        session_id = int(session_id)
        # NOTE(review): the cache key is the session id only; measurement and
        # bucket are not part of the file name.
        file_path = os.path.join(self.temp_dir, f"session_{session_id}_df.csv.gz")

        if self.caching and os.path.exists(file_path):
            session_df = self.read_dataframe(file_path)
        else:
            influx = Influx()
            session_df = influx.session_df(
                session_id,
                measurement=measurement,
                bucket=bucket,
                start="-10y",
                aggregate="100ms",
                drop_tags=True,
            )
            # Never cache an empty result: it would be re-read on every later
            # call and hide telemetry that arrives afterwards.
            if self.caching and len(session_df) > 0:
                self.save_dataframe(session_df, file_path)

        if len(session_df) == 0:
            return pd.DataFrame()
        return self.process_dataframe(session_df)