# utils.py
import os

from paddock.exceptions import RuntimeEnvironmentConfigurationIncompleteError


def get_influxdb2_config() -> tuple[str, str, str]:
    """Get InfluxDB2 configuration from environment variables.

    Environment variables read:
        B4MAD_RACING_INFLUX_ORG: returned as ``org``; defaults to ``"b4mad"``.
        B4MAD_RACING_INFLUX_TOKEN: returned as ``token``; required.
        INFLUXDB2_SERVICE_HOST: host part of the URL; required.
        INFLUXDB2_SERVICE_PORT: port part of the URL; defaults to ``8086``.
        INFLUXDB2_SERVICE_PROTOCOL: scheme of the URL; defaults to ``"http"``.

    Returns:
        A ``(org, token, url)`` tuple, where ``url`` has the form
        ``"<protocol>://<host>:<port>/"``.

    Raises:
        RuntimeEnvironmentConfigurationIncompleteError: If
            ``B4MAD_RACING_INFLUX_TOKEN`` or ``INFLUXDB2_SERVICE_HOST`` is
            unset or empty. The exception receives the name of the missing
            variable.
    """
    org = os.environ.get("B4MAD_RACING_INFLUX_ORG", "b4mad")

    # An empty token is as unusable as a missing one, so both are rejected.
    token = os.environ.get("B4MAD_RACING_INFLUX_TOKEN")
    if not token:
        raise RuntimeEnvironmentConfigurationIncompleteError(
            "B4MAD_RACING_INFLUX_TOKEN",
        )

    host = os.environ.get("INFLUXDB2_SERVICE_HOST")
    # The default is a string so that `port` has one type whether or not the
    # variable is set; it is only ever interpolated into the URL.
    port = os.environ.get("INFLUXDB2_SERVICE_PORT", "8086")
    protocol = os.environ.get("INFLUXDB2_SERVICE_PROTOCOL", "http")

    # Treat an empty host like a missing one (same rule as for the token);
    # otherwise the URL would silently become "<protocol>://:<port>/".
    if not host:
        raise RuntimeEnvironmentConfigurationIncompleteError(
            "INFLUXDB2_SERVICE_HOST",
        )

    url = f"{protocol}://{host}:{port}/"

    return (org, token, url)


def get_mqtt_config() -> tuple[str, int, str, str]:
    """Get MQTT configuration from environment variables.

    Environment variables read:
        MOSQUITTO_MQTT_SERVICE_HOST: broker host; required.
        MOSQUITTO_MQTT_SERVICE_PORT: broker port, converted with ``int``;
            defaults to ``1883``.
        B4MAD_RACING_MQTT_USER: user name; defaults to ``"crewchief"``.
        B4MAD_RACING_MQTT_PASSWORD: password; defaults to ``"crewchief"``.

    Returns:
        A ``(host, port, user, password)`` tuple.

    Raises:
        RuntimeEnvironmentConfigurationIncompleteError: If
            ``MOSQUITTO_MQTT_SERVICE_HOST`` is unset or empty.
        ValueError: If ``MOSQUITTO_MQTT_SERVICE_PORT`` is set to a value
            that ``int`` cannot parse.
    """
    host = os.environ.get("MOSQUITTO_MQTT_SERVICE_HOST")
    # The port is parsed before the host check, so a malformed port is
    # reported first (as ValueError) even when the host is also missing.
    port = int(os.environ.get("MOSQUITTO_MQTT_SERVICE_PORT", 1883))
    user = os.environ.get("B4MAD_RACING_MQTT_USER", "crewchief")
    password = os.environ.get("B4MAD_RACING_MQTT_PASSWORD", "crewchief")

    # An empty host cannot be connected to; report it like a missing one.
    if not host:
        raise RuntimeEnvironmentConfigurationIncompleteError("MOSQUITTO_MQTT_SERVICE_HOST")

    return (host, port, user, password)