/ adafruit_azureiot / iot_mqtt.py
iot_mqtt.py
  1  # The MIT License (MIT)
  2  #
  3  # Copyright (c) 2020 Jim Bennett
  4  #
  5  # Permission is hereby granted, free of charge, to any person obtaining a copy
  6  # of this software and associated documentation files (the "Software"), to deal
  7  # in the Software without restriction, including without limitation the rights
  8  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9  # copies of the Software, and to permit persons to whom the Software is
 10  # furnished to do so, subject to the following conditions:
 11  #
 12  # The above copyright notice and this permission notice shall be included in
 13  # all copies or substantial portions of the Software.
 14  #
 15  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 16  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 17  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 18  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 19  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 20  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 21  # THE SOFTWARE.
 22  """
 23  `iot_mqtt`
 24  =====================
 25  
 26  An MQTT client for Azure IoT
 27  
 28  * Author(s): Jim Bennett, Elena Horton
 29  """
 30  
 31  import gc
 32  import json
 33  import time
 34  import adafruit_minimqtt as minimqtt
 35  from adafruit_minimqtt import MQTT
 36  import adafruit_logging as logging
 37  from .iot_error import IoTError
 38  from .keys import compute_derived_symmetric_key
 39  from .quote import quote
 40  from . import constants
 41  
 42  # pylint: disable=R0903
 43  class IoTResponse:
 44      """A response from a direct method call
 45      """
 46  
 47      def __init__(self, code: int, message: str):
 48          """Creates an IoT Response object
 49          :param int code: The HTTP response code for this method call, for example 200 if the method was handled successfully
 50          :param str message: The HTTP response message for this method call
 51          """
 52          self.response_code = code
 53          self.response_message = message
 54  
 55  
 56  class IoTMQTTCallback:
 57      """An interface for classes that can be called by MQTT events
 58      """
 59  
 60      def message_sent(self, data: str) -> None:
 61          """Called when a message is sent to the cloud
 62          :param str data: The data send with the message
 63          """
 64  
 65      def connection_status_change(self, connected: bool) -> None:
 66          """Called when the connection status changes
 67          :param bool connected: True if the device is connected, otherwise false
 68          """
 69  
 70      # pylint: disable=W0613, R0201
 71      def direct_method_invoked(self, method_name: str, payload: str) -> IoTResponse:
 72          """Called when a direct method is invoked
 73          :param str method_name: The name of the method that was invoked
 74          :param str payload: The payload with the message
 75          :returns: A response with a code and status to show if the method was correctly handled
 76          :rtype: IoTResponse
 77          """
 78          return IoTResponse(200, "")
 79  
 80      # pylint: disable=C0103
 81      def cloud_to_device_message_received(self, body: str, properties: dict) -> None:
 82          """Called when a cloud to device message is received
 83          :param str body: The body of the message
 84          :param dict properties: The propreties sent with the mesage
 85          """
 86  
 87      def device_twin_desired_updated(self, desired_property_name: str, desired_property_value, desired_version: int) -> None:
 88          """Called when the device twin desired properties are updated
 89          :param str desired_property_name: The name of the desired property that was updated
 90          :param desired_property_value: The value of the desired property that was updated
 91          :param int desired_version: The version of the desired property that was updated
 92          """
 93  
 94      def device_twin_reported_updated(self, reported_property_name: str, reported_property_value, reported_version: int) -> None:
 95          """Called when the device twin reported values are updated
 96          :param str reported_property_name: The name of the reported property that was updated
 97          :param reported_property_value: The value of the reported property that was updated
 98          :param int reported_version: The version of the reported property that was updated
 99          """
