# influx.py
import csv
import logging
import sys
import warnings
from datetime import datetime, timedelta

import influxdb_client
from dateutil import parser

# import asyncio
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
from influxdb_client.client.warnings import MissingPivotFunction

from .utils import get_influxdb2_config

warnings.simplefilter("ignore", MissingPivotFunction)

# Flux range() bounds are rendered with this RFC3339-style format throughout.
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


class Influx:
    """Helper around influxdb_client for reading and copying racing telemetry."""

    def __init__(self):
        """Connect to InfluxDB 2.x using credentials from get_influxdb2_config().

        Exits the process with status 1 if the server does not answer a ping.
        """
        (self.org, self.token, self.url) = get_influxdb2_config()

        self.influx = influxdb_client.InfluxDBClient(
            url=self.url, token=self.token, org=self.org, timeout=(10_000, 600_000)
        )
        if self.influx.ping():
            logging.debug(f"Influx: Connected to {self.url}")
        else:
            logging.error(f"Influx: Connection to {self.url} failed")
            sys.exit(1)

        self.query_api = self.influx.query_api()

    def laps_from_file(self, filename="tracks.csv"):
        """Load lap metadata from a CSV file into the nested self.tracks dict.

        Resulting shape: tracks[game][track][car][session] -> list of lap dicts
        with string values for lap / start / end / time / length.
        """
        tracks = {}
        with open(filename) as f:
            for row in csv.DictReader(f):
                # setdefault chain replaces the previous four "if key not in"
                # blocks; behavior is identical.
                laps = (
                    tracks.setdefault(row["game"], {})
                    .setdefault(row["track"], {})
                    .setdefault(row["car"], {})
                    .setdefault(row["session"], [])
                )
                laps.append(
                    {
                        "lap": row["lap"],
                        "start": row["start"],
                        "end": row["end"],
                        "time": row["time"],
                        "length": row["length"],
                    }
                )
        self.tracks = tracks

    def telemetry_for(self, game="", track="", car=""):
        """Return one DataFrame per lap for the given game/track/car combination.

        Requires laps_from_file() to have populated self.tracks first.
        """
        data = []
        # Direct lookup instead of iterating every game/track/car and skipping
        # non-matching combinations (same result, no full scan).
        sessions = self.tracks.get(game, {}).get(track, {}).get(car, {})
        for session, laps in sessions.items():
            for lap in laps:
                lap_number = lap["lap"]
                lap_time = lap["time"]
                length = lap["length"]
                logging.info(
                    f"Processing {game} {track} : session {session} : "
                    + f"lap #{lap_number: >2} : length {length} : time {lap_time}"
                )
                # e.g. 2022-12-05 19:52:18.141110+00:00
                start = datetime.strptime(lap["start"], "%Y-%m-%d %H:%M:%S.%f%z")
                end = datetime.strptime(lap["end"], "%Y-%m-%d %H:%M:%S.%f%z")

                df = self.session_df(
                    session,
                    lap_number=lap_number,
                    start=start,
                    end=end,
                    measurement="fast_laps",
                    bucket="fast_laps",
                )
                data.append(df)
        return data

    def telemetry_for_laps(self, laps=None, measurement="laps_cc", bucket="racing"):
        """Fetch telemetry for each lap model; keep only laps with > 100 rows.

        Per-lap failures are logged and skipped rather than aborting the batch.
        Default for `laps` changed from a shared mutable [] to None (same behavior).
        """
        data = []
        for lap in laps or []:
            game = lap.session.game.name
            session = lap.session.session_id
            track = lap.track.name
            lap_number = lap.number

            logging.info(f"Fetching telemetry for {game} - {track} - {lap.car}")
            logging.info(f" track.id {lap.track.id} car.id {lap.car.id}")
            logging.info(f" session {session} lap.id {lap.id} number {lap_number}")
            logging.info(f" length {lap.length} time {lap.time} valid {lap.valid}")
            logging.info(f" start {lap.start} end {lap.end}")

            try:
                df = self.session_df(
                    session,
                    lap_number=lap_number,
                    start=lap.start,
                    end=lap.end,
                    measurement=measurement,
                    bucket=bucket,
                )
                # laps with very little data are considered incomplete
                if len(df) > 100:
                    data.append(df)
            except Exception as e:
                logging.error(e)

        return data

    def session(
        self,
        session_id=None,
        lap=None,
        lap_numbers=None,
        start=None,
        end=None,
        measurement="laps_cc",
        bucket="racing",
    ):
        """Stream FluxRecords for a session, optionally limited to laps or a window.

        Precedence: explicit start/end + session_id widen the window by an hour
        on each side; a `lap` object overrides start/end/session_id; non-empty
        `lap_numbers` switches to a CurrentLap filter over all time.
        """
        lap_numbers = lap_numbers or []

        if start and end and session_id:
            # subtract one hour from start and add one hour to end
            # BUGFIX: `end` was previously also shifted backwards
            # (`- timedelta(hours=1)`), contradicting the stated intent.
            start = parser.parse(start) - timedelta(hours=1)
            end = parser.parse(end) + timedelta(hours=1)
            start = start.strftime(TIME_FORMAT)
            end = end.strftime(TIME_FORMAT)
        if lap:
            start = lap.start.strftime(TIME_FORMAT)
            end = lap.end.strftime(TIME_FORMAT)
            session_id = lap.session.session_id

        logging.debug(f"session_id: {session_id}, start: {start}, end: {end}, lap_numbers: {lap_numbers}")

        # The three query variants only differed in the range() bounds and the
        # optional CurrentLap filter — build those two pieces, one template.
        if lap_numbers and session_id:
            lap_filter = " or ".join(f'r["CurrentLap"] == "{n}"' for n in lap_numbers)
            time_range = "start: -10y, stop: now()"
            lap_clause = f'|> filter(fn: (r) => {lap_filter})'
        elif start and end and session_id:
            time_range = f"start: {start}, stop: {end}"
            lap_clause = ""
        else:
            time_range = "start: -10y, stop: now()"
            lap_clause = ""

        query = f"""
        from(bucket: "{bucket}")
          |> range({time_range})
          |> filter(fn: (r) => r["_measurement"] == "{measurement}")
          |> filter(fn: (r) => r["SessionId"] == "{session_id}")
          {lap_clause}
          |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
          |> group(columns: [])
          |> sort(columns: ["_time"])
        """

        logging.debug(query)
        records = self.query_api.query_stream(query=query)
        for record in records:
            yield record

    def raw_stream(
        self,
        start="-1d",
        end="now()",
        measurement="laps_cc",
        bucket="racing",
        delta="5m",
    ):
        """Stream every record in a time range, querying in chunks of `delta`.

        `start` accepts a relative offset ("-3d", "-12h", "-30m") or an absolute
        date string parseable by dateutil; `delta` is "Nd"/"Nh"/"Nm".
        NOTE(review): `end` is accepted but the range always ends at now().
        """
        end_d = datetime.now()
        start_d = datetime.now()
        if delta.endswith("d"):
            delta = timedelta(days=int(delta[:-1]))
        elif delta.endswith("h"):
            delta = timedelta(hours=int(delta[:-1]))
        elif delta.endswith("m"):
            # BUGFIX: minutes were previously parsed as hours.
            delta = timedelta(minutes=int(delta[:-1]))

        if start.startswith("-"):
            if start.endswith("d"):
                start_d = end_d - timedelta(days=int(start[1:-1]))
            elif start.endswith("h"):
                start_d = end_d - timedelta(hours=int(start[1:-1]))
            elif start.endswith("m"):
                start_d = end_d - timedelta(minutes=int(start[1:-1]))
                # minute-scale ranges are chunked minute by minute
                delta = timedelta(minutes=1)
        else:
            start_d = parser.parse(start)

        while start_d < end_d:
            start = start_d.strftime(TIME_FORMAT)
            end = (start_d + delta).strftime(TIME_FORMAT)
            logging.debug(f"start: {start}, end: {end}")

            query = f"""
            from(bucket: "{bucket}")
              |> range(start: {start}, stop: {end})
              |> filter(fn: (r) => r["_measurement"] == "{measurement}")
              |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
              |> group(columns: [])
              |> sort(columns: ["_time"])
            """
            # was print(query); route through logging like the rest of the class
            logging.debug(query)
            records = self.query_api.query_stream(query=query)
            for record in records:
                yield record

            start_d = start_d + delta

    def sessions(self, start="-1d", stop="now()"):
        """Yield SessionId tag values seen in the 'racing' bucket in the range."""
        query = f"""
        import "influxdata/influxdb/schema"
        schema.tagValues(
            bucket: "racing",
            tag: "SessionId",
            start: {start},
            stop: {stop}
        )
        """
        records = self.query_api.query_stream(query=query)
        for record in records:
            yield record["_value"]

    def session_ids(self, measurement="fast_laps", bucket="racing"):
        """Return the set of all SessionId tag values for a measurement."""
        query = f"""
        import "influxdata/influxdb/schema"
        schema.tagValues(
            bucket: "{bucket}",
            tag: "SessionId",
            predicate: (r) => r["_measurement"] == "{measurement}",
            start: -10y,
            stop: now()
        )
        """
        records = self.query_api.query_stream(query=query)
        return {record["_value"] for record in records}

    def session_df(
        self,
        session_id,
        lap_number=None,
        start="-1d",
        end="now()",
        measurement="laps_cc",
        bucket="racing",
        aggregate="",
        fields=None,
        drop_tags=False,
    ):
        """Return a pandas DataFrame of telemetry for one session (and lap).

        Adds game-specific x/y world-position columns, a per-lap "id" column,
        and drops rows with Gear == 0. `start`/`end` may be datetimes or Flux
        range expressions. Default for `fields` changed from [] to None.

        Raises:
            Exception: if the query returns no rows.
        """
        if isinstance(start, datetime):
            start = start.strftime(TIME_FORMAT)
        if isinstance(end, datetime):
            end = end.strftime(TIME_FORMAT)

        query = f"""
        from(bucket: "{bucket}")
          |> range(start: {start}, stop: {end})
          |> filter(fn: (r) => r["_measurement"] == "{measurement}")
          |> filter(fn: (r) => r["SessionId"] == "{session_id}")
        """

        if fields:
            # BUGFIX: the previous code appended "or" clauses *after* the
            # filter's closing parenthesis, producing invalid Flux whenever
            # more than one field was requested.
            field_filter = " or ".join(f'r["_field"] == "{field}"' for field in fields)
            query += f"""
          |> filter(fn: (r) => {field_filter})
            """

        if aggregate:
            # downsample, e.g. aggregate="1s" for 1Hz
            query += f"""
          |> aggregateWindow(every: {aggregate}, fn: last, createEmpty: false)
            """

        query += """
          |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
        """

        if lap_number:
            query += f"""
          |> filter(fn: (r) => r["CurrentLap"] == "{lap_number}")
            """

        query += """
          |> sort(columns: ["_time"], desc: false)
        """

        # drop columns that are not needed
        if drop_tags:
            query += """
          |> drop(columns: ["_start", "_stop", "_measurement", "host", "topic", "user"])
            """

        df = self.query_api.query_data_frame(query=query)
        if df.empty:
            raise Exception(f"No data found for {session_id} lap {lap_number}")

        game = df["GameName"].iloc[0]
        if game == "Assetto Corsa Competizione":
            # flip y axis
            df["x"] = df["WorldPosition_x"]
            df["y"] = df["WorldPosition_z"] * -1
        if game == "Automobilista 2":
            df["x"] = df["WorldPosition_x"]
            df["y"] = df["WorldPosition_z"]

        df["id"] = df["SessionId"].astype(str) + "-" + df["CurrentLap"].astype(str)

        # Gear == 0 rows are excluded — presumably pit/idle samples; TODO confirm
        df = df[df["Gear"] != 0]

        return df

    # https://community.influxdata.com/t/how-to-copy-data-between-measurements/22582/14
    def copy_session(
        self,
        session_id,
        start="-1d",
        end="now()",
        from_bucket="racing",
        to_bucket="fast_laps",
    ):
        """Copy a session's "laps_cc" points into to_bucket as "fast_laps".

        NOTE(review): start/end are normalized but never used — the query always
        scans -10y..now(). Confirm whether the range should honor them.
        """
        if isinstance(start, datetime):
            start = start.strftime(TIME_FORMAT)
        if isinstance(end, datetime):
            end = end.strftime(TIME_FORMAT)

        query = f"""
        from(bucket: "{from_bucket}")
          |> range(start: -10y, stop: now())
          |> filter(fn: (r) => r._measurement == "laps_cc")
          |> filter(fn: (r) => r["SessionId"] == "{session_id}")
          |> set(key: "_measurement", value: "fast_laps")
          |> to(bucket: "{to_bucket}")
          |> filter(fn: (r) => false)
          |> yield()
        """
        # logging.debug(query)

        # asyncio.run(self.run_query_async(query))
        # Drain the (empty) stream so the server-side to() actually executes.
        records = self.query_api.query_stream(query)
        for record in records:
            pass

    async def run_query_async(self, query):
        """Stream a Flux query asynchronously, printing a dot per record."""
        async with InfluxDBClientAsync(url=self.url, token=self.token, org=self.org) as client:
            # Stream of FluxRecords
            query_api = client.query_api()
            records = await query_api.query_stream(query)
            async for record in records:
                print(".", end="", flush=True)

    def delete_data(self, start="", end=""):
        """Delete all "laps_cc" points in [start, end] from the racing bucket."""
        if isinstance(start, datetime):
            start = start.strftime(TIME_FORMAT)
        if isinstance(end, datetime):
            end = end.strftime(TIME_FORMAT)

        predicate = """
        _measurement="laps_cc"
        """

        self.influx.delete_api().delete(start=start, stop=end, predicate=predicate, bucket="racing")


