/ components / paddock / telemetry / influx.py
influx.py
  1  import csv
  2  import logging
  3  import sys
  4  import warnings
  5  from datetime import datetime, timedelta
  6  
  7  import influxdb_client
  8  from dateutil import parser
  9  
 10  # import asyncio
 11  from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
 12  from influxdb_client.client.warnings import MissingPivotFunction
 13  
 14  from .utils import get_influxdb2_config
 15  
 16  warnings.simplefilter("ignore", MissingPivotFunction)
 17  
 18  
 19  class Influx:
 20      def __init__(self):
 21          # configure influxdb client
 22          (self.org, self.token, self.url) = get_influxdb2_config()
 23  
 24          self.influx = influxdb_client.InfluxDBClient(
 25              url=self.url, token=self.token, org=self.org, timeout=(10_000, 600_000)
 26          )
 27          if self.influx.ping():
 28              logging.debug(f"Influx: Connected to {self.url}")
 29          else:
 30              logging.error(f"Influx: Connection to {self.url} failed")
 31              sys.exit(1)
 32  
 33          self.query_api = self.influx.query_api()
 34  
 35      def laps_from_file(self, filename="tracks.csv"):
 36          tracks = {}
 37          # open csv file with track data and read it into dictionary
 38          with open(filename) as f:
 39              reader = csv.DictReader(f)
 40              for row in reader:
 41                  if row["game"] not in tracks:
 42                      tracks[row["game"]] = {}
 43                  if row["track"] not in tracks[row["game"]]:
 44                      tracks[row["game"]][row["track"]] = {}
 45                  if row["car"] not in tracks[row["game"]][row["track"]]:
 46                      tracks[row["game"]][row["track"]][row["car"]] = {}
 47                  if row["session"] not in tracks[row["game"]][row["track"]][row["car"]]:
 48                      tracks[row["game"]][row["track"]][row["car"]][row["session"]] = []
 49                  tracks[row["game"]][row["track"]][row["car"]][row["session"]].append(
 50                      {
 51                          "lap": row["lap"],
 52                          "start": row["start"],
 53                          "end": row["end"],
 54                          "time": row["time"],
 55                          "length": row["length"],
 56                      }
 57                  )
 58          self.tracks = tracks
 59  
 60      def telemetry_for(self, game="", track="", car=""):
 61          data = []
 62          fgame = game
 63          ftrack = track
 64          fcar = car
 65          for game, tracks in self.tracks.items():
 66              for track, cars in tracks.items():
 67                  for car, sessions in cars.items():
 68                      if game != fgame or track != ftrack or car != fcar:
 69                          continue
 70                      for session, laps in sessions.items():
 71                          for lap in laps:
 72                              lap_number = lap["lap"]
 73                              lap_time = lap["time"]
 74                              length = lap["length"]
 75                              logging.info(
 76                                  f"Processing {game} {track} : session {session} : "
 77                                  + f"lap #{lap_number: >2} : length {length} : time {lap_time}"
 78                              )
 79                              # 2022-12-05 19:52:18.141110+00:00
 80                              start = datetime.strptime(lap["start"], "%Y-%m-%d %H:%M:%S.%f%z")
 81                              end = datetime.strptime(lap["end"], "%Y-%m-%d %H:%M:%S.%f%z")
 82  
 83                              df = self.session_df(
 84                                  session,
 85                                  lap_number=lap["lap"],
 86                                  start=start,
 87                                  end=end,
 88                                  measurement="fast_laps",
 89                                  bucket="fast_laps",
 90                              )
 91                              data.append(df)
 92          return data
 93  
 94      def telemetry_for_laps(self, laps=[], measurement="laps_cc", bucket="racing"):
 95          data = []
 96          for lap in laps:
 97              game = lap.session.game.name
 98              session = lap.session.session_id
 99              track = lap.track.name
