/ adafruit_minimqtt / adafruit_minimqtt.py
adafruit_minimqtt.py
1 # The MIT License (MIT) 2 # 3 # Copyright (c) 2019 Brent Rubell for Adafruit Industries 4 # 5 # Original Work Copyright (c) 2016 Paul Sokolovsky, uMQTT 6 # Modified Work Copyright (c) 2019 Bradley Beach, esp32spi_mqtt 7 # Modified Work Copyright (c) 2012-2019 Roger Light and others, Paho MQTT Python 8 # 9 # Permission is hereby granted, free of charge, to any person obtaining a copy 10 # of this software and associated documentation files (the "Software"), to deal 11 # in the Software without restriction, including without limitation the rights 12 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 13 # copies of the Software, and to permit persons to whom the Software is 14 # furnished to do so, subject to the following conditions: 15 # 16 # The above copyright notice and this permission notice shall be included in 17 # all copies or substantial portions of the Software. 18 # 19 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 20 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 21 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 22 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 23 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 24 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 25 # THE SOFTWARE. 26 """ 27 `adafruit_minimqtt` 28 ================================================================================ 29 30 MQTT Library for CircuitPython. 31 32 * Author(s): Brent Rubell 33 34 Implementation Notes 35 -------------------- 36 37 **Software and Dependencies:** 38 39 * Adafruit CircuitPython firmware for the supported boards: 40 https://github.com/adafruit/circuitpython/releases 41 42 """ 43 import struct 44 import time 45 from random import randint 46 from micropython import const 47 import adafruit_logging as logging 48 from .matcher import MQTTMatcher 49 50 __version__ = "0.0.0-auto.0" 51 __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT.git" 52 53 # Client-specific variables 54 MQTT_MSG_MAX_SZ = const(268435455) 55 MQTT_MSG_SZ_LIM = const(10000000) 56 MQTT_TOPIC_LENGTH_LIMIT = const(65535) 57 MQTT_TCP_PORT = const(1883) 58 MQTT_TLS_PORT = const(8883) 59 60 # MQTT Commands 61 MQTT_PINGREQ = b"\xc0\0" 62 MQTT_PINGRESP = const(0xD0) 63 MQTT_SUB = b"\x82" 64 MQTT_UNSUB = b"\xA2" 65 MQTT_DISCONNECT = b"\xe0\0" 66 67 # Variable CONNECT header [MQTT 3.1.2] 68 MQTT_HDR_CONNECT = bytearray(b"\x04MQTT\x04\x02\0\0") 69 70 71 CONNACK_ERRORS = { 72 const(0x01): "Connection Refused - Incorrect Protocol Version", 73 const(0x02): "Connection Refused - ID Rejected", 74 const(0x03): "Connection Refused - Server unavailable", 75 const(0x04): "Connection Refused - Incorrect username/password", 76 const(0x05): "Connection Refused - Unauthorized", 77 } 78 79 80 _the_interface = None # pylint: disable=invalid-name 81 _the_sock = None # pylint: disable=invalid-name 82 83 84 class MMQTTException(Exception): 85 """MiniMQTT Exception class.""" 86 87 # pylint: disable=unnecessary-pass 88 # pass 89 90 91 def set_socket(sock, iface=None): 92 """Helper to set the global socket and optionally set the global network interface. 93 94 :param sock: socket object. 95 :param iface: internet interface object 96 """ 97 global _the_sock # pylint: disable=invalid-name, global-statement 98 _the_sock = sock 99 if iface: 100 global _the_interface # pylint: disable=invalid-name, global-statement 101 _the_interface = iface 102 _the_sock.set_interface(iface) 103 104 105 class MQTT: 106 """MQTT Client for CircuitPython 107 108 :param str broker: MQTT Broker URL or IP Address. 109 :param int port: Optional port definition, defaults to 8883. 110 :param str username: Username for broker authentication. 111 :param str password: Password for broker authentication. 112 :param network_manager: NetworkManager object, such as WiFiManager from ESPSPI_WiFiManager. 113 :param str client_id: Optional client identifier, defaults to a unique, generated string. 114 :param bool is_ssl: Sets a secure or insecure connection with the broker. 115 :param bool log: Attaches a logger to the MQTT client, defaults to logging level INFO. 116 :param int keep_alive: KeepAlive interval between the broker and the MiniMQTT client. 117 118 """ 119 120 # pylint: disable=too-many-arguments,too-many-instance-attributes, not-callable, invalid-name, no-member 121 def __init__( 122 self, 123 broker, 124 port=None, 125 username=None, 126 password=None, 127 client_id=None, 128 is_ssl=True, 129 log=False, 130 keep_alive=60, 131 ): 132 self._sock = None 133 self.broker = broker 134 # port/ssl 135 self.port = MQTT_TCP_PORT 136 if is_ssl: 137 self.port = MQTT_TLS_PORT 138 if port is not None: 139 self.port = port 140 # session identifiers 141 self.user = username 142 # [MQTT-3.1.3.5] 143 self.password = password 144 if ( 145 self.password is not None 146 and len(password.encode("utf-8")) > MQTT_TOPIC_LENGTH_LIMIT 147 ): 148 raise MMQTTException("Password length is too large.") 149 if client_id is not None: 150 # user-defined client_id MAY allow client_id's > 23 bytes or 151 # non-alpha-numeric characters 152 self.client_id = client_id 153 else: 154 # assign a unique client_id 155 self.client_id = "cpy{0}{1}".format( 156 randint(0, int(time.monotonic() * 100) % 1000), randint(0, 99) 157 ) 158 # generated client_id's enforce spec.'s length rules 159 if len(self.client_id) > 23 or not self.client_id: 160 raise ValueError("MQTT Client ID must be between 1 and 23 bytes") 161 self.keep_alive = keep_alive 162 self.user_data = None 163 self.logger = None 164 if log is True: 165 self.logger = logging.getLogger("log") 166 self.logger.setLevel(logging.INFO) 167 self._sock = None 168 self._is_connected = False 169 self._msg_size_lim = MQTT_MSG_SZ_LIM 170 self._pid = 0 171 self._timestamp = 0 172 # LWT 173 self._lw_topic = None 174 self._lw_qos = 0 175 self._lw_topic = None 176 self._lw_msg = None 177 self._lw_retain = False 178 # List of subscribed topics, used for tracking 179 self._subscribed_topics = [] 180 self._on_message_filtered = MQTTMatcher() 181 # Server callbacks 182 self._on_message = None 183 self.on_connect = None 184 self.on_disconnect = None 185 self.on_publish = None 186 self.on_subscribe = None 187 self.on_unsubscribe = None 188 189 def __enter__(self): 190 return self 191 192 def __exit__(self, exception_type, exception_value, traceback): 193 self.deinit() 194 195 def deinit(self): 196 """De-initializes the MQTT client and disconnects from the mqtt broker.""" 197 self.disconnect() 198 199 def will_set(self, topic=None, payload=None, qos=0, retain=False): 200 """Sets the last will and testament properties. MUST be called before `connect()`. 201 202 :param str topic: MQTT Broker topic. 203 :param int,float,str payload: Last will disconnection payload. 204 payloads of type int & float are converted to a string. 205 :param int qos: Quality of Service level, defaults to 206 zero. Conventional options are ``0`` (send at most once), ``1`` 207 (send at least once), or ``2`` (send exactly once). 208 209 .. note:: Only options ``1`` or ``0`` are QoS levels supported by this library. 210 :param bool retain: Specifies if the payload is to be retained when 211 it is published. 212 """ 213 if self.logger is not None: 214 self.logger.debug("Setting last will properties") 215 self._check_qos(qos) 216 if self._is_connected: 217 raise MMQTTException("Last Will should only be called before connect().") 218 if payload is None: 219 payload = "" 220 if isinstance(payload, (int, float, str)): 221 payload = str(payload).encode() 222 else: 223 raise MMQTTException("Invalid message data type.") 224 self._lw_qos = qos 225 self._lw_topic = topic 226 self._lw_msg = payload 227 self._lw_retain = retain 228 229 def add_topic_callback(self, mqtt_topic, callback_method): 230 """Registers a callback_method for a specific MQTT topic. 231 232 :param str mqtt_topic: MQTT topic identifier. 233 :param str callback_method: Name of callback method. 234 """ 235 if mqtt_topic is None or callback_method is None: 236 raise ValueError("MQTT topic and callback method must both be defined.") 237 self._on_message_filtered[mqtt_topic] = callback_method 238 239 def remove_topic_callback(self, mqtt_topic): 240 """Removes a registered callback method. 241 242 :param str mqtt_topic: MQTT topic identifier string. 243 """ 244 if mqtt_topic is None: 245 raise ValueError("MQTT Topic must be defined.") 246 try: 247 del self._on_message_filtered[mqtt_topic] 248 except KeyError: 249 raise KeyError("MQTT topic callback not added with add_topic_callback.") 250 251 @property 252 def on_message(self): 253 """Called when a new message has been received on a subscribed topic. 254 255 Expected method signature is ``on_message(client, topic, message)`` 256 """ 257 return self._on_message 258 259 @on_message.setter 260 def on_message(self, method): 261 self._on_message = method 262 263 def _handle_on_message(self, client, topic, message): 264 matched = False 265 if topic is not None: 266 for callback in self._on_message_filtered.iter_match(topic): 267 callback(client, topic, message) # on_msg with callback 268 matched = True 269 270 if not matched and self.on_message: # regular on_message 271 self.on_message(client, topic, message) 272 273 # pylint: disable=too-many-branches, too-many-statements, too-many-locals 274 def connect(self, clean_session=True): 275 """Initiates connection with the MQTT Broker. 276 277 :param bool clean_session: Establishes a persistent session. 278 """ 279 self._sock = _the_sock.socket() 280 self._sock.settimeout(15) 281 if self.port == 8883: 282 try: 283 if self.logger is not None: 284 self.logger.debug( 285 "Attempting to establish secure MQTT connection..." 286 ) 287 conntype = _the_interface.TLS_MODE 288 self._sock.connect((self.broker, self.port), conntype) 289 except RuntimeError as e: 290 raise MMQTTException("Invalid broker address defined.", e) 291 else: 292 try: 293 if self.logger is not None: 294 self.logger.debug( 295 "Attempting to establish insecure MQTT connection..." 296 ) 297 addr = _the_sock.getaddrinfo( 298 self.broker, self.port, 0, _the_sock.SOCK_STREAM 299 )[0] 300 self._sock.connect(addr[-1], _the_interface.TCP_MODE) 301 except RuntimeError as e: 302 raise MMQTTException("Invalid broker address defined.", e) 303 304 # Fixed Header 305 fixed_header = bytearray([0x10]) 306 307 # NOTE: Variable header is 308 # MQTT_HDR_CONNECT = bytearray(b"\x04MQTT\x04\x02\0\0") 309 # because final 4 bytes are 4, 2, 0, 0 310 # Variable Header 311 var_header = MQTT_HDR_CONNECT 312 var_header[6] = clean_session << 1 313 314 # Set up variable header and remaining_length 315 remaining_length = 12 + len(self.client_id) 316 if self.user is not None: 317 remaining_length += 2 + len(self.user) + 2 + len(self.password) 318 var_header[6] |= 0xC0 319 if self.keep_alive: 320 assert self.keep_alive < MQTT_TOPIC_LENGTH_LIMIT 321 var_header[7] |= self.keep_alive >> 8 322 var_header[8] |= self.keep_alive & 0x00FF 323 if self._lw_topic: 324 remaining_length += 2 + len(self._lw_topic) + 2 + len(self._lw_msg) 325 var_header[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3 326 var_header[6] |= self._lw_retain << 5 327 328 # Remaining length calculation 329 large_rel_length = False 330 if remaining_length > 0x7F: 331 large_rel_length = True 332 # Calculate Remaining Length [2.2.3] 333 while remaining_length > 0: 334 encoded_byte = remaining_length % 0x80 335 remaining_length = remaining_length // 0x80 336 # if there is more data to encode, set the top bit of the byte 337 if remaining_length > 0: 338 encoded_byte |= 0x80 339 fixed_header.append(encoded_byte) 340 if large_rel_length: 341 fixed_header.append(0x00) 342 else: 343 fixed_header.append(remaining_length) 344 fixed_header.append(0x00) 345 346 if self.logger is not None: 347 self.logger.debug("Sending CONNECT to broker") 348 self.logger.debug( 349 "Fixed Header: {}\nVariable Header: {}".format(fixed_header, var_header) 350 ) 351 self._sock.send(fixed_header) 352 self._sock.send(var_header) 353 # [MQTT-3.1.3-4] 354 self._send_str(self.client_id) 355 if self._lw_topic: 356 # [MQTT-3.1.3-11] 357 self._send_str(self._lw_topic) 358 self._send_str(self._lw_msg) 359 if self.user is None: 360 self.user = None 361 else: 362 self._send_str(self.user) 363 self._send_str(self.password) 364 if self.logger is not None: 365 self.logger.debug("Receiving CONNACK packet from broker") 366 while True: 367 op = self._wait_for_msg() 368 if op == 32: 369 rc = self._sock.recv(3) 370 assert rc[0] == 0x02 371 if rc[2] != 0x00: 372 raise MMQTTException(CONNACK_ERRORS[rc[2]]) 373 self._is_connected = True 374 result = rc[0] & 1 375 if self.on_connect is not None: 376 self.on_connect(self, self.user_data, result, rc[2]) 377 return result 378 379 def disconnect(self): 380 """Disconnects the MiniMQTT client from the MQTT broker. 381 """ 382 self.is_connected() 383 if self.logger is not None: 384 self.logger.debug("Sending DISCONNECT packet to broker") 385 self._sock.send(MQTT_DISCONNECT) 386 if self.logger is not None: 387 self.logger.debug("Closing socket") 388 self._sock.close() 389 self._is_connected = False 390 self._subscribed_topics = None 391 if self.on_disconnect is not None: 392 self.on_disconnect(self, self.user_data, 0) 393 394 def ping(self): 395 """Pings the MQTT Broker to confirm if the broker is alive or if 396 there is an active network connection. 397 """ 398 self.is_connected() 399 if self.logger is not None: 400 self.logger.debug("Sending PINGREQ") 401 self._sock.send(MQTT_PINGREQ) 402 if self.logger is not None: 403 self.logger.debug("Checking PINGRESP") 404 while True: 405 op = self._wait_for_msg(0.5) 406 if op == 208: 407 ping_resp = self._sock.recv(2) 408 if ping_resp[0] != 0x00: 409 raise MMQTTException("PINGRESP not returned from broker.") 410 return 411 412 # pylint: disable=too-many-branches, too-many-statements 413 def publish(self, topic, msg, retain=False, qos=0): 414 """Publishes a message to a topic provided. 415 416 :param str topic: Unique topic identifier. 417 :param str,int,float msg: Data to send to the broker. 418 :param bool retain: Whether the message is saved by the broker. 419 :param int qos: Quality of Service level for the message, defaults to 420 zero. Conventional options are ``0`` (send at most once), ``1`` 421 (send at least once), or ``2`` (send exactly once). 422 423 .. note:: Only options ``1`` or ``0`` are QoS levels supported by this library. 424 425 Example of sending an integer, 3, to the broker on topic 'piVal'. 426 427 .. code-block:: python 428 429 mqtt_client.publish('topics/piVal', 3) 430 431 Example of sending a float, 3.14, to the broker on topic 'piVal'. 432 433 .. code-block:: python 434 435 mqtt_client.publish('topics/piVal', 3.14) 436 437 Example of sending a string, 'threepointonefour', to the broker on topic piVal. 438 439 .. code-block:: python 440 441 mqtt_client.publish('topics/piVal', 'threepointonefour') 442 """ 443 self.is_connected() 444 self._check_topic(topic) 445 if "+" in topic or "#" in topic: 446 raise MMQTTException("Publish topic can not contain wildcards.") 447 # check msg/qos kwargs 448 if msg is None: 449 raise MMQTTException("Message can not be None.") 450 if isinstance(msg, (int, float)): 451 msg = str(msg).encode("ascii") 452 elif isinstance(msg, str): 453 msg = str(msg).encode("utf-8") 454 else: 455 raise MMQTTException("Invalid message data type.") 456 if len(msg) > MQTT_MSG_MAX_SZ: 457 raise MMQTTException("Message size larger than %d bytes." % MQTT_MSG_MAX_SZ) 458 assert ( 459 0 <= qos <= 1 460 ), "Quality of Service Level 2 is unsupported by this library." 461 462 # fixed header. [3.3.1.2], [3.3.1.3] 463 pub_hdr_fixed = bytearray([0x30 | retain | qos << 1]) 464 465 # variable header = 2-byte Topic length (big endian) 466 pub_hdr_var = bytearray(struct.pack(">H", len(topic))) 467 pub_hdr_var.extend(topic.encode("utf-8")) # Topic name 468 469 remaining_length = 2 + len(msg) + len(topic) 470 if qos > 0: 471 # packet identifier where QoS level is 1 or 2. [3.3.2.2] 472 pid = self._pid 473 remaining_length += 2 474 pub_hdr_var.append(0x00) 475 pub_hdr_var.append(self._pid) 476 self._pid += 1 477 478 # Calculate remaining length [2.2.3] 479 if remaining_length > 0x7F: 480 while remaining_length > 0: 481 encoded_byte = remaining_length % 0x80 482 remaining_length = remaining_length // 0x80 483 if remaining_length > 0: 484 encoded_byte |= 0x80 485 pub_hdr_fixed.append(encoded_byte) 486 else: 487 pub_hdr_fixed.append(remaining_length) 488 489 if self.logger is not None: 490 self.logger.debug( 491 "Sending PUBLISH\nTopic: {0}\nMsg: {1}\ 492 \nQoS: {2}\nRetain? {3}".format( 493 topic, msg, qos, retain 494 ) 495 ) 496 self._sock.send(pub_hdr_fixed) 497 self._sock.send(pub_hdr_var) 498 self._sock.send(msg) 499 if qos == 0 and self.on_publish is not None: 500 self.on_publish(self, self.user_data, topic, self._pid) 501 if qos == 1: 502 while True: 503 op = self._wait_for_msg() 504 if op == 0x40: 505 sz = self._sock.recv(1) 506 assert sz == b"\x02" 507 rcv_pid = self._sock.recv(2) 508 rcv_pid = rcv_pid[0] << 0x08 | rcv_pid[1] 509 if pid == rcv_pid: 510 if self.on_publish is not None: 511 self.on_publish(self, self.user_data, topic, rcv_pid) 512 return 513 514 def subscribe(self, topic, qos=0): 515 """Subscribes to a topic on the MQTT Broker. 516 This method can subscribe to one topics or multiple topics. 517 518 :param str,tuple,list topic: Unique MQTT topic identifier string. If 519 this is a `tuple`, then the tuple should contain topic identifier 520 string and qos level integer. If this is a `list`, then each list 521 element should be a tuple containing a topic identifier string and 522 qos level integer. 523 :param int qos: Quality of Service level for the topic, defaults to 524 zero. Conventional options are ``0`` (send at most once), ``1`` 525 (send at least once), or ``2`` (send exactly once). 526 527 .. note:: Only options ``1`` or ``0`` are QoS levels supported by this library. 528 529 Example of subscribing a topic string. 530 531 .. code-block:: python 532 533 mqtt_client.subscribe('topics/ledState') 534 535 Example of subscribing to a topic and setting the qos level to 1. 536 537 .. code-block:: python 538 539 mqtt_client.subscribe('topics/ledState', 1) 540 541 Example of subscribing to topic string and setting qos level to 1, as a tuple. 542 543 .. code-block:: python 544 545 mqtt_client.subscribe(('topics/ledState', 1)) 546 547 Example of subscribing to multiple topics with different qos levels. 548 549 .. code-block:: python 550 551 mqtt_client.subscribe([('topics/ledState', 1), ('topics/servoAngle', 0)]) 552 553 """ 554 self.is_connected() 555 topics = None 556 if isinstance(topic, tuple): 557 topic, qos = topic 558 self._check_topic(topic) 559 self._check_qos(qos) 560 if isinstance(topic, str): 561 self._check_topic(topic) 562 self._check_qos(qos) 563 topics = [(topic, qos)] 564 if isinstance(topic, list): 565 topics = [] 566 for t, q in topic: 567 self._check_qos(q) 568 self._check_topic(t) 569 topics.append((t, q)) 570 # Assemble packet 571 packet_length = 2 + (2 * len(topics)) + (1 * len(topics)) 572 packet_length += sum(len(topic) for topic, qos in topics) 573 packet_length_byte = packet_length.to_bytes(1, "big") 574 self._pid += 1 575 packet_id_bytes = self._pid.to_bytes(2, "big") 576 # Packet with variable and fixed headers 577 packet = MQTT_SUB + packet_length_byte + packet_id_bytes 578 # attaching topic and QOS level to the packet 579 for t, q in topics: 580 topic_size = len(t).to_bytes(2, "big") 581 qos_byte = q.to_bytes(1, "big") 582 packet += topic_size + t + qos_byte 583 if self.logger is not None: 584 for t, q in topics: 585 self.logger.debug("SUBSCRIBING to topic {0} with QoS {1}".format(t, q)) 586 self._sock.send(packet) 587 while True: 588 op = self._wait_for_msg() 589 if op == 0x90: 590 rc = self._sock.recv(4) 591 assert rc[1] == packet[2] and rc[2] == packet[3] 592 if rc[3] == 0x80: 593 raise MMQTTException("SUBACK Failure!") 594 for t, q in topics: 595 if self.on_subscribe is not None: 596 self.on_subscribe(self, self.user_data, t, q) 597 self._subscribed_topics.append(t) 598 return 599 600 def unsubscribe(self, topic): 601 """Unsubscribes from a MQTT topic. 602 603 :param str,list topic: Unique MQTT topic identifier string or a list 604 of tuples, where each tuple contains an MQTT topic identier 605 string. 606 607 Example of unsubscribing from a topic string. 608 609 .. code-block:: python 610 611 mqtt_client.unsubscribe('topics/ledState') 612 613 Example of unsubscribing from multiple topics. 614 615 .. code-block:: python 616 617 mqtt_client.unsubscribe([('topics/ledState'), ('topics/servoAngle')]) 618 619 """ 620 topics = None 621 if isinstance(topic, str): 622 self._check_topic(topic) 623 topics = [(topic)] 624 if isinstance(topic, list): 625 topics = [] 626 for t in topic: 627 self._check_topic(t) 628 topics.append((t)) 629 for t in topics: 630 if t not in self._subscribed_topics: 631 raise MMQTTException( 632 "Topic must be subscribed to before attempting unsubscribe." 633 ) 634 # Assemble packet 635 packet_length = 2 + (2 * len(topics)) 636 packet_length += sum(len(topic) for topic in topics) 637 packet_length_byte = packet_length.to_bytes(1, "big") 638 self._pid += 1 639 packet_id_bytes = self._pid.to_bytes(2, "big") 640 packet = MQTT_UNSUB + packet_length_byte + packet_id_bytes 641 for t in topics: 642 topic_size = len(t).to_bytes(2, "big") 643 packet += topic_size + t 644 if self.logger is not None: 645 for t in topics: 646 self.logger.debug("UNSUBSCRIBING from topic {0}.".format(t)) 647 self._sock.send(packet) 648 if self.logger is not None: 649 self.logger.debug("Waiting for UNSUBACK...") 650 while True: 651 op = self._wait_for_msg() 652 if op == 176: 653 return_code = self._sock.recv(3) 654 assert return_code[0] == 0x02 655 # [MQTT-3.32] 656 assert ( 657 return_code[1] == packet_id_bytes[0] 658 and return_code[2] == packet_id_bytes[1] 659 ) 660 for t in topics: 661 if self.on_unsubscribe is not None: 662 self.on_unsubscribe(self, self.user_data, t, self._pid) 663 self._subscribed_topics.remove(t) 664 return 665 666 def reconnect(self, resub_topics=True): 667 """Attempts to reconnect to the MQTT broker. 668 669 :param bool resub_topics: Resubscribe to previously subscribed topics. 670 """ 671 if self.logger is not None: 672 self.logger.debug("Attempting to reconnect with MQTT broker") 673 self.connect() 674 if self.logger is not None: 675 self.logger.debug("Reconnected with broker") 676 if resub_topics: 677 if self.logger is not None: 678 self.logger.debug( 679 "Attempting to resubscribe to previously subscribed topics." 680 ) 681 subscribed_topics = self._subscribed_topics.copy() 682 self._subscribed_topics = [] 683 while subscribed_topics: 684 feed = subscribed_topics.pop() 685 self.subscribe(feed) 686 687 def loop(self): 688 """Non-blocking message loop. Use this method to 689 check incoming subscription messages. 690 """ 691 if self._timestamp == 0: 692 self._timestamp = time.monotonic() 693 current_time = time.monotonic() 694 if current_time - self._timestamp >= self.keep_alive: 695 # Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server 696 if self.logger is not None: 697 self.logger.debug( 698 "KeepAlive period elapsed - \ 699 requesting a PINGRESP from the server..." 700 ) 701 self.ping() 702 self._timestamp = 0 703 self._sock.settimeout(0.1) 704 return self._wait_for_msg() 705 706 def _wait_for_msg(self, timeout=30): 707 """Reads and processes network events. 708 Returns response code if successful. 709 """ 710 res = self._sock.recv(1) 711 self._sock.settimeout(timeout) 712 if res in [None, b""]: 713 return None 714 if res == MQTT_PINGRESP: 715 sz = self._sock.recv(1)[0] 716 assert sz == 0 717 return None 718 if res[0] & 0xF0 != 0x30: 719 return res[0] 720 sz = self._recv_len() 721 topic_len = self._sock.recv(2) 722 topic_len = (topic_len[0] << 8) | topic_len[1] 723 topic = self._sock.recv(topic_len) 724 topic = str(topic, "utf-8") 725 sz -= topic_len + 2 726 if res[0] & 0x06: 727 pid = self._sock.recv(2) 728 pid = pid[0] << 0x08 | pid[1] 729 sz -= 0x02 730 msg = self._sock.recv(sz) 731 self._handle_on_message(self, topic, str(msg, "utf-8")) 732 if res[0] & 0x06 == 0x02: 733 pkt = bytearray(b"\x40\x02\0\0") 734 struct.pack_into("!H", pkt, 2, pid) 735 self._sock.send(pkt) 736 elif res[0] & 6 == 4: 737 assert 0 738 return res[0] 739 740 def _recv_len(self): 741 n = 0 742 sh = 0 743 while True: 744 b = self._sock.recv(1)[0] 745 n |= (b & 0x7F) << sh 746 if not b & 0x80: 747 return n 748 sh += 7 749 750 def _send_str(self, string): 751 """Packs and encodes a string to a socket. 752 753 :param str string: String to write to the socket. 754 """ 755 self._sock.send(struct.pack("!H", len(string))) 756 if isinstance(string, str): 757 self._sock.send(str.encode(string, "utf-8")) 758 else: 759 self._sock.send(string) 760 761 @staticmethod 762 def _check_topic(topic): 763 """Checks if topic provided is a valid mqtt topic. 764 765 :param str topic: Topic identifier 766 """ 767 if topic is None: 768 raise MMQTTException("Topic may not be NoneType") 769 # [MQTT-4.7.3-1] 770 if not topic: 771 raise MMQTTException("Topic may not be empty.") 772 # [MQTT-4.7.3-3] 773 if len(topic.encode("utf-8")) > MQTT_TOPIC_LENGTH_LIMIT: 774 raise MMQTTException("Topic length is too large.") 775 776 @staticmethod 777 def _check_qos(qos_level): 778 """Validates the quality of service level. 779 780 :param int qos_level: Desired QoS level. 781 """ 782 if isinstance(qos_level, int): 783 if qos_level < 0 or qos_level > 2: 784 raise MMQTTException("QoS must be between 1 and 2.") 785 else: 786 raise MMQTTException("QoS must be an integer.") 787 788 def _set_interface(self): 789 """Sets a desired network hardware interface. 790 The network hardware must be set in init 791 prior to calling this method. 792 """ 793 if self._wifi: 794 self._socket.set_interface(self._wifi.esp) 795 else: 796 raise TypeError("Network Manager Required.") 797 798 def is_connected(self): 799 """Returns MQTT client session status as True if connected, raises 800 a `MMQTTException` if `False`. 801 """ 802 if self._sock is None or self._is_connected is False: 803 raise MMQTTException("MiniMQTT is not connected.") 804 return self._is_connected 805 806 @property 807 def mqtt_msg(self): 808 """Returns maximum MQTT payload and topic size.""" 809 return self._msg_size_lim, MQTT_TOPIC_LENGTH_LIMIT 810 811 @mqtt_msg.setter 812 def mqtt_msg(self, msg_size): 813 """Sets the maximum MQTT message payload size. 814 815 :param int msg_size: Maximum MQTT payload size. 816 """ 817 if msg_size < MQTT_MSG_MAX_SZ: 818 self._msg_size_lim = msg_size 819 820 ### Logging ### 821 def attach_logger(self, logger_name="log"): 822 """Initializes and attaches a logger to the MQTTClient. 823 824 :param str logger_name: Name of the logger instance 825 """ 826 self.logger = logging.getLogger(logger_name) 827 self.logger.setLevel(logging.INFO) 828 829 def set_logger_level(self, log_level): 830 """Sets the level of the logger, if defined during init. 831 832 :param str log_level: Level of logging to output to the REPL. 833 Acceptable options are ``DEBUG``, ``INFO``, ``WARNING``, or 834 ``ERROR``. 835 """ 836 if self.logger is None: 837 raise MMQTTException( 838 "No logger attached - did you create it during initialization?" 839 ) 840 if log_level == "DEBUG": 841 self.logger.setLevel(logging.DEBUG) 842 elif log_level == "INFO": 843 self.logger.setLevel(logging.INFO) 844 elif log_level == "WARNING": 845 self.logger.setLevel(logging.WARNING) 846 elif log_level == "ERROR": 847 self.logger.setLevel(logging.CRITICIAL) 848 else: 849 raise MMQTTException("Incorrect logging level provided!")