/ components / paddock / telemetry / utils.py
utils.py
 1  import os
 2  
 3  from paddock.exceptions import RuntimeEnvironmentConfigurationIncompleteError
 4  
 5  
 6  def get_influxdb2_config() -> tuple[str, str, str] | RuntimeEnvironmentConfigurationIncompleteError:
 7      """Get InfluxDB2 configuration from environment variables."""
 8      org = os.environ.get("B4MAD_RACING_INFLUX_ORG", "b4mad")
 9  
10      _INFLUXDB2_TOKEN = os.environ.get("B4MAD_RACING_INFLUX_TOKEN")  # noqa: N806
11  
12      if _INFLUXDB2_TOKEN is None or _INFLUXDB2_TOKEN == "":
13          raise RuntimeEnvironmentConfigurationIncompleteError(
14              "B4MAD_RACING_INFLUX_TOKEN",
15          )
16  
17      token = _INFLUXDB2_TOKEN
18  
19      _INFLUXDB2_SERVICE_HOST = os.environ.get("INFLUXDB2_SERVICE_HOST")  # noqa: N806
20      _INFLUXDB2_SERVICE_PORT = os.environ.get("INFLUXDB2_SERVICE_PORT", 8086)  # noqa: N806
21      _INFLUXDB2_SERVICE_PROTOCOL = os.environ.get("INFLUXDB2_SERVICE_PROTOCOL", "http")  # noqa: N806
22  
23      if _INFLUXDB2_SERVICE_HOST is None:
24          raise RuntimeEnvironmentConfigurationIncompleteError(
25              "INFLUXDB2_SERVICE_HOST",
26          )
27  
28      url = f"{_INFLUXDB2_SERVICE_PROTOCOL}://{_INFLUXDB2_SERVICE_HOST}:{_INFLUXDB2_SERVICE_PORT}/"
29  
30      return (org, token, url)
31  
32  
def get_mqtt_config() -> tuple[str, int, str, str]:
    """Get MQTT configuration from environment variables.

    Environment variables read:
        MOSQUITTO_MQTT_SERVICE_HOST: broker host; required, non-empty.
        MOSQUITTO_MQTT_SERVICE_PORT: broker port, converted to ``int``;
            defaults to ``1883``.
        B4MAD_RACING_MQTT_USER: user name; defaults to ``"crewchief"``.
        B4MAD_RACING_MQTT_PASSWORD: password; defaults to ``"crewchief"``.

    Returns:
        A ``(host, port, user, password)`` tuple.

    Raises:
        ValueError: If MOSQUITTO_MQTT_SERVICE_PORT is set but is not a valid
            integer literal.
        RuntimeEnvironmentConfigurationIncompleteError: If the host is unset
            or empty. The exception is constructed with the name of the
            missing environment variable.
    """
    host = os.environ.get("MOSQUITTO_MQTT_SERVICE_HOST")
    # Environment values are always strings; keep the default a string and
    # convert once. The port is parsed before the host check so a malformed
    # port still surfaces as ValueError, as it did before.
    port = int(os.environ.get("MOSQUITTO_MQTT_SERVICE_PORT", "1883"))
    user = os.environ.get("B4MAD_RACING_MQTT_USER", "crewchief")
    password = os.environ.get("B4MAD_RACING_MQTT_PASSWORD", "crewchief")

    # An empty host is as unusable as a missing one, so reject both.
    if not host:
        raise RuntimeEnvironmentConfigurationIncompleteError("MOSQUITTO_MQTT_SERVICE_HOST")

    return (host, port, user, password)