100              lap_number = lap.number
101  
102              logging.info(f"Fetching telemetry for {game} - {track} - {lap.car}")
103              logging.info(f"  track.id {lap.track.id} car.id {lap.car.id}")
104              logging.info(f"  session {session} lap.id {lap.id} number {lap_number}")
105              logging.info(f"  length {lap.length} time {lap.time} valid {lap.valid}")
106              logging.info(f"  start {lap.start} end {lap.end}")
107  
108              try:
109                  df = self.session_df(
110                      session,
111                      lap_number=lap_number,
112                      start=lap.start,
113                      end=lap.end,
114                      measurement=measurement,
115                      bucket=bucket,
116                  )
117                  if len(df) > 100:
118                      data.append(df)
119              except Exception as e:
120                  logging.error(e)
121  
122          return data
123  
124      def session(
125          self,
126          session_id=None,
127          lap=None,
128          lap_numbers=[],
129          start=None,
130          end=None,
131          measurement="laps_cc",
132          bucket="racing",
133      ):
134          lap_filter = []
135  
136          if start and end and session_id:
137              # subtract one hour from start and add one hour to end
138              start = parser.parse(start) - timedelta(hours=1)
139              end = parser.parse(end) - timedelta(hours=1)
140              start = start.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
141              end = end.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
142          if lap:
143              start = lap.start.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
144              end = lap.end.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
145              session_id = lap.session.session_id
146  
147          logging.debug(f"session_id: {session_id}, start: {start}, end: {end}, lap_numbers: {lap_numbers}")
148  
149          if lap_numbers and session_id:
150              for lap_number in lap_numbers:
151                  lap_filter.append(f'r["CurrentLap"] == "{lap_number}"')
152  
153              query = f"""
154                  from(bucket: "{bucket}")
155                  |> range(start: -10y, stop: now())
156                  |> filter(fn: (r) => r["_measurement"] == "{measurement}")
157                  |> filter(fn: (r) => r["SessionId"] == "{session_id}")
158                  |> filter(fn: (r) => {' or '.join(lap_filter)})
159                  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
160                  |> group(columns: [])
161                  |> sort(columns: ["_time"])
162              """
163          elif start and end and session_id:
164              query = f"""
165                  from(bucket: "{bucket}")
166                  |> range(start: {start}, stop: {end})
167                  |> filter(fn: (r) => r["_measurement"] == "{measurement}")
168                  |> filter(fn: (r) => r["SessionId"] == "{session_id}")
169                  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
170                  |> group(columns: [])
171                  |> sort(columns: ["_time"])
172              """
173          else:
174              query = f"""
175                  from(bucket: "{bucket}")
176                  |> range(start: -10y, stop: now())
177                  |> filter(fn: (r) => r["_measurement"] == "{measurement}")
178                  |> filter(fn: (r) => r["SessionId"] == "{session_id}")
179                  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
180                  |> group(columns: [])
181                  |> sort(columns: ["_time"])
182              """
183  
184          logging.debug(query)
185          records = self.query_api.query_stream(query=query)
186          for record in records:
187              yield record
188  
189      def raw_stream(
190          self,
191          start="-1d",
192          end="now()",
193          measurement="laps_cc",
194          bucket="racing",
195          delta="5m",
196      ):
197          end_d = datetime.now()
198          start_d = datetime.now()
199          if delta.endswith("d"):
200              delta = timedelta(days=int(delta[:-1]))
201          elif delta.endswith("h"):
202              delta = timedelta(hours=int(delta[:-1]))
203          elif delta.endswith("m"):
204              delta = timedelta(hours=int(delta[:-1]))
205  
206          if start.startswith("-"):
207              if start.endswith("d"):
208                  start_d = end_d - timedelta(days=int(start[1:-1]))
209              elif start.endswith("h"):
210                  start_d = end_d - timedelta(hours=int(start[1:-1]))
211              elif start.endswith("m"):
212                  start_d = end_d - timedelta(minutes=int(start[1:-1]))
213                  delta = timedelta(minutes=1)
214          else:
215              start_d = parser.parse(start)
216  
217          while start_d < end_d:
218              start = start_d.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
219              end = (start_d + delta).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
220              logging.debug(f"start: {start}, end: {end}")
221  
222              query = f"""
223                      from(bucket: "{bucket}")
224                      |> range(start: {start}, stop: {end})
225                      |> filter(fn: (r) => r["_measurement"] == "{measurement}")
226                      |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
227                      |> group(columns: [])
228                      |> sort(columns: ["_time"])
229                  """
230              print(query)
231              records = self.query_api.query_stream(query=query)
232              for record in records:
233                  yield record
234  
235              start_d = start_d + delta
236  
237      def sessions(self, start="-1d", stop="now()"):
238          query = f"""
239              import "influxdata/influxdb/schema"
240              schema.tagValues(
241                  bucket: "racing",
242                  tag: "SessionId",
243                  start: {start},
244                  stop: {stop}
245              )
246          """
247          records = self.query_api.query_stream(query=query)
248          for record in records:
249              yield record["_value"]
250  
251      def session_ids(self, measurement="fast_laps", bucket="racing"):
252          query = f"""
253              import "influxdata/influxdb/schema"
254              schema.tagValues(
255                  bucket: "{bucket}",
256                  tag: "SessionId",
257                  predicate: (r) => r["_measurement"] == "{measurement}",
258                  start: -10y,
259                  stop: now()
260              )
261          """
262          records = self.query_api.query_stream(query=query)
263          ids = set()
264          for record in records:
265              ids.add(record["_value"])
266  
267          return ids
268  
269      def session_df(
270          self,
271          session_id,
272          lap_number=None,
273          start="-1d",
274          end="now()",
275          measurement="laps_cc",
276          bucket="racing",
277          aggregate="",
278          fields=[],
279          drop_tags=False,
280      ):
281          if isinstance(start, datetime):
282              start = start.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
283          if isinstance(end, datetime):
284              end = end.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
285  
286          query = f"""
287          from(bucket: "{bucket}")
288          |> range(start: {start}, stop: {end})
289          |> filter(fn: (r) => r["_measurement"] == "{measurement}")
290          |> filter(fn: (r) => r["SessionId"] == "{session_id}")
291          """
292  
293          if fields:
294              query += f"""
295              |> filter(fn: (r) => r["_field"] == "{fields[0]}")
296              """
297              for field in fields[1:]:
298                  query += f"""
299                  or r["_field"] == "{field}"
300                  """
301              query += ")\n"
302  
303          if aggregate:
304              # downsample to 1Hz
305              query += f"""
306              |> aggregateWindow(every: {aggregate}, fn: last, createEmpty: false)
307              """
308  
309          query += """
310          |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
311          """
312  
313          if lap_number:
314              query += f"""
315          |> filter(fn: (r) => r["CurrentLap"] == "{lap_number}")
316          """
317  
318          query += """
319          |> sort(columns: ["_time"], desc: false)
320          """
321  
322          # drop columns that are not needed
323          if drop_tags:
324              query += """
325              |> drop(columns: ["_start", "_stop", "_measurement", "host", "topic", "user"])
326              """
327  
328          df = self.query_api.query_data_frame(query=query)
329          if df.empty:
330              raise Exception(f"No data found for {session_id} lap {lap_number}")
331  
332          game = df["GameName"].iloc[0]
333          if game == "Assetto Corsa Competizione":
334              # flip y axis
335              df["x"] = df["WorldPosition_x"]
336              df["y"] = df["WorldPosition_z"] * -1
337          if game == "Automobilista 2":
338              df["x"] = df["WorldPosition_x"]
339              df["y"] = df["WorldPosition_z"]
340  
341          df["id"] = df["SessionId"].astype(str) + "-" + df["CurrentLap"].astype(str)
342  
343          df = df[df["Gear"] != 0]
344  
345          return df
346  
347      # https://community.influxdata.com/t/how-to-copy-data-between-measurements/22582/14
348      def copy_session(
349          self,
350          session_id,
351          start="-1d",
352          end="now()",
353          from_bucket="racing",
354          to_bucket="fast_laps",
355      ):
356          if isinstance(start, datetime):
357              start = start.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
358          if isinstance(end, datetime):
359              end = end.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
360  
361          query = f"""
362              from(bucket: "{from_bucket}")
363              |> range(start: -10y, stop: now())
364              |> filter(fn: (r) => r._measurement == "laps_cc")
365              |> filter(fn: (r) => r["SessionId"] == "{session_id}")
366              |> set(key: "_measurement", value: "fast_laps")
367              |> to(bucket: "{to_bucket}")
368              |> filter(fn: (r) => false)
369              |> yield()
370          """
371          # logging.debug(query)
372  
373          # asyncio.run(self.run_query_async(query))
374          records = self.query_api.query_stream(query)
375          for record in records:
376              pass
377  
378      async def run_query_async(self, query):
379          async with InfluxDBClientAsync(url=self.url, token=self.token, org=self.org) as client:
380              # Stream of FluxRecords
381              query_api = client.query_api()
382              records = await query_api.query_stream(query)
383              async for record in records:
384                  print(".", end="", flush=True)
385  
386      def delete_data(self, start="", end=""):
387          if isinstance(start, datetime):
388              start = start.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
389          if isinstance(end, datetime):
390              end = end.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
391  
392          predicate = """
393              _measurement="laps_cc"
394          """
395  
396          self.influx.delete_api().delete(start=start, stop=end, predicate=predicate, bucket="racing")
397  
398  
399  # def track(influx):
400  #     query_api = influx.query_api()
401  #     query = f"""
402  #     from(bucket: "racing")
403  #         |> range(start:-10d, stop: now())
404  #         |> filter(fn: (r) => r._field == "DistanceRoundTrack" )
405  #         |> last()
406  #         |> limit(n: 1)
407  #     """
408  
409  #     records = query_api.query_stream(org=ORG, query=query)
410  
411  #     tracks = {}
412  #     engine = create_engine(models.construct_connection_string(), echo=True)
413  #     session = Session(engine)
414  
415  #     for record in records:
416  #         name = record["TrackCode"]
417  #         length = record["_value"]
418  #         if name not in tracks:
419  #             track = Track(name=name, length=length)
420  #             tracks[name] = track
421  #             session.add(track)
422  #         else:
423  #             track = tracks[name]
424  #             if length > track.length:
425  #                 track.length = length
426  
427  #     session.commit()
428  
429  
430  # def init_db():
431  #     engine = create_engine(models.construct_connection_string(), echo=True)
432  #     models.drop_tables(engine)
433  #     models.create_tables(engine)
434  
435  
436  # if __name__ == "__main__":
437  #     if os.getenv("DEBUG", "1") == "1":
438  #         _LOGGER.setLevel(logging.DEBUG)
439  #         _LOGGER.debug("Debug mode enabled")
440  
441  #     # logging.debug("")
442  #     # init_db()
443  #     # sessions(influx)
444  #     for session in sessions(influx):
445  #         print(session)
446  #         analyze_session(session, influx)