/ components / paddock / api / telemetry_loader.py
telemetry_loader.py
  1  import os
  2  import tempfile
  3  
  4  import numpy as np
  5  import pandas as pd
  6  
  7  from telemetry.analyzer import Analyzer
  8  from telemetry.influx import Influx
  9  from telemetry.models import Lap
 10  
 11  
 12  class TelemetryLoader:
 13      temp_dir = tempfile.mkdtemp()
 14  
 15      def __init__(self, caching=False):
 16          self.caching = caching
 17          pass
 18  
 19      def read_dataframe(self, file_path):
 20          return pd.read_csv(file_path, compression="gzip", parse_dates=["_time"])
 21  
 22      def save_dataframe(self, df, file_path):
 23          df.to_csv(file_path, compression="gzip", index=False)
 24  
 25      def process_dataframe(self, df):
 26          df = df.sort_values(by="_time")
 27          df = df.replace(np.nan, None)
 28  
 29          # only return the columns we need: "SpeedMs", "Throttle", "Brake", "DistanceRoundTrack"
 30          columns = [
 31              "SpeedMs",
 32              "Throttle",
 33              "Brake",
 34              "DistanceRoundTrack",
 35              "CurrentLap",
 36              "Gear",
 37              "SteeringAngle",
 38              "CurrentLapTime",
 39          ]
 40  
 41          # check if the session contains position data
 42          if "WorldPosition_x" in df.columns:
 43              columns += ["WorldPosition_x", "WorldPosition_y", "WorldPosition_z"]
 44  
 45          for field in ["Yaw", "Pitch", "Roll"]:
 46              if field in df.columns:
 47                  columns += [field]
 48  
 49          df = df[columns]
 50  
 51          analyzer = Analyzer()
 52          # split the dataframe into laps based on the CurrentLap field
 53          unique_laps = df["CurrentLap"].unique()
 54          laps = []
 55          for lap in unique_laps:
 56              lap_df = df[df["CurrentLap"] == lap]
 57              lap_df = analyzer.resample_channels(lap_df, columns=columns, freq=1)
 58              laps.append(lap_df)
 59  
 60          # merge the laps back into a single dataframe
 61          df = pd.concat(laps)
 62  
 63          # change CurrentLap to int
 64          # otherwise the session.js frontend will not be able to parse the JSON
 65          df["CurrentLap"] = df["CurrentLap"].astype(int)
 66  
 67          return df
 68  
 69      def get_lap_df(self, lap_id, measurement="laps_cc", bucket="racing"):
 70          # make sure the lap_id is an integer
 71          lap_id = int(lap_id)
 72          # fetch the lap from the database
 73          lap = Lap.objects.get(id=lap_id)
 74  
 75          influx = Influx()
 76  
 77          measurement = "fast_laps"
 78          bucket = "fast_laps"
 79          lap_df = influx.telemetry_for_laps([lap], measurement=measurement, bucket=bucket)
 80  
 81          if len(lap_df) == 0:
 82              measurement = "laps_cc"
 83              bucket = "racing"
 84              lap_df = influx.telemetry_for_laps([lap], measurement=measurement, bucket=bucket)
 85  
 86          if len(lap_df) > 0:
 87              df = self.process_dataframe(lap_df[0])
 88          else:
 89              df = pd.DataFrame()
 90          return df
 91  
 92      def get_session_df(self, session_id, measurement="laps_cc", bucket="racing"):
 93          # make sure the session_id is an integer
 94          session_id = int(session_id)
 95          file_path = f"{self.temp_dir}/session_{session_id}_df.csv.gz"
 96  
 97          if self.caching and os.path.exists(file_path):
 98              session_df = self.read_dataframe(file_path)
 99          else:
100              influx = Influx()
101              aggregate = "100ms"
102              # aggregate = ""
103              # if self.caching:
104              #     aggregate = "1s"
105              session_df = influx.session_df(
106                  session_id, measurement=measurement, bucket=bucket, start="-10y", aggregate=aggregate, drop_tags=True
107              )
108              if self.caching:
109                  self.save_dataframe(session_df, file_path)
110  
111          if len(session_df) > 0:
112              df = self.process_dataframe(session_df)
113          else:
114              df = pd.DataFrame()
115          return df