/ notebooks / analysis / src / preprocess.py
preprocess.py
  1  #!/usr/bin/env python
  2  # -*- coding: utf-8 -*-
  3  
  4  
  5  """ functions to load data from csv
This module needs to:
  7  * load data from csv , combine the different dumps and transform them
  8  * figure out the laps
  9  * split data into laps
 10  * transform from time domain to space domain
 11  """
 12  
 13  
 14  import logging
 15  import os
 16  import pathlib
 17  from io import StringIO
 18  
 19  import numpy as np
 20  import pandas as pd
 21  
# module-level logger shared by the functions in this file
log = logging.getLogger("preprocess")
# NOTE: asks for a root logging level of WARNING; with that configuration the
# log.info(...) messages emitted in this module are not shown unless the level
# is lowered (basicConfig has no effect if the root logger is already configured)
logging.basicConfig(level=logging.WARNING)
 24  
 25  
 26  # create dirs
 27  for d in "../data/preprocessed", "../data/extracted":
 28      if not os.path.exists(d):
 29          os.mkdir(d)
 30  
 31  
 32  def load_session_csv(path: str) -> pd.DataFrame:
 33      """load the data from a csv that contains a db dump
 34      csv has "comment" columns that start with '#'
 35      after each comment block ( 3 lines usually), the headers of the columns are
 36      newly specified
 37  
 38      this function will
 39      1. load the data
 40      2. pivot with time
 41      3. compute laps
 42  
 43      :param path: path to the csv file
 44      :return: pd.Dataframe
 45      """
 46  
 47      # load the file a
 48      with open(path) as fh:
 49          txt = fh.read()
 50  
 51      # split into dumps
 52      # this marks the start of a new dataset/dataset
 53      start_str = "#group"
 54      dataset_txts = [start_str + dataset_txt for dataset_txt in txt.split(start_str)[1:]]
 55  
 56      # load each as dataframe and combine
 57      dfs = [pd.read_csv(StringIO(dataset_txt), comment="#") for dataset_txt in dataset_txts]
 58      df = pd.concat(dfs)
 59  
 60      # make sure this only 1 session
 61      assert len(df["_start"].unique()) == 1
 62      assert len(df["SessionId"].unique()) == 1
 63  
 64      # transform to time index with 1 feature per column
 65      df = df.pivot("_time", "_field", "_value")
 66  
 67      # fill missing values
 68      fill_cols = ["Throttle", "Brake", "TrackPositionPercent", "SteeringAngle", "Gear"]
 69      df.loc[:, fill_cols] = df.loc[:, fill_cols].fillna(method="ffill")
 70  
 71      # compute laps
 72      df["new_lap_start"] = df["TrackPositionPercent"].diff() < -0.9
 73      df["lap"] = df["new_lap_start"].cumsum()
 74      return df
 75  
 76  
 77  def compute_spatial_trajectory(lap_df, interpolation_step_size=0.001, segments=100):
 78      """
 79      convert from time index to space index
 80      use lap_df (that has time index), take the track position as new index and interpolate to linear grid
 81      :return: new dataframe with interpolated values
 82      """
 83      assert len(lap_df["lap"].unique()) == 1
 84  
 85      x = np.arange(0, 1, interpolation_step_size)
 86      brake = np.interp(x, lap_df["TrackPositionPercent"], lap_df["Brake"])
 87      steering_angle = np.interp(x, lap_df["TrackPositionPercent"], lap_df["SteeringAngle"])
 88      throttle = np.interp(x, lap_df["TrackPositionPercent"], lap_df["Throttle"])
 89  
 90      # speed = np.interp(x, lap_df["TrackPositionPercent"], lap_df["SpeedKmh"])
 91  
 92      df = pd.DataFrame(
 93          {
 94              "x": x,
 95              "brake": brake,
 96              "steering_angle": steering_angle,
 97              "throttle": throttle,
 98              # "speed": speed,
 99          }
100      )
101  
102      df["segment"] = (df["x"] * segments).astype(int)
103      df = df.groupby("segment").mean().drop("x", axis=1)
104  
105      return df
106  
107  
def extract_laps_from_session_csv(inpath: str, out_dir="../data/extracted/"):
    """call the lower level functions to load the session data,
    then split it into laps
    and store lap data in individual csv files

    the files are named "<input file name without extension>-lap<lap>.csv"

    :param inpath: path to the session csv file
    :param out_dir: directory the lap files are written to (created if missing)
    """
    session_df = load_session_csv(inpath)

    # Path.stem works for any separator / extension length and for Path inputs
    stem = pathlib.Path(inpath).stem
    os.makedirs(out_dir, exist_ok=True)

    # groupby skips rows without a lap value and splits the frame in one pass
    for lap, lap_df in session_df.groupby("lap"):
        outpath = os.path.join(out_dir, f"{stem}-lap{lap}.csv")
        lap_df.to_csv(outpath)
        log.info("wrote file: %s", outpath)
119  
120  
def preprocess_lap(inpath: str, outpath: str):
    """read the csv of a single lap, convert it to the spatial
    (per segment) representation and store the result as csv

    :param inpath: path of the lap csv to read
    :param outpath: path the converted csv is written to
    """
    spatial_df = compute_spatial_trajectory(pd.read_csv(inpath))
    spatial_df.to_csv(outpath)
    log.info(f"wrote file: {outpath}")
127  
128  
if __name__ == "__main__":
    # step 1: split the raw session dump into one csv per lap
    raw_session_csv = "../data/raw/89db51de-22a6-4033-8201-2fc37a5fe905.csv"
    extract_laps_from_session_csv(raw_session_csv)

    # step 2: convert every extracted lap csv found on disk
    # (the glob matches all lap files in the directory, not only those of the session above)
    extracted_laps = pathlib.Path().glob("../data/extracted/*lap*.csv")
    for lap_csv in extracted_laps:
        preprocess_lap(lap_csv, f"../data/preprocessed/{lap_csv.name}")