/ adafruit_azureiot / iothub_device.py
iothub_device.py
1 # The MIT License (MIT) 2 # 3 # Copyright (c) 2020 Jim Bennett, Elena Horton 4 # 5 # Permission is hereby granted, free of charge, to any person obtaining a copy 6 # of this software and associated documentation files (the "Software"), to deal 7 # in the Software without restriction, including without limitation the rights 8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 # copies of the Software, and to permit persons to whom the Software is 10 # furnished to do so, subject to the following conditions: 11 # 12 # The above copyright notice and this permission notice shall be included in 13 # all copies or substantial portions of the Software. 14 # 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 21 # THE SOFTWARE. 22 """ 23 `iothub_device` 24 ===================== 25 26 Connectivity to Azure IoT Hub 27 28 * Author(s): Jim Bennett, Elena Horton 29 """ 30 31 import json 32 import adafruit_logging as logging 33 from .iot_error import IoTError 34 from .iot_mqtt import IoTMQTT, IoTMQTTCallback, IoTResponse 35 36 37 def _validate_keys(connection_string_parts): 38 """Raise ValueError if incorrect combination of keys 39 """ 40 host_name = connection_string_parts.get(HOST_NAME) 41 shared_access_key_name = connection_string_parts.get(SHARED_ACCESS_KEY_NAME) 42 shared_access_key = connection_string_parts.get(SHARED_ACCESS_KEY) 43 device_id = connection_string_parts.get(DEVICE_ID) 44 45 if host_name and device_id and shared_access_key: 46 pass 47 elif host_name and shared_access_key and shared_access_key_name: 48 pass 49 else: 50 raise ValueError("Invalid Connection String - Incomplete") 51 52 53 DELIMITER = ";" 54 VALUE_SEPARATOR = "=" 55 56 HOST_NAME = "HostName" 57 SHARED_ACCESS_KEY_NAME = "SharedAccessKeyName" 58 SHARED_ACCESS_KEY = "SharedAccessKey" 59 SHARED_ACCESS_SIGNATURE = "SharedAccessSignature" 60 DEVICE_ID = "DeviceId" 61 MODULE_ID = "ModuleId" 62 GATEWAY_HOST_NAME = "GatewayHostName" 63 64 VALID_KEYS = [ 65 HOST_NAME, 66 SHARED_ACCESS_KEY_NAME, 67 SHARED_ACCESS_KEY, 68 SHARED_ACCESS_SIGNATURE, 69 DEVICE_ID, 70 MODULE_ID, 71 GATEWAY_HOST_NAME, 72 ] 73 74 75 class IoTHubDevice(IoTMQTTCallback): 76 """A device client for the Azure IoT Hub service 77 """ 78 79 def connection_status_change(self, connected: bool) -> None: 80 """Called when the connection status changes 81 :param bool connected: True if the device is connected, otherwise false 82 """ 83 if self._on_connection_status_changed is not None: 84 # pylint: disable=E1102 85 self._on_connection_status_changed(connected) 86 87 # pylint: disable=W0613, R0201 88 def direct_method_invoked(self, method_name: str, payload) -> IoTResponse: 89 """Called when a direct method is invoked 90 :param str method_name: The name of the method that was invoked 91 :param str payload: The payload with the message 92 :returns: A response with a code and status to show if the method was correctly handled 93 :rtype: IoTResponse 94 """ 95 if self._on_direct_method_invoked is not None: 96 # pylint: disable=E1102 97 return self._on_direct_method_invoked(method_name, payload) 98 99 raise IoTError("on_direct_method_invoked not set") 100 101 # pylint: disable=C0103 102 def cloud_to_device_message_received(self, body: str, properties: dict) -> None: 103 """Called when a cloud to device message is received 104 :param str body: The body of the message 105 :param dict properties: The propreties sent with the mesage 106 """ 107 if self._on_cloud_to_device_message_received is not None: 108 # pylint: disable=E1102 109 self._on_cloud_to_device_message_received(body, properties) 110 111 def device_twin_desired_updated(self, desired_property_name: str, desired_property_value, desired_version: int) -> None: 112 """Called when the device twin desired properties are updated 113 :param str desired_property_name: The name of the desired property that was updated 114 :param desired_property_value: The value of the desired property that was updated 115 :param int desired_version: The version of the desired property that was updated 116 """ 117 if self._on_device_twin_desired_updated is not None: 118 # pylint: disable=E1102 119 self._on_device_twin_desired_updated(desired_property_name, desired_property_value, desired_version) 120 121 def device_twin_reported_updated(self, reported_property_name: str, reported_property_value, reported_version: int) -> None: 122 """Called when the device twin reported values are updated 123 :param str reported_property_name: The name of the reported property that was updated 124 :param reported_property_value: The value of the reported property that was updated 125 :param int reported_version: The version of the reported property that was updated 126 """ 127 if self._on_device_twin_reported_updated is not None: 128 # pylint: disable=E1102 129 self._on_device_twin_reported_updated(reported_property_name, reported_property_value, reported_version) 130 131 def __init__(self, socket, iface, device_connection_string: str, token_expires: int = 21600, logger: logging = None): 132 """Create the Azure IoT Central device client 133 :param socket: The network socket 134 :param iface: The network interface 135 :param str device_connection_string: The Iot Hub device connection string 136 :param int token_expires: The number of seconds till the token expires, defaults to 6 hours 137 :param adafruit_logging logger: The logger 138 """ 139 self._socket = socket 140 self._iface = iface 141 self._token_expires = token_expires 142 self._logger = logger if logger is not None else logging.getLogger("log") 143 144 connection_string_values = {} 145 146 try: 147 cs_args = device_connection_string.split(DELIMITER) 148 connection_string_values = dict(arg.split(VALUE_SEPARATOR, 1) for arg in cs_args) 149 except (ValueError, AttributeError): 150 raise ValueError("Connection string is required and should not be empty or blank and must be supplied as a string") 151 152 if len(cs_args) != len(connection_string_values): 153 raise ValueError("Invalid Connection String - Unable to parse") 154 155 _validate_keys(connection_string_values) 156 157 self._hostname = connection_string_values[HOST_NAME] 158 self._device_id = connection_string_values[DEVICE_ID] 159 self._shared_access_key = connection_string_values[SHARED_ACCESS_KEY] 160 161 self._logger.debug("Hostname: " + self._hostname) 162 self._logger.debug("Device Id: " + self._device_id) 163 self._logger.debug("Shared Access Key: " + self._shared_access_key) 164 165 self._on_connection_status_changed = None 166 self._on_direct_method_invoked = None 167 self._on_cloud_to_device_message_received = None 168 self._on_device_twin_desired_updated = None 169 self._on_device_twin_reported_updated = None 170 171 self._mqtt = None 172 173 @property 174 def on_connection_status_changed(self): 175 """A callback method that is called when the connection status is changed. This method should have the following signature: 176 def connection_status_changed(connected: bool) -> None 177 """ 178 return self._on_connection_status_changed 179 180 @on_connection_status_changed.setter 181 def on_connection_status_changed(self, new_on_connection_status_changed): 182 """A callback method that is called when the connection status is changed. This method should have the following signature: 183 def connection_status_changed(connected: bool) -> None 184 """ 185 self._on_connection_status_changed = new_on_connection_status_changed 186 187 @property 188 def on_direct_method_invoked(self): 189 """A callback method that is called when a direct method is invoked. This method should have the following signature: 190 def direct_method_invoked(method_name: str, payload: str) -> IoTResponse: 191 192 This method returns an IoTResponse containing a status code and message from the method invocation. Set this appropriately 193 depending on if the method was successfully handled or not. For example, if the method was handled successfully, set 194 the code to 200 and message to "OK": 195 196 return IoTResponse(200, "OK") 197 """ 198 return self._on_direct_method_invoked 199 200 @on_direct_method_invoked.setter 201 def on_direct_method_invoked(self, new_on_direct_method_invoked): 202 """A callback method that is called when a direct method is invoked. This method should have the following signature: 203 def direct_method_invoked(method_name: str, payload: str) -> IoTResponse: 204 205 This method returns an IoTResponse containing a status code and message from the method invocation. Set this appropriately 206 depending on if the method was successfully handled or not. For example, if the method was handled successfully, set 207 the code to 200 and message to "OK": 208 209 return IoTResponse(200, "OK") 210 """ 211 self._on_direct_method_invoked = new_on_direct_method_invoked 212 213 @property 214 def on_cloud_to_device_message_received(self): 215 """A callback method that is called when a cloud to device message is received. This method should have the following signature: 216 def cloud_to_device_message_received(body: str, properties: dict) -> None: 217 """ 218 return self._on_cloud_to_device_message_received 219 220 @on_cloud_to_device_message_received.setter 221 def on_cloud_to_device_message_received(self, new_on_cloud_to_device_message_received): 222 """A callback method that is called when a cloud to device message is received. This method should have the following signature: 223 def cloud_to_device_message_received(body: str, properties: dict) -> None: 224 """ 225 self._on_cloud_to_device_message_received = new_on_cloud_to_device_message_received 226 227 @property 228 def on_device_twin_desired_updated(self): 229 """A callback method that is called when the desired properties of the devices device twin are updated. 230 This method should have the following signature: 231 def device_twin_desired_updated(desired_property_name: str, desired_property_value, desired_version: int) -> None: 232 """ 233 return self._on_device_twin_desired_updated 234 235 @on_device_twin_desired_updated.setter 236 def on_device_twin_desired_updated(self, new_on_device_twin_desired_updated): 237 """A callback method that is called when the desired properties of the devices device twin are updated. 238 This method should have the following signature: 239 def device_twin_desired_updated(desired_property_name: str, desired_property_value, desired_version: int) -> None: 240 """ 241 self._on_device_twin_desired_updated = new_on_device_twin_desired_updated 242 243 if self._mqtt is not None: 244 self._mqtt.subscribe_to_twins() 245 246 @property 247 def on_device_twin_reported_updated(self): 248 """A callback method that is called when the reported properties of the devices device twin are updated. 249 This method should have the following signature: 250 def device_twin_reported_updated(reported_property_name: str, reported_property_value, reported_version: int) -> None: 251 """ 252 return self._on_device_twin_reported_updated 253 254 @on_device_twin_reported_updated.setter 255 def on_device_twin_reported_updated(self, new_on_device_twin_reported_updated): 256 """A callback method that is called when the reported properties of the devices device twin are updated. 257 This method should have the following signature: 258 def device_twin_reported_updated(reported_property_name: str, reported_property_value, reported_version: int) -> None: 259 """ 260 self._on_device_twin_reported_updated = new_on_device_twin_reported_updated 261 262 if self._mqtt is not None: 263 self._mqtt.subscribe_to_twins() 264 265 def connect(self) -> None: 266 """Connects to Azure IoT Hub 267 :raises RuntimeError: if the internet connection is not responding or is unable to connect 268 """ 269 self._mqtt = IoTMQTT( 270 self, self._socket, self._iface, self._hostname, self._device_id, self._shared_access_key, self._token_expires, self._logger 271 ) 272 self._mqtt.connect() 273 274 if self._on_device_twin_desired_updated is not None or self._on_device_twin_reported_updated is not None: 275 self._mqtt.subscribe_to_twins() 276 277 def disconnect(self) -> None: 278 """Disconnects from the MQTT broker 279 :raises IoTError: if there is no open connection to the MQTT broker 280 """ 281 if self._mqtt is None: 282 raise IoTError("You are not connected to IoT Central") 283 284 self._mqtt.disconnect() 285 286 def reconnect(self) -> None: 287 """Reconnects to the MQTT broker 288 """ 289 if self._mqtt is None: 290 raise IoTError("You are not connected to IoT Central") 291 292 self._mqtt.reconnect() 293 294 def is_connected(self) -> bool: 295 """Gets if there is an open connection to the MQTT broker 296 :returns: True if there is an open connection, False if not 297 :rtype: bool 298 """ 299 if self._mqtt is not None: 300 return self._mqtt.is_connected() 301 302 return False 303 304 def loop(self) -> None: 305 """Listens for MQTT messages 306 :raises IoTError: if there is no open connection to the MQTT broker 307 """ 308 if self._mqtt is None: 309 raise IoTError("You are not connected to IoT Central") 310 311 self._mqtt.loop() 312 313 def send_device_to_cloud_message(self, message, system_properties=None) -> None: 314 """Send a device to cloud message from this device to Azure IoT Hub 315 :param message: The message data as a JSON string or a dictionary 316 :param system_properties: System properties to send with the message 317 :raises: ValueError if the message is not a string or dictionary 318 :raises RuntimeError: if the internet connection is not responding or is unable to connect 319 """ 320 if self._mqtt is None: 321 raise IoTError("You are not connected to IoT Central") 322 323 self._mqtt.send_device_to_cloud_message(message, system_properties) 324 325 def update_twin(self, patch) -> None: 326 """Updates the reported properties in the devices device twin 327 :param patch: The JSON patch to apply to the device twin reported properties 328 """ 329 if self._mqtt is None: 330 raise IoTError("You are not connected to IoT Central") 331 332 if isinstance(patch, dict): 333 patch = json.dumps(patch) 334 335 self._mqtt.send_twin_patch(patch)