100  
101  
102  # pylint: disable=R0902
class IoTMQTT:
    """MQTT client for Azure IoT

    Wraps an adafruit_minimqtt ``MQTT`` client: builds the credentials, subscribes to the
    IoT Hub topics, parses incoming messages and forwards them to an ``IoTMQTTCallback``.
    """

    # pylint: disable=R0913
    def __init__(
        self,
        callback: "IoTMQTTCallback",
        socket,
        iface,
        hostname: str,
        device_id: str,
        key: str,
        token_expires: int = 21600,
        logger: logging = None,
    ):
        """Create the Azure IoT MQTT client
        :param IoTMQTTCallback callback: A callback class
        :param socket: The socket to communicate over
        :param iface: The network interface to communicate over
        :param str hostname: The hostname of the MQTT broker to connect to, get this by registering the device
        :param str device_id: The device ID of the device to register
        :param str key: The primary or secondary key of the device to register
        :param int token_expires: The number of seconds till the token expires, defaults to 6 hours
        :param adafruit_logging logger: The logger
        """
        self._callback = callback
        self._socket = socket
        self._iface = iface
        self._mqtt_connected = False
        self._auth_response_received = False
        self._mqtts = None
        self._device_id = device_id
        self._hostname = hostname
        self._key = key
        self._token_expires = token_expires
        self._username = "{}/{}/api-version={}".format(self._hostname, device_id, constants.IOTC_API_VERSION)
        # The token is generated once here, so its expiry is counted from construction time
        self._passwd = self._gen_sas_token()
        self._logger = logger if logger is not None else logging.getLogger("log")
        self._is_subscribed_to_twins = False

    def _gen_sas_token(self) -> str:
        """Builds the shared access signature token used as the MQTT password
        :returns: A token of the form ``SharedAccessSignature sr=<uri>&sig=<signature>&se=<expiry>``
        :rtype: str
        """
        token_expiry = int(time.time() + self._token_expires)
        uri = self._hostname + "%2Fdevices%2F" + self._device_id
        # The signed payload is the resource URI and the expiry separated by a newline
        signed_hmac_sha256 = compute_derived_symmetric_key(self._key, uri + "\n" + str(token_expiry))
        signature = quote(signed_hmac_sha256, "~()*!.'")
        if signature.endswith("\n"):  # somewhere along the crypto chain a newline is inserted
            signature = signature[:-1]
        return "SharedAccessSignature sr={}&sig={}&se={}".format(uri, signature, token_expiry)

    def _create_mqtt_client(self) -> None:
        """Creates the underlying MQTT client, wires up its event handlers and connects it
        """
        minimqtt.set_socket(self._socket, self._iface)

        self._mqtts = MQTT(
            broker=self._hostname,
            username=self._username,
            password=self._passwd,
            port=8883,
            keep_alive=120,
            is_ssl=True,
            client_id=self._device_id,
            log=True,
        )

        # Keep the MQTT client's logger at the same level as this client's logger
        self._mqtts.logger.setLevel(self._logger.getEffectiveLevel())

        # set actions to take throughout connection lifecycle
        self._mqtts.on_connect = self._on_connect
        self._mqtts.on_log = self._on_log
        self._mqtts.on_publish = self._on_publish
        self._mqtts.on_disconnect = self._on_disconnect

        # initiate the connection using the adafruit_minimqtt library
        self._mqtts.connect()

    # pylint: disable=C0103, W0613
    def _on_connect(self, client, userdata, _, rc) -> None:
        """Handles the connect event from the MQTT client
        :param client: The MQTT client raising the event (unused)
        :param userdata: User data passed by the MQTT client, only logged
        :param rc: The connection result code, 0 is treated as success
        """
        self._logger.info("- iot_mqtt :: _on_connect :: rc = " + str(rc) + ", userdata = " + str(userdata))
        self._auth_response_received = True
        # Only a zero result code means the broker accepted the connection, so only then
        # is the device marked as connected and the callback told about it
        if rc == 0:
            self._mqtt_connected = True
            self._callback.connection_status_change(True)

    # pylint: disable=C0103, W0613
    def _on_log(self, client, userdata, level, buf) -> None:
        """Forwards a log line from the MQTT client to this client's logger
        :param level: The level reported by the MQTT client
        :param buf: The text to log
        """
        self._logger.info("mqtt-log : " + buf)
        # NOTE(review): levels of 8 and below are additionally logged as errors; presumably
        # these are the MQTT client's error levels - confirm against the MQTT library
        if level <= 8:
            self._logger.error("mqtt-log : " + buf)

    def _on_disconnect(self, client, userdata, rc) -> None:
        """Handles the disconnect event from the MQTT client
        :param client: The MQTT client raising the event (unused)
        :param userdata: User data passed by the MQTT client (unused)
        :param rc: The disconnect result code, 5 is reported as not authorized
        """
        self._logger.info("- iot_mqtt :: _on_disconnect :: rc = " + str(rc))
        self._auth_response_received = True

        if rc == 5:
            # An authorization failure tears the connection down without notifying the callback
            self._logger.error("on(disconnect) : Not authorized")
            self.disconnect()
            return

        if rc == 1:
            self._mqtt_connected = False

        self._callback.connection_status_change(False)

    def _on_publish(self, client, data, topic, msg_id) -> None:
        """Logs the publish event from the MQTT client
        """
        self._logger.info("- iot_mqtt :: _on_publish :: " + str(data) + " on topic " + str(topic))

    # pylint: disable=W0703
    def _handle_device_twin_update(self, client, topic: str, msg: str) -> None:
        """Parses a device twin message and reports every changed property to the callback

        A message with a ``desired`` key is treated as a full twin, anything else as a patch
        whose top level holds the desired properties.
        :param client: The MQTT client that received the message (unused)
        :param str topic: The topic the message arrived on, only logged
        :param str msg: The JSON body of the message
        """
        self._logger.debug("- iot_mqtt :: _echo_desired :: " + topic)

        try:
            twin = json.loads(msg)
        except ValueError as e:
            # ValueError is the base class of json.JSONDecodeError, so this also works on
            # json implementations that only raise a plain ValueError
            self._logger.error("ERROR: JSON parse for Device Twin message object has failed. => " + msg + " => " + str(e))
            return

        if not isinstance(twin, dict):
            # Valid JSON that is not an object cannot hold twin properties
            self._logger.error("ERROR: Unexpected payload for desired twin update => " + msg)
            return

        if "reported" in twin:
            reported = twin["reported"]

            if "$version" not in reported:
                self._logger.error("ERROR: Unexpected payload for reported twin update => " + msg)
                return

            # The version is metadata, so remove it before iterating the actual properties
            reported_version = reported.pop("$version")

            for property_name, value in reported.items():
                self._callback.device_twin_reported_updated(property_name, value, reported_version)

        # Without a "desired" key the whole message is the set of desired properties
        desired = twin["desired"] if "desired" in twin else twin

        if "$version" not in desired:
            self._logger.error("ERROR: Unexpected payload for desired twin update => " + msg)
            return

        desired_version = desired.pop("$version")

        for property_name, value in desired.items():
            self._callback.device_twin_desired_updated(property_name, value, desired_version)

    def _handle_direct_method(self, client, topic: str, msg: str) -> None:
        """Dispatches a direct method call to the callback and publishes its response

        The callback may return None (treated as 200 with an empty JSON object). A response
        message that is not already a JSON object string is converted to one.
        :param client: The MQTT client that received the message (unused)
        :param str topic: The topic the call arrived on, holding the method name and request id
        :param str msg: The payload of the method call
        """
        index = topic.find("$rid=")
        method_id = 1
        method_name = "None"
        if index == -1:
            self._logger.error("ERROR: C2D doesn't include topic id")
        else:
            # Everything after "$rid=" is the request id to echo back in the response topic
            method_id = topic[index + 5 :]
            topic_template = "$iothub/methods/POST/"
            len_temp = len(topic_template)
            # The method name sits between the topic prefix and the next "/"
            method_name = topic[len_temp : topic.find("/", len_temp + 1)]

        ret = self._callback.direct_method_invoked(method_name, msg)
        gc.collect()

        ret_code = 200
        ret_message = "{}"
        if ret is not None:
            if ret.response_code is not None:
                ret_code = ret.response_code
            if ret.response_message is not None:
                ret_message = ret.response_message

                # ret message must be JSON
                if isinstance(ret_message, dict):
                    ret_message = json.dumps(ret_message)
                elif not (isinstance(ret_message, str) and ret_message.startswith("{") and ret_message.endswith("}")):
                    ret_message = json.dumps({"Value": ret_message})

        next_topic = "$iothub/methods/res/{}/?$rid={}".format(ret_code, method_id)
        self._logger.info("C2D: => " + next_topic + " with data " + ret_message + " and name => " + method_name)
        self._send_common(next_topic, ret_message)

    def _handle_cloud_to_device_message(self, client, topic: str, msg: str) -> None:
        """Extracts the properties from a cloud to device message topic and passes them to the callback

        Properties are the ``&`` separated ``key=value`` pairs after the first ``&`` in the topic.
        A pair without ``=`` is reported with an empty value and only the first ``=`` splits a pair.
        :param client: The MQTT client that received the message (unused)
        :param str topic: The topic the message arrived on
        :param str msg: The body of the message
        """
        properties = {}
        # The first segment is the topic path itself, so it is skipped
        for part in topic.split("&")[1:]:
            if not part:
                continue
            key_value = part.split("=", 1)
            properties[key_value[0]] = key_value[1] if len(key_value) > 1 else ""

        self._callback.cloud_to_device_message_received(msg, properties)
        gc.collect()

    def _send_common(self, topic: str, data) -> None:
        """Publishes data on a topic, retrying when the MQTT client raises a RuntimeError
        :param str topic: The topic to publish on
        :param data: The data to send as a string, or a dictionary that is sent as JSON
        :raises IoTError: if the data is not a string or a dictionary
        :raises RuntimeError: if publishing still fails on the tenth attempt
        """
        # Convert data to a string
        if isinstance(data, dict):
            data = json.dumps(data)

        if not isinstance(data, str):
            raise IoTError("Data must be a string or a dictionary")

        self._logger.debug("Sending message on topic: " + topic)
        self._logger.debug("Sending message: " + str(data))

        retry = 0

        while True:
            gc.collect()
            try:
                self._logger.debug("Trying to send...")
                self._mqtts.publish(topic, data)
                self._logger.debug("Data sent")
                break
            except RuntimeError as runtime_error:
                self._logger.info("Could not send data, retrying after 0.5 seconds: " + str(runtime_error))
                retry = retry + 1

                if retry >= 10:
                    self._logger.error("Failed to send data")
                    raise

                time.sleep(0.5)

        gc.collect()

    def _get_device_settings(self) -> None:
        """Requests the current device twin by publishing to the twin GET topic
        """
        self._logger.info("- iot_mqtt :: _get_device_settings :: ")
        self.loop()
        self._send_common("$iothub/twin/GET/?$rid=0", " ")

    def _subscribe_to_core_topics(self):
        """Subscribes to the cloud to device message and direct method topics
        """
        device_bound_topic = "devices/{}/messages/devicebound/#".format(self._device_id)
        self._mqtts.add_topic_callback(device_bound_topic, self._handle_cloud_to_device_message)
        self._mqtts.subscribe(device_bound_topic)

        self._mqtts.add_topic_callback("$iothub/methods/#", self._handle_direct_method)
        self._mqtts.subscribe("$iothub/methods/#")

    def _subscribe_to_twin_topics(self):
        """Subscribes to the device twin desired property and twin response topics
        """
        self._mqtts.add_topic_callback("$iothub/twin/PATCH/properties/desired/#", self._handle_device_twin_update)
        self._mqtts.subscribe("$iothub/twin/PATCH/properties/desired/#")  # twin desired property changes

        self._mqtts.add_topic_callback("$iothub/twin/res/200/#", self._handle_device_twin_update)
        self._mqtts.subscribe("$iothub/twin/res/200/#")  # twin properties response

    def connect(self) -> bool:
        """Connects to the MQTT broker
        :returns: True if the connection is successful, otherwise False
        :rtype: bool
        """
        self._logger.info("- iot_mqtt :: connect :: " + self._hostname)

        self._create_mqtt_client()

        self._logger.info(" - iot_mqtt :: connect :: created mqtt client. connecting..")
        # There is nothing to poll for here: _auth_response_received is only ever False or
        # True in this class, and loop() does no work until the device is marked connected.
        # NOTE(review): presumably the MQTT client's connect() has already fired on_connect
        # by the time it returns - confirm against adafruit_minimqtt

        self._logger.info(" - iot_mqtt :: connect :: on_connect must be fired. Connected ? " + str(self.is_connected()))
        if not self.is_connected():
            return False

        self._mqtt_connected = True
        self._auth_response_received = True

        self._subscribe_to_core_topics()

        return True

    def subscribe_to_twins(self) -> None:
        """Subscribes to digital twin updates
        Only call this if your tier of IoT Hub supports this
        """
        if self._is_subscribed_to_twins:
            return

        # do this separately as this is not supported in B1 hubs
        self._subscribe_to_twin_topics()

        self._get_device_settings()

        self._is_subscribed_to_twins = True

    def disconnect(self) -> None:
        """Disconnects from the MQTT broker
        """
        if not self.is_connected():
            return

        self._logger.info("- iot_mqtt :: disconnect :: ")
        # Mark as disconnected first so a re-entrant call from the disconnect handler is a no-op
        self._mqtt_connected = False
        self._mqtts.disconnect()

    def reconnect(self) -> None:
        """Reconnects to the MQTT broker
        """
        self._logger.info("- iot_mqtt :: reconnect :: ")

        self._mqtts.reconnect()

    def is_connected(self) -> bool:
        """Gets if there is an open connection to the MQTT broker
        :returns: True if there is an open connection, False if not
        :rtype: bool
        """
        return self._mqtt_connected

    def loop(self) -> None:
        """Listens for MQTT messages
        """
        if not self.is_connected():
            return

        self._mqtts.loop()
        gc.collect()

    def send_device_to_cloud_message(self, message, system_properties: dict = None) -> None:
        """Send a device to cloud message from this device to Azure IoT Hub
        :param message: The message data as a JSON string or a dictionary
        :param system_properties: System properties to send with the message
        :raises: ValueError if the message is not a string or dictionary
        :raises RuntimeError: if the internet connection is not responding or is unable to connect
        """
        # Convert and validate before logging, so a dictionary can be logged and sent as JSON
        # and an unsupported type fails with the documented ValueError
        if isinstance(message, dict):
            message = json.dumps(message)

        if not isinstance(message, str):
            raise ValueError("message must be a string or a dictionary")

        self._logger.info("- iot_mqtt :: send_device_to_cloud_message :: " + message)
        topic = "devices/{}/messages/events/".format(self._device_id)

        if system_properties is not None:
            # Properties are appended to the topic as "&" separated key=value pairs
            topic += "&".join([str(prop) + "=" + str(system_properties[prop]) for prop in system_properties])

        self._send_common(topic, message)
        self._callback.message_sent(message)

    def send_twin_patch(self, patch) -> None:
        """Send a patch for the reported properties of the device twin
        :param patch: The patch as a JSON string or a dictionary
        :raises: IoTError if the data is not a string or dictionary
        :raises RuntimeError: if the internet connection is not responding or is unable to connect
        """
        self._logger.info("- iot_mqtt :: sendProperty :: " + str(patch))
        # The current time is used as the request id for the patch
        topic = "$iothub/twin/PATCH/properties/reported/?$rid={}".format(int(time.time()))
        self._send_common(topic, patch)