# adafruit_azureiot / iot_mqtt.py
# iot_mqtt.py
# The MIT License (MIT)
#
# Copyright (c) 2020 Jim Bennett
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
`iot_mqtt`
=====================

An MQTT client for Azure IoT

* Author(s): Jim Bennett, Elena Horton
"""

import gc
import json
import time
import adafruit_minimqtt as minimqtt
from adafruit_minimqtt import MQTT
import adafruit_logging as logging
from .iot_error import IoTError
from .keys import compute_derived_symmetric_key
from .quote import quote
from . import constants


# pylint: disable=R0903
class IoTResponse:
    """A response from a direct method call
    """

    def __init__(self, code: int, message: str):
        """Creates an IoT Response object
        :param int code: The HTTP response code for this method call, for example 200 if the method was handled successfully
        :param str message: The HTTP response message for this method call
        """
        self.response_code = code
        self.response_message = message


class IoTMQTTCallback:
    """An interface for classes that can be called by MQTT events

    Every method has a no-op default, so implementers only need to override
    the events they care about.
    """

    def message_sent(self, data: str) -> None:
        """Called when a message is sent to the cloud
        :param str data: The data sent with the message
        """

    def connection_status_change(self, connected: bool) -> None:
        """Called when the connection status changes
        :param bool connected: True if the device is connected, otherwise false
        """

    # pylint: disable=W0613, R0201
    def direct_method_invoked(self, method_name: str, payload: str) -> IoTResponse:
        """Called when a direct method is invoked
        :param str method_name: The name of the method that was invoked
        :param str payload: The payload with the message
        :returns: A response with a code and status to show if the method was correctly handled
        :rtype: IoTResponse
        """
        return IoTResponse(200, "")

    # pylint: disable=C0103
    def cloud_to_device_message_received(self, body: str, properties: dict) -> None:
        """Called when a cloud to device message is received
        :param str body: The body of the message
        :param dict properties: The properties sent with the message
        """

    def device_twin_desired_updated(self, desired_property_name: str, desired_property_value, desired_version: int) -> None:
        """Called when the device twin desired properties are updated
        :param str desired_property_name: The name of the desired property that was updated
        :param desired_property_value: The value of the desired property that was updated
        :param int desired_version: The version of the desired property that was updated
        """

    def device_twin_reported_updated(self, reported_property_name: str, reported_property_value, reported_version: int) -> None:
        """Called when the device twin reported values are updated
        :param str reported_property_name: The name of the reported property that was updated
        :param reported_property_value: The value of the reported property that was updated
        :param int reported_version: The version of the reported property that was updated
        """


# pylint: disable=R0902
class IoTMQTT:
    """MQTT client for Azure IoT
    """

    def _gen_sas_token(self) -> str:
        """Builds the shared access signature used as the MQTT password

        The token signs "<resource uri>\\n<expiry>" with the device key and
        expires ``self._token_expires`` seconds from now.

        :returns: The "SharedAccessSignature sr=...&sig=...&se=..." token
        :rtype: str
        """
        token_expiry = int(time.time() + self._token_expires)
        uri = self._hostname + "%2Fdevices%2F" + self._device_id
        signed_hmac_sha256 = compute_derived_symmetric_key(self._key, uri + "\n" + str(token_expiry))
        signature = quote(signed_hmac_sha256, "~()*!.'")
        if signature.endswith("\n"):  # somewhere along the crypto chain a newline is inserted
            signature = signature[:-1]
        token = "SharedAccessSignature sr={}&sig={}&se={}".format(uri, signature, token_expiry)
        return token

    def _create_mqtt_client(self) -> None:
        """Creates the underlying MiniMQTT client, wires up the callbacks and connects
        """
        minimqtt.set_socket(self._socket, self._iface)

        self._mqtts = MQTT(
            broker=self._hostname,
            username=self._username,
            password=self._passwd,
            port=8883,
            keep_alive=120,
            is_ssl=True,
            client_id=self._device_id,
            log=True,
        )

        self._mqtts.logger.setLevel(self._logger.getEffectiveLevel())

        # set actions to take throughout connection lifecycle
        self._mqtts.on_connect = self._on_connect
        self._mqtts.on_log = self._on_log
        self._mqtts.on_publish = self._on_publish
        self._mqtts.on_disconnect = self._on_disconnect

        # initiate the connection using the adafruit_minimqtt library
        self._mqtts.connect()

    # pylint: disable=C0103, W0613
    def _on_connect(self, client, userdata, _, rc) -> None:
        """MQTT connect callback; a return code of 0 marks the client as connected
        """
        self._logger.info("- iot_mqtt :: _on_connect :: rc = " + str(rc) + ", userdata = " + str(userdata))
        if rc == 0:
            self._mqtt_connected = True
        self._auth_response_received = True
        self._callback.connection_status_change(True)

    # pylint: disable=C0103, W0613
    def _on_log(self, client, userdata, level, buf) -> None:
        """MQTT log callback; entries with a level of 8 or below are also logged as errors
        """
        self._logger.info("mqtt-log : " + buf)
        if level <= 8:
            self._logger.error("mqtt-log : " + buf)

    def _on_disconnect(self, client, userdata, rc) -> None:
        """MQTT disconnect callback

        A return code of 5 is reported as "Not authorized" and triggers a
        disconnect; any other code is forwarded to the callback as a
        connection status change.
        """
        self._logger.info("- iot_mqtt :: _on_disconnect :: rc = " + str(rc))
        self._auth_response_received = True

        if rc == 5:
            self._logger.error("on(disconnect) : Not authorized")
            self.disconnect()

        if rc == 1:
            self._mqtt_connected = False

        if rc != 5:
            self._callback.connection_status_change(False)

    def _on_publish(self, client, data, topic, msg_id) -> None:
        """MQTT publish callback; only logs what was published
        """
        self._logger.info("- iot_mqtt :: _on_publish :: " + str(data) + " on topic " + str(topic))

    # pylint: disable=W0703
    def _handle_device_twin_update(self, client, topic: str, msg: str) -> None:
        """Handles a device twin message and fans each property out to the callback

        The message is either a full twin (with "desired" and optionally
        "reported" sections) or a patch, in which case the whole payload is
        treated as the desired section.

        :param client: The MQTT client that received the message (unused)
        :param str topic: The topic the message arrived on
        :param str msg: The JSON payload of the twin message
        """
        self._logger.debug("- iot_mqtt :: _echo_desired :: " + topic)

        try:
            twin = json.loads(msg)
        except ValueError as e:
            # ValueError rather than json.JSONDecodeError: CircuitPython's json
            # module has no JSONDecodeError, and on CPython JSONDecodeError is a
            # subclass of ValueError, so this catches the same failures there.
            self._logger.error("ERROR: JSON parse for Device Twin message object has failed. => " + msg + " => " + str(e))
            return

        if "reported" in twin:
            reported = twin["reported"]

            if "$version" in reported:
                # pop so the version is not reported as a property itself
                reported_version = reported.pop("$version")
            else:
                self._logger.error("ERROR: Unexpected payload for reported twin update => " + msg)
                return

            for property_name, value in reported.items():
                self._callback.device_twin_reported_updated(property_name, value, reported_version)

        # A payload without a "desired" section is a patch of desired properties
        is_patch = "desired" not in twin
        desired = twin if is_patch else twin["desired"]

        if "$version" in desired:
            desired_version = desired.pop("$version")
        else:
            self._logger.error("ERROR: Unexpected payload for desired twin update => " + msg)
            return

        for property_name, value in desired.items():
            self._callback.device_twin_desired_updated(property_name, value, desired_version)

    def _handle_direct_method(self, client, topic: str, msg: str) -> None:
        """Handles a direct method call and publishes the callback's response

        :param client: The MQTT client that received the message (unused)
        :param str topic: The topic, expected to look like "$iothub/methods/POST/<name>/?$rid=<id>"
        :param str msg: The payload passed to the method
        """
        index = topic.find("$rid=")
        # Fallbacks used when the topic carries no request id
        method_id = 1
        method_name = "None"
        if index == -1:
            self._logger.error("ERROR: C2D doesn't include topic id")
        else:
            method_id = topic[index + 5 :]
            topic_template = "$iothub/methods/POST/"
            len_temp = len(topic_template)
            method_name = topic[len_temp : topic.find("/", len_temp + 1)]

        ret = self._callback.direct_method_invoked(method_name, msg)
        gc.collect()

        ret_code = 200
        ret_message = "{}"
        if ret.response_code is not None:
            ret_code = ret.response_code
        if ret.response_message is not None:
            ret_message = ret.response_message

        # ret message must be JSON
        if not ret_message.startswith("{") or not ret_message.endswith("}"):
            ret_json = {"Value": ret_message}
            ret_message = json.dumps(ret_json)

        next_topic = "$iothub/methods/res/{}/?$rid={}".format(ret_code, method_id)
        self._logger.info("C2D: => " + next_topic + " with data " + ret_message + " and name => " + method_name)
        self._send_common(next_topic, ret_message)

    def _handle_cloud_to_device_message(self, client, topic: str, msg: str) -> None:
        """Handles a cloud to device message, passing the body and topic properties to the callback

        Properties are read from the "&"-separated segments of the topic
        after the first one.

        :param client: The MQTT client that received the message (unused)
        :param str topic: The topic the message arrived on
        :param str msg: The body of the message
        """
        properties = {}
        for part in topic.split("&")[1:]:
            # Split on the first "=" only so values containing "=" stay intact,
            # and tolerate a segment with no "=" instead of raising IndexError.
            key_value = part.split("=", 1)
            properties[key_value[0]] = key_value[1] if len(key_value) > 1 else ""

        self._callback.cloud_to_device_message_received(msg, properties)
        gc.collect()

    def _send_common(self, topic: str, data) -> None:
        """Publishes data on a topic, retrying on RuntimeError

        :param str topic: The topic to publish on
        :param data: The payload as a string, or a dictionary that is serialised to JSON
        :raises IoTError: if the data is neither a string nor a dictionary
        :raises RuntimeError: if publishing still fails on the 10th attempt
        """
        # Convert data to a string
        if isinstance(data, dict):
            data = json.dumps(data)

        if not isinstance(data, str):
            raise IoTError("Data must be a string or a dictionary")

        self._logger.debug("Sending message on topic: " + topic)
        self._logger.debug("Sending message: " + str(data))

        retry = 0

        while True:
            gc.collect()
            try:
                self._logger.debug("Trying to send...")
                self._mqtts.publish(topic, data)
                self._logger.debug("Data sent")
                break
            except RuntimeError as runtime_error:
                self._logger.info("Could not send data, retrying after 0.5 seconds: " + str(runtime_error))
                retry = retry + 1

                if retry >= 10:
                    self._logger.error("Failed to send data")
                    raise

                time.sleep(0.5)

        gc.collect()

    def _get_device_settings(self) -> None:
        """Requests the full device twin by publishing to the twin GET topic
        """
        self._logger.info("- iot_mqtt :: _get_device_settings :: ")
        self.loop()
        self._send_common("$iothub/twin/GET/?$rid=0", " ")

    # pylint: disable=R0913
    def __init__(
        self,
        callback: IoTMQTTCallback,
        socket,
        iface,
        hostname: str,
        device_id: str,
        key: str,
        token_expires: int = 21600,
        logger: logging = None,
    ):
        """Create the Azure IoT MQTT client
        :param IoTMQTTCallback callback: A callback class
        :param socket: The socket to communicate over
        :param iface: The network interface to communicate over
        :param str hostname: The hostname of the MQTT broker to connect to, get this by registering the device
        :param str device_id: The device ID of the device to register
        :param str key: The primary or secondary key of the device to register
        :param int token_expires: The number of seconds till the token expires, defaults to 6 hours
        :param adafruit_logging logger: The logger
        """
        self._callback = callback
        self._socket = socket
        self._iface = iface
        self._mqtt_connected = False
        self._auth_response_received = False
        self._mqtts = None
        self._device_id = device_id
        self._hostname = hostname
        self._key = key
        self._token_expires = token_expires
        self._username = "{}/{}/api-version={}".format(self._hostname, device_id, constants.IOTC_API_VERSION)
        self._passwd = self._gen_sas_token()
        self._logger = logger if logger is not None else logging.getLogger("log")
        self._is_subscribed_to_twins = False

    def _subscribe_to_core_topics(self):
        """Subscribes to the cloud to device message and direct method topics
        """
        device_bound_topic = "devices/{}/messages/devicebound/#".format(self._device_id)
        self._mqtts.add_topic_callback(device_bound_topic, self._handle_cloud_to_device_message)
        self._mqtts.subscribe(device_bound_topic)

        self._mqtts.add_topic_callback("$iothub/methods/#", self._handle_direct_method)
        self._mqtts.subscribe("$iothub/methods/#")

    def _subscribe_to_twin_topics(self):
        """Subscribes to the device twin desired-property and twin response topics
        """
        self._mqtts.add_topic_callback("$iothub/twin/PATCH/properties/desired/#", self._handle_device_twin_update)
        self._mqtts.subscribe("$iothub/twin/PATCH/properties/desired/#")  # twin desired property changes

        self._mqtts.add_topic_callback("$iothub/twin/res/200/#", self._handle_device_twin_update)
        self._mqtts.subscribe("$iothub/twin/res/200/#")  # twin properties response

    def connect(self) -> bool:
        """Connects to the MQTT broker
        :returns: True if the connection is successful, otherwise False
        :rtype: bool
        """
        self._logger.info("- iot_mqtt :: connect :: " + self._hostname)

        self._create_mqtt_client()

        self._logger.info(" - iot_mqtt :: connect :: created mqtt client. connecting..")
        # NOTE(review): _auth_response_received is initialised to False and only
        # ever set to True in this class, so this wait never runs as written.
        # The condition is left unchanged because loop() returns immediately
        # while not connected, so waiting on "not received" could spin forever.
        while self._auth_response_received is None:
            self.loop()

        self._logger.info(" - iot_mqtt :: connect :: on_connect must be fired. Connected ? " + str(self.is_connected()))
        if not self.is_connected():
            return False

        self._mqtt_connected = True
        self._auth_response_received = True

        self._subscribe_to_core_topics()

        return True

    def subscribe_to_twins(self) -> None:
        """Subscribes to digital twin updates
        Only call this if your tier of IoT Hub supports this
        """
        if self._is_subscribed_to_twins:
            return

        # do this separately as this is not supported in B1 hubs
        self._subscribe_to_twin_topics()

        self._get_device_settings()

        self._is_subscribed_to_twins = True

    def disconnect(self) -> None:
        """Disconnects from the MQTT broker
        """
        if not self.is_connected():
            return

        self._logger.info("- iot_mqtt :: disconnect :: ")
        self._mqtt_connected = False
        self._mqtts.disconnect()

    def reconnect(self) -> None:
        """Reconnects to the MQTT broker
        """
        self._logger.info("- iot_mqtt :: reconnect :: ")

        self._mqtts.reconnect()

    def is_connected(self) -> bool:
        """Gets if there is an open connection to the MQTT broker
        :returns: True if there is an open connection, False if not
        :rtype: bool
        """
        return self._mqtt_connected

    def loop(self) -> None:
        """Listens for MQTT messages
        """
        if not self.is_connected():
            return

        self._mqtts.loop()
        gc.collect()

    def send_device_to_cloud_message(self, message, system_properties: dict = None) -> None:
        """Send a device to cloud message from this device to Azure IoT Hub
        :param message: The message data as a JSON string or a dictionary
        :param system_properties: System properties to send with the message
        :raises: ValueError if the message is not a string or dictionary
        :raises RuntimeError: if the internet connection is not responding or is unable to connect
        """
        # str() so a dictionary message can be logged before it is serialised
        self._logger.info("- iot_mqtt :: send_device_to_cloud_message :: " + str(message))
        topic = "devices/{}/messages/events/".format(self._device_id)

        if system_properties is not None:
            # Properties are appended to the topic as "&"-separated key=value pairs
            topic += "&".join(prop + "=" + str(system_properties[prop]) for prop in system_properties)

        # Convert message to a string
        if isinstance(message, dict):
            message = json.dumps(message)

        if not isinstance(message, str):
            raise ValueError("message must be a string or a dictionary")

        self._send_common(topic, message)
        self._callback.message_sent(message)

    def send_twin_patch(self, patch) -> None:
        """Send a patch for the reported properties of the device twin
        :param patch: The patch as a JSON string or a dictionary
        :raises: IoTError if the data is not a string or dictionary
        :raises RuntimeError: if the internet connection is not responding or is unable to connect
        """
        self._logger.info("- iot_mqtt :: sendProperty :: " + str(patch))
        # The current time is used as the request id for the patch
        topic = "$iothub/twin/PATCH/properties/reported/?$rid={}".format(int(time.time()))
        self._send_common(topic, patch)