preprocess.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-


"""Functions to load session data from a csv dump and preprocess it.

Steps:
* load data from csv, combine the different dumps and transform them
* figure out the laps
* split data into laps
* transform from time domain to space domain
"""


import logging
import os
import pathlib
from io import StringIO

import numpy as np
import pandas as pd

log = logging.getLogger("preprocess")
logging.basicConfig(level=logging.WARNING)


# create the default output dirs (including missing parents such as ../data)
for d in "../data/preprocessed", "../data/extracted":
    os.makedirs(d, exist_ok=True)


def load_session_csv(path: str) -> pd.DataFrame:
    """Load the data of one session from a csv that contains a db dump.

    The csv has "comment" lines that start with '#'. After each comment
    block (3 lines usually), the headers of the columns are newly specified,
    so the file is split at every "#group" marker and each part is parsed as
    its own table.

    This function will
    1. load the data
    2. pivot with time (one row per `_time`, one column per `_field`)
    3. compute laps (adds the columns `new_lap_start` and `lap`)

    :param path: path to the csv file
    :return: pd.DataFrame indexed by `_time`
    :raises ValueError: if the file contains no "#group" block
    :raises AssertionError: if the file contains more than one session
    """

    # load the whole file
    with open(path) as fh:
        txt = fh.read()

    # split into dumps; this string marks the start of a new dataset
    start_str = "#group"
    dataset_txts = [start_str + chunk for chunk in txt.split(start_str)[1:]]
    if not dataset_txts:
        raise ValueError(f"no '{start_str}' dataset block found in {path}")

    # load each as dataframe and combine
    dfs = [pd.read_csv(StringIO(chunk), comment="#") for chunk in dataset_txts]
    df = pd.concat(dfs)

    # make sure this is only 1 session
    assert len(df["_start"].unique()) == 1, "expected exactly one _start value"
    assert len(df["SessionId"].unique()) == 1, "expected exactly one SessionId"

    # transform to time index with 1 feature per column
    # (keyword arguments: positional ones are rejected by pandas >= 2.0)
    df = df.pivot(index="_time", columns="_field", values="_value")

    # fill missing values with the last known value of each signal
    fill_cols = ["Throttle", "Brake", "TrackPositionPercent", "SteeringAngle", "Gear"]
    df.loc[:, fill_cols] = df.loc[:, fill_cols].ffill()

    # compute laps: a drop of more than 0.9 in track position between two
    # consecutive samples is counted as the start of a new lap
    df["new_lap_start"] = df["TrackPositionPercent"].diff() < -0.9
    df["lap"] = df["new_lap_start"].cumsum()
    return df


def compute_spatial_trajectory(lap_df, interpolation_step_size=0.001, segments=100):
    """Convert a single lap from time index to space index.

    Takes the track position of `lap_df` as the new axis, interpolates
    `Brake`, `SteeringAngle` and `Throttle` onto a linear grid over [0, 1)
    and averages the interpolated values per track segment.

    :param lap_df: dataframe of exactly one lap with the columns `lap`,
        `TrackPositionPercent`, `Brake`, `SteeringAngle` and `Throttle`
    :param interpolation_step_size: spacing of the interpolation grid
    :param segments: number of segments the range [0, 1) is divided into
    :return: new dataframe indexed by `segment` with the columns `brake`,
        `steering_angle` and `throttle`
    :raises AssertionError: if `lap_df` contains more than one lap
    """
    assert len(lap_df["lap"].unique()) == 1

    # np.interp silently returns nonsense unless the sample positions are
    # increasing, so drop rows without a position and sort the rest
    position = lap_df["TrackPositionPercent"].to_numpy(dtype=float)
    has_position = ~np.isnan(position)
    order = np.argsort(position[has_position], kind="stable")
    sample_x = position[has_position][order]

    x = np.arange(0, 1, interpolation_step_size)

    def _resample(column):
        """Interpolate one signal of the lap onto the grid `x`."""
        values = lap_df[column].to_numpy(dtype=float)[has_position][order]
        return np.interp(x, sample_x, values)

    df = pd.DataFrame(
        {
            "x": x,
            "brake": _resample("Brake"),
            "steering_angle": _resample("SteeringAngle"),
            "throttle": _resample("Throttle"),
        }
    )

    # round before flooring: plain truncation puts e.g. 0.29 * 100
    # (= 28.999999999999996) into segment 28 instead of 29
    df["segment"] = np.floor(np.round(df["x"] * segments, 9)).astype(int)
    df = df.groupby("segment").mean().drop("x", axis=1)

    return df


def extract_laps_from_session_csv(inpath: str, out_dir="../data/extracted/"):
    """Load a session csv and store every lap in its own csv file.

    The files are written to `out_dir` and named
    `<name of inpath without extension>-lap<lap>.csv`.

    :param inpath: path to the session csv (str or path-like)
    :param out_dir: directory for the lap files, created if missing
    """
    session_df = load_session_csv(inpath)
    os.makedirs(out_dir, exist_ok=True)

    # works for str and pathlib.Path input and for any path separator
    stem = pathlib.Path(inpath).stem
    for lap in session_df["lap"].dropna().unique():
        outpath = os.path.join(out_dir, f"{stem}-lap{lap}.csv")
        session_df[session_df["lap"] == lap].to_csv(outpath)
        log.info("wrote file: %s", outpath)


def preprocess_lap(inpath: str, outpath: str):
    """Read one lap csv, convert it to the space domain and write the result.

    :param inpath: path to a csv holding the data of a single lap
    :param outpath: path of the csv to write
    """
    df = pd.read_csv(inpath)
    df = compute_spatial_trajectory(df)
    df.to_csv(outpath)
    log.info("wrote file: %s", outpath)


if __name__ == "__main__":
    path = "../data/raw/89db51de-22a6-4033-8201-2fc37a5fe905.csv"
    extract_laps_from_session_csv(path)

    # glob inside the directory instead of using a pattern starting with ".."
    for inpath in sorted(pathlib.Path("../data/extracted").glob("*lap*.csv")):
        outpath = f"../data/preprocessed/{inpath.name}"
        preprocess_lap(inpath, outpath)