# def track(influx):
#     query_api = influx.query_api()
#     query = f"""
#     from(bucket: "racing")
#     |> range(start:-10d, stop: now())
#     |> filter(fn: (r) => r._field == "DistanceRoundTrack" )
#     |> last()
#     |> limit(n: 1)
#     """

#     records = query_api.query_stream(org=ORG, query=query)

#     tracks = {}
#     engine = create_engine(models.construct_connection_string(), echo=True)
#     session = Session(engine)

#     for record in records:
#         name = record["TrackCode"]
#         length = record["_value"]
#         if name not in tracks:
#             track = Track(name=name, length=length)
#             tracks[name] = track
#             session.add(track)
#         else:
#             track = tracks[name]
#             if length > track.length:
#                 track.length = length

#     session.commit()


# def init_db():
#     engine = create_engine(models.construct_connection_string(), echo=True)
#     models.drop_tables(engine)
#     models.create_tables(engine)


# if __name__ == "__main__":
#     if os.getenv("DEBUG", "1") == "1":
#         _LOGGER.setLevel(logging.DEBUG)
#         _LOGGER.debug("Debug mode enabled")

#     # logging.debug("")
#     # init_db()
#     # sessions(influx)
#     for session in sessions(influx):
#         print(session)
#         analyze_session(session, influx)