/ adafruit_gc_iot_core.py
adafruit_gc_iot_core.py
  1  # Copyright 2019 Google Inc.
  2  #
  3  # Licensed under the Apache License, Version 2.0 (the "License");
  4  # you may not use this file except in compliance with the License.
  5  # You may obtain a copy of the License at
  6  #
  7  #         http://www.apache.org/licenses/LICENSE-2.0
  8  #
  9  # Unless required by applicable law or agreed to in writing, software
 10  # distributed under the License is distributed on an "AS IS" BASIS,
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  # See the License for the specific language governing permissions and
 13  # limitations under the License.
 14  #
 15  # Modified by Brent Rubell for Adafruit Industries, 2019
 16  """
 17  `adafruit_gc_iot_core`
 18  ================================================================================
 19  
 20  CircuitPython Google Cloud IoT Module
 21  
 22  * Author(s): Brent Rubell, Google Inc.
 23  
 24  Implementation Notes
 25  --------------------
 26  
 27  **Software and Dependencies:**
 28  
 29  * Adafruit CircuitPython firmware for the supported boards:
 30    https://github.com/adafruit/circuitpython/releases
 31  
 32  * Adafruit CircuitPython JWT Module:
 33    https://github.com/adafruit/Adafruit_CircuitPython_JWT
 34  
 35  * Adafruit CircuitPython Logging Module:
 36    https://github.com/adafruit/Adafruit_CircuitPython_Logging
 37  
 38  """
 39  import time
 40  
 41  import adafruit_logging as logging
 42  from adafruit_jwt import JWT
 43  import adafruit_ntp as NTP
 44  
 45  __version__ = "0.0.0-auto.0"
 46  __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_GC_IOT_Core.git"
 47  
 48  
 49  class MQTT_API_ERROR(Exception):
 50      """Exception raised on MQTT API return-code errors."""
 51  
 52      # pylint: disable=unnecessary-pass
 53      pass
 54  
 55  
 56  class MQTT_API:
 57      """Client for interacting with Google's Cloud Core MQTT API.
 58  
 59      :param MiniMQTT mqtt_client: MiniMQTT Client object.
 60  
 61      """
 62  
 63      def __init__(self, mqtt_client):
 64          # Check that provided object is a MiniMQTT client object
 65          mqtt_client_type = str(type(mqtt_client))
 66          if "MQTT" in mqtt_client_type:
 67              self._client = mqtt_client
 68          else:
 69              raise TypeError(
 70                  "This class requires a MiniMQTT client object, please create one."
 71              )
 72          # Verify that the MiniMQTT client was setup correctly.
 73          try:
 74              self.user = self._client.user
 75          except:
 76              raise TypeError("Google Cloud Core IoT MQTT API requires a username.")
 77          # Validate provided JWT before connecting
 78          try:
 79              JWT.validate(self._client.password)
 80          except:
 81              raise TypeError("Invalid JWT provided.")
 82          # If client has KeepAlive =0 or if KeepAlive > 20min,
 83          # set KeepAlive to 19 minutes to avoid disconnection
 84          # due to Idle Time (https://cloud.google.com/iot/quotas).
 85          if self._client.keep_alive == 0 or self._client.keep_alive >= 1200:
 86              self._client.keep_alive = 1140
 87          # User-defined MQTT callback methods must be init'd to None
 88          self.on_connect = None
 89          self.on_disconnect = None
 90          self.on_message = None
 91          self.on_subscribe = None
 92          self.on_unsubscribe = None
 93          # MQTT event callbacks
 94          self._client.on_connect = self._on_connect_mqtt
 95          self._client.on_disconnect = self._on_disconnect_mqtt
 96          self._client.on_message = self._on_message_mqtt
 97          self._client.on_subscribe = self._on_subscribe_mqtt
 98          self._client.on_unsubscribe = self._on_unsubscribe_mqtt
 99          self.logger = False
100          if self._client.logger is not None:
101              # Allow MQTT_API to utilize MiniMQTT Client's logger
102              self.logger = True
103              self._client.set_logger_level("DEBUG")
104          self._connected = False
105          # Set up a device identifier by splitting out the full CID
106          self.device_id = self._client.client_id.split("/")[7]
107  
108      def __enter__(self):
109          return self
110  
111      def __exit__(self, exception_type, exception_value, traceback):
112          self.disconnect()
113  
114      def disconnect(self):
115          """Disconnects from the Google MQTT Broker.
116          """
117          try:
118              self._client.disconnect()
119          except:
120              raise ValueError("Unable to disconnect from Google's MQTT broker.")
121          self._connected = False
122          # Reset all user-defined callbacks
123          self.on_connect = None
124          self.on_disconnect = None
125          self.on_message = None
126          self.on_subscribe = None
127          self.on_unsubscribe = None
128          # De-initialize MiniMQTT Client
129          self._client.deinit()
130  
131      def reconnect(self):
132          """Reconnects to the Google MQTT Broker.
133          """
134          try:
135              self._client.reconnect()
136          except:
137              raise MQTT_API_ERROR("Error reconnecting to Google MQTT.")
138  
139      def connect(self):
140          """Connects to the Google MQTT Broker.
141          """
142          self._client.connect()
143          self._connected = True
144  
145      @property
146      def is_connected(self):
147          """Returns if client is connected to Google's MQTT broker.
148          """
149          return self._connected
150  
151      # pylint: disable=not-callable, unused-argument
152      def _on_connect_mqtt(self, client, userdata, flags, return_code):
153          """Runs when the mqtt client calls on_connect.
154          """
155          if self.logger:
156              self._client.logger.debug("Client called on_connect.")
157          if return_code == 0:
158              self._connected = True
159          else:
160              raise MQTT_API_ERROR(return_code)
161          # Call the user-defined on_connect callback if defined
162          if self.on_connect is not None:
163              self.on_connect(self, userdata, flags, return_code)
164  
165      # pylint: disable=not-callable, unused-argument
166      def _on_disconnect_mqtt(self, client, userdata, return_code):
167          """Runs when the client calls on_disconnect.
168          """
169          if self.logger:
170              self._client.logger.debug("Client called on_disconnect")
171          self._connected = False
172          # Call the user-defined on_disconnect callblack if defined
173          if self.on_disconnect is not None:
174              self.on_disconnect(self)
175  
176      # pylint: disable=not-callable
177      def _on_message_mqtt(self, client, topic, payload):
178          """Runs when the client calls on_message.
179          """
180          if self.logger:
181              self._client.logger.debug("Client called on_message")
182          if self.on_message is not None:
183              self.on_message(self, topic, payload)
184  
185      # pylint: disable=not-callable
186      def _on_subscribe_mqtt(self, client, user_data, topic, qos):
187          """Runs when the client calls on_subscribe.
188          """
189          if self.logger:
190              self._client.logger.debug("Client called on_subscribe")
191          if self.on_subscribe is not None:
192              self.on_subscribe(self, user_data, topic, qos)
193  
194      # pylint: disable=not-callable
195      def _on_unsubscribe_mqtt(self, client, user_data, topic, pid):
196          """Runs when the client calls on_unsubscribe.
197          """
198          if self.logger:
199              self._client.logger.debug("Client called on_unsubscribe")
200          if self.on_unsubscribe is not None:
201              self.on_unsubscribe(self, user_data, topic, pid)
202  
203      def loop(self):
204          """Maintains a connection with Google Cloud IoT Core's MQTT broker. You will
205          need to manually call this method within a loop to retain connection.
206  
207          Example of "pumping" a Google Core IoT loop.
208          ..code-block:: python
209  
210              while True:
211                  google_iot.loop()
212  
213          """
214          if self._connected:
215              self._client.loop()
216  
217      def unsubscribe(self, topic, subfolder=None):
218          """Unsubscribes from a Google Cloud IoT device topic.
219          :param str topic: Required MQTT topic. Defaults to events.
220          :param str subfolder: Optional MQTT topic subfolder. Defaults to None.
221  
222          """
223          if subfolder is not None:
224              mqtt_topic = "/devices/{}/{}/{}".format(self.device_id, topic, subfolder)
225          else:
226              mqtt_topic = "/devices/{}/{}".format(self.device_id, topic)
227          self._client.unsubscribe(mqtt_topic)
228  
229      def unsubscribe_from_all_commands(self):
230          """Unsubscribes from a device's "commands/#" topic.
231          :param int qos: Quality of Service level for the message.
232  
233          """
234          self.unsubscribe("commands/#")
235  
236      def subscribe(self, topic, subfolder=None, qos=1):
237          """Subscribes to a Google Cloud IoT device topic.
238          :param str topic: Required MQTT topic. Defaults to events.
239          :param str subfolder: Optional MQTT topic subfolder. Defaults to None.
240          :param int qos: Quality of Service level for the message.
241  
242          """
243          if subfolder is not None:
244              mqtt_topic = "/devices/{}/{}/{}".format(self.device_id, topic, subfolder)
245          else:
246              mqtt_topic = "/devices/{}/{}".format(self.device_id, topic)
247          self._client.subscribe(mqtt_topic, qos)
248  
249      def subscribe_to_subfolder(self, topic, subfolder, qos=1):
250          """Subscribes to a Google Cloud IoT device's topic subfolder
251          :param str topic: Required MQTT topic.
252          :param str subfolder: Optional MQTT topic subfolder. Defaults to None.
253          :param int qos: Quality of Service level for the message.
254  
255          """
256          self.subscribe(topic, subfolder, qos)
257  
258      def subscribe_to_config(self, qos=1):
259          """Subscribes to a Google Cloud IoT device's configuration
260          topic.
261          :param int qos: Quality of Service level for the message.
262  
263          """
264          self.subscribe("config", qos=qos)
265  
266      def subscribe_to_all_commands(self, qos=1):
267          """Subscribes to a device's "commands/#" topic.
268          :param int qos: Quality of Service level for the message.
269  
270          """
271          self.subscribe("commands/#", qos=qos)
272  
273      def publish(self, payload, topic="events", subfolder=None, qos=0):
274          """Publishes a payload from the device to its Google Cloud IoT
275          device topic, defaults to "events" topic. To send state, use the
276          publish_state method.
277  
278          :param int payload: Data to publish to Google Cloud IoT
279          :param str payload: Data to publish to Google Cloud IoT
280          :param float payload: Data to publish to Google Cloud IoT
281          :param str topic: Required MQTT topic. Defaults to events.
282          :param str subfolder: Optional MQTT topic subfolder. Defaults to None.
283          :param int qos: Quality of Service level for the message.
284  
285          """
286          if subfolder is not None:
287              mqtt_topic = "/devices/{}/{}/{}".format(self.device_id, topic, subfolder)
288          elif topic is not None:
289              mqtt_topic = "/devices/{}/{}".format(self.device_id, topic)
290          elif topic == "state" and subfolder is not None:
291              raise ValueError("Subfolders are not supported for state messages.")
292          else:
293              raise TypeError("A topic string must be specified.")
294          self._client.publish(mqtt_topic, payload, qos=qos)
295  
296      def publish_state(self, payload):
297          """Publishes a device state message to the Cloud IoT MQTT API. Data
298          sent by this method should be information about the device itself (such as number of
299          crashes, battery level, or device health). This method is unidirectional,
300          it communicates Device-to-Cloud only.
301  
302          """
303          self._client.publish(payload, "state")
304  
305  
306  # pylint: disable=too-many-instance-attributes
307  class Cloud_Core:
308      """CircuitPython Google Cloud IoT Core module.
309  
310      :param ESP_SPIcontrol esp: ESP32SPI object.
311      :param dict secrets: Secrets.py file.
312      :param bool log: Enable Cloud_Core logging, defaults to False.
313  
314      """
315  
316      def __init__(self, esp, secrets, log=False):
317          self._esp = esp
318          # Validate Secrets
319          if hasattr(secrets, "keys"):
320              self._secrets = secrets
321          else:
322              raise AttributeError(
323                  "Project settings are kept in secrets.py, please add them there!"
324              )
325          self.logger = None
326          if log is True:
327              self.logger = logging.getLogger("log")
328              self.logger.setLevel(logging.DEBUG)
329          # Configuration, from secrets file
330          self._proj_id = secrets["project_id"]
331          self._region = secrets["cloud_region"]
332          self._reg_id = secrets["registry_id"]
333          self._device_id = secrets["device_id"]
334          self._private_key = secrets["private_key"]
335          self.broker = "mqtt.googleapis.com"
336          self.username = b"unused"
337          self.cid = self.client_id
338  
339      @property
340      def client_id(self):
341          """Returns a Google Cloud IOT Core Client ID.
342          """
343          client_id = "projects/{0}/locations/{1}/registries/{2}/devices/{3}".format(
344              self._proj_id, self._region, self._reg_id, self._device_id
345          )
346          if self.logger:
347              self.logger.debug("Client ID: {}".format(client_id))
348          return client_id
349  
350      def generate_jwt(self, ttl=43200, algo="RS256"):
351          """Generates a JSON Web Token (https://jwt.io/) using network time.
352          :param int jwt_ttl: When the JWT token expires, defaults to 43200 minutes (or 12 hours).
353          :param str algo: Algorithm used to create a JSON Web Token.
354  
355          Example usage of generating and setting a JSON-Web-Token:
356          ..code-block:: python
357  
358              jwt = CloudCore.generate_jwt()
359              print("Generated JWT: ", jwt)
360  
361          """
362          if self.logger:
363              self.logger.debug("Generating JWT...")
364          ntp = NTP.NTP(self._esp)
365          ntp.set_time()
366          claims = {
367              # The time that the token was issued at
368              "iat": time.time(),
369              # The time the token expires.
370              "exp": time.time() + ttl,
371              # The audience field should always be set to the GCP project id.
372              "aud": self._proj_id,
373          }
374          jwt = JWT.generate(claims, self._private_key, algo)
375          return jwt