/ adafruit_gc_iot_core.py
adafruit_gc_iot_core.py
1 # Copyright 2019 Google Inc. 2 # 3 # Licensed under the Apache License, Version 2.0 (the "License"); 4 # you may not use this file except in compliance with the License. 5 # You may obtain a copy of the License at 6 # 7 # http://www.apache.org/licenses/LICENSE-2.0 8 # 9 # Unless required by applicable law or agreed to in writing, software 10 # distributed under the License is distributed on an "AS IS" BASIS, 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 # See the License for the specific language governing permissions and 13 # limitations under the License. 14 # 15 # Modified by Brent Rubell for Adafruit Industries, 2019 16 """ 17 `adafruit_gc_iot_core` 18 ================================================================================ 19 20 CircuitPython Google Cloud IoT Module 21 22 * Author(s): Brent Rubell, Google Inc. 23 24 Implementation Notes 25 -------------------- 26 27 **Software and Dependencies:** 28 29 * Adafruit CircuitPython firmware for the supported boards: 30 https://github.com/adafruit/circuitpython/releases 31 32 * Adafruit CircuitPython JWT Module: 33 https://github.com/adafruit/Adafruit_CircuitPython_JWT 34 35 * Adafruit CircuitPython Logging Module: 36 https://github.com/adafruit/Adafruit_CircuitPython_Logging 37 38 """ 39 import time 40 41 import adafruit_logging as logging 42 from adafruit_jwt import JWT 43 import adafruit_ntp as NTP 44 45 __version__ = "0.0.0-auto.0" 46 __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_GC_IOT_Core.git" 47 48 49 class MQTT_API_ERROR(Exception): 50 """Exception raised on MQTT API return-code errors.""" 51 52 # pylint: disable=unnecessary-pass 53 pass 54 55 56 class MQTT_API: 57 """Client for interacting with Google's Cloud Core MQTT API. 58 59 :param MiniMQTT mqtt_client: MiniMQTT Client object. 60 61 """ 62 63 def __init__(self, mqtt_client): 64 # Check that provided object is a MiniMQTT client object 65 mqtt_client_type = str(type(mqtt_client)) 66 if "MQTT" in mqtt_client_type: 67 self._client = mqtt_client 68 else: 69 raise TypeError( 70 "This class requires a MiniMQTT client object, please create one." 71 ) 72 # Verify that the MiniMQTT client was setup correctly. 73 try: 74 self.user = self._client.user 75 except: 76 raise TypeError("Google Cloud Core IoT MQTT API requires a username.") 77 # Validate provided JWT before connecting 78 try: 79 JWT.validate(self._client.password) 80 except: 81 raise TypeError("Invalid JWT provided.") 82 # If client has KeepAlive =0 or if KeepAlive > 20min, 83 # set KeepAlive to 19 minutes to avoid disconnection 84 # due to Idle Time (https://cloud.google.com/iot/quotas). 85 if self._client.keep_alive == 0 or self._client.keep_alive >= 1200: 86 self._client.keep_alive = 1140 87 # User-defined MQTT callback methods must be init'd to None 88 self.on_connect = None 89 self.on_disconnect = None 90 self.on_message = None 91 self.on_subscribe = None 92 self.on_unsubscribe = None 93 # MQTT event callbacks 94 self._client.on_connect = self._on_connect_mqtt 95 self._client.on_disconnect = self._on_disconnect_mqtt 96 self._client.on_message = self._on_message_mqtt 97 self._client.on_subscribe = self._on_subscribe_mqtt 98 self._client.on_unsubscribe = self._on_unsubscribe_mqtt 99 self.logger = False 100 if self._client.logger is not None: 101 # Allow MQTT_API to utilize MiniMQTT Client's logger 102 self.logger = True 103 self._client.set_logger_level("DEBUG") 104 self._connected = False 105 # Set up a device identifier by splitting out the full CID 106 self.device_id = self._client.client_id.split("/")[7] 107 108 def __enter__(self): 109 return self 110 111 def __exit__(self, exception_type, exception_value, traceback): 112 self.disconnect() 113 114 def disconnect(self): 115 """Disconnects from the Google MQTT Broker. 116 """ 117 try: 118 self._client.disconnect() 119 except: 120 raise ValueError("Unable to disconnect from Google's MQTT broker.") 121 self._connected = False 122 # Reset all user-defined callbacks 123 self.on_connect = None 124 self.on_disconnect = None 125 self.on_message = None 126 self.on_subscribe = None 127 self.on_unsubscribe = None 128 # De-initialize MiniMQTT Client 129 self._client.deinit() 130 131 def reconnect(self): 132 """Reconnects to the Google MQTT Broker. 133 """ 134 try: 135 self._client.reconnect() 136 except: 137 raise MQTT_API_ERROR("Error reconnecting to Google MQTT.") 138 139 def connect(self): 140 """Connects to the Google MQTT Broker. 141 """ 142 self._client.connect() 143 self._connected = True 144 145 @property 146 def is_connected(self): 147 """Returns if client is connected to Google's MQTT broker. 148 """ 149 return self._connected 150 151 # pylint: disable=not-callable, unused-argument 152 def _on_connect_mqtt(self, client, userdata, flags, return_code): 153 """Runs when the mqtt client calls on_connect. 154 """ 155 if self.logger: 156 self._client.logger.debug("Client called on_connect.") 157 if return_code == 0: 158 self._connected = True 159 else: 160 raise MQTT_API_ERROR(return_code) 161 # Call the user-defined on_connect callback if defined 162 if self.on_connect is not None: 163 self.on_connect(self, userdata, flags, return_code) 164 165 # pylint: disable=not-callable, unused-argument 166 def _on_disconnect_mqtt(self, client, userdata, return_code): 167 """Runs when the client calls on_disconnect. 168 """ 169 if self.logger: 170 self._client.logger.debug("Client called on_disconnect") 171 self._connected = False 172 # Call the user-defined on_disconnect callblack if defined 173 if self.on_disconnect is not None: 174 self.on_disconnect(self) 175 176 # pylint: disable=not-callable 177 def _on_message_mqtt(self, client, topic, payload): 178 """Runs when the client calls on_message. 179 """ 180 if self.logger: 181 self._client.logger.debug("Client called on_message") 182 if self.on_message is not None: 183 self.on_message(self, topic, payload) 184 185 # pylint: disable=not-callable 186 def _on_subscribe_mqtt(self, client, user_data, topic, qos): 187 """Runs when the client calls on_subscribe. 188 """ 189 if self.logger: 190 self._client.logger.debug("Client called on_subscribe") 191 if self.on_subscribe is not None: 192 self.on_subscribe(self, user_data, topic, qos) 193 194 # pylint: disable=not-callable 195 def _on_unsubscribe_mqtt(self, client, user_data, topic, pid): 196 """Runs when the client calls on_unsubscribe. 197 """ 198 if self.logger: 199 self._client.logger.debug("Client called on_unsubscribe") 200 if self.on_unsubscribe is not None: 201 self.on_unsubscribe(self, user_data, topic, pid) 202 203 def loop(self): 204 """Maintains a connection with Google Cloud IoT Core's MQTT broker. You will 205 need to manually call this method within a loop to retain connection. 206 207 Example of "pumping" a Google Core IoT loop. 208 ..code-block:: python 209 210 while True: 211 google_iot.loop() 212 213 """ 214 if self._connected: 215 self._client.loop() 216 217 def unsubscribe(self, topic, subfolder=None): 218 """Unsubscribes from a Google Cloud IoT device topic. 219 :param str topic: Required MQTT topic. Defaults to events. 220 :param str subfolder: Optional MQTT topic subfolder. Defaults to None. 221 222 """ 223 if subfolder is not None: 224 mqtt_topic = "/devices/{}/{}/{}".format(self.device_id, topic, subfolder) 225 else: 226 mqtt_topic = "/devices/{}/{}".format(self.device_id, topic) 227 self._client.unsubscribe(mqtt_topic) 228 229 def unsubscribe_from_all_commands(self): 230 """Unsubscribes from a device's "commands/#" topic. 231 :param int qos: Quality of Service level for the message. 232 233 """ 234 self.unsubscribe("commands/#") 235 236 def subscribe(self, topic, subfolder=None, qos=1): 237 """Subscribes to a Google Cloud IoT device topic. 238 :param str topic: Required MQTT topic. Defaults to events. 239 :param str subfolder: Optional MQTT topic subfolder. Defaults to None. 240 :param int qos: Quality of Service level for the message. 241 242 """ 243 if subfolder is not None: 244 mqtt_topic = "/devices/{}/{}/{}".format(self.device_id, topic, subfolder) 245 else: 246 mqtt_topic = "/devices/{}/{}".format(self.device_id, topic) 247 self._client.subscribe(mqtt_topic, qos) 248 249 def subscribe_to_subfolder(self, topic, subfolder, qos=1): 250 """Subscribes to a Google Cloud IoT device's topic subfolder 251 :param str topic: Required MQTT topic. 252 :param str subfolder: Optional MQTT topic subfolder. Defaults to None. 253 :param int qos: Quality of Service level for the message. 254 255 """ 256 self.subscribe(topic, subfolder, qos) 257 258 def subscribe_to_config(self, qos=1): 259 """Subscribes to a Google Cloud IoT device's configuration 260 topic. 261 :param int qos: Quality of Service level for the message. 262 263 """ 264 self.subscribe("config", qos=qos) 265 266 def subscribe_to_all_commands(self, qos=1): 267 """Subscribes to a device's "commands/#" topic. 268 :param int qos: Quality of Service level for the message. 269 270 """ 271 self.subscribe("commands/#", qos=qos) 272 273 def publish(self, payload, topic="events", subfolder=None, qos=0): 274 """Publishes a payload from the device to its Google Cloud IoT 275 device topic, defaults to "events" topic. To send state, use the 276 publish_state method. 277 278 :param int payload: Data to publish to Google Cloud IoT 279 :param str payload: Data to publish to Google Cloud IoT 280 :param float payload: Data to publish to Google Cloud IoT 281 :param str topic: Required MQTT topic. Defaults to events. 282 :param str subfolder: Optional MQTT topic subfolder. Defaults to None. 283 :param int qos: Quality of Service level for the message. 284 285 """ 286 if subfolder is not None: 287 mqtt_topic = "/devices/{}/{}/{}".format(self.device_id, topic, subfolder) 288 elif topic is not None: 289 mqtt_topic = "/devices/{}/{}".format(self.device_id, topic) 290 elif topic == "state" and subfolder is not None: 291 raise ValueError("Subfolders are not supported for state messages.") 292 else: 293 raise TypeError("A topic string must be specified.") 294 self._client.publish(mqtt_topic, payload, qos=qos) 295 296 def publish_state(self, payload): 297 """Publishes a device state message to the Cloud IoT MQTT API. Data 298 sent by this method should be information about the device itself (such as number of 299 crashes, battery level, or device health). This method is unidirectional, 300 it communicates Device-to-Cloud only. 301 302 """ 303 self._client.publish(payload, "state") 304 305 306 # pylint: disable=too-many-instance-attributes 307 class Cloud_Core: 308 """CircuitPython Google Cloud IoT Core module. 309 310 :param ESP_SPIcontrol esp: ESP32SPI object. 311 :param dict secrets: Secrets.py file. 312 :param bool log: Enable Cloud_Core logging, defaults to False. 313 314 """ 315 316 def __init__(self, esp, secrets, log=False): 317 self._esp = esp 318 # Validate Secrets 319 if hasattr(secrets, "keys"): 320 self._secrets = secrets 321 else: 322 raise AttributeError( 323 "Project settings are kept in secrets.py, please add them there!" 324 ) 325 self.logger = None 326 if log is True: 327 self.logger = logging.getLogger("log") 328 self.logger.setLevel(logging.DEBUG) 329 # Configuration, from secrets file 330 self._proj_id = secrets["project_id"] 331 self._region = secrets["cloud_region"] 332 self._reg_id = secrets["registry_id"] 333 self._device_id = secrets["device_id"] 334 self._private_key = secrets["private_key"] 335 self.broker = "mqtt.googleapis.com" 336 self.username = b"unused" 337 self.cid = self.client_id 338 339 @property 340 def client_id(self): 341 """Returns a Google Cloud IOT Core Client ID. 342 """ 343 client_id = "projects/{0}/locations/{1}/registries/{2}/devices/{3}".format( 344 self._proj_id, self._region, self._reg_id, self._device_id 345 ) 346 if self.logger: 347 self.logger.debug("Client ID: {}".format(client_id)) 348 return client_id 349 350 def generate_jwt(self, ttl=43200, algo="RS256"): 351 """Generates a JSON Web Token (https://jwt.io/) using network time. 352 :param int jwt_ttl: When the JWT token expires, defaults to 43200 minutes (or 12 hours). 353 :param str algo: Algorithm used to create a JSON Web Token. 354 355 Example usage of generating and setting a JSON-Web-Token: 356 ..code-block:: python 357 358 jwt = CloudCore.generate_jwt() 359 print("Generated JWT: ", jwt) 360 361 """ 362 if self.logger: 363 self.logger.debug("Generating JWT...") 364 ntp = NTP.NTP(self._esp) 365 ntp.set_time() 366 claims = { 367 # The time that the token was issued at 368 "iat": time.time(), 369 # The time the token expires. 370 "exp": time.time() + ttl, 371 # The audience field should always be set to the GCP project id. 372 "aud": self._proj_id, 373 } 374 jwt = JWT.generate(claims, self._private_key, algo) 375 return jwt