/ adafruit_minimqtt / adafruit_minimqtt.py
adafruit_minimqtt.py
  1  # The MIT License (MIT)
  2  #
  3  # Copyright (c) 2019 Brent Rubell for Adafruit Industries
  4  #
  5  # Original Work Copyright (c) 2016 Paul Sokolovsky, uMQTT
  6  # Modified Work Copyright (c) 2019 Bradley Beach, esp32spi_mqtt
  7  # Modified Work Copyright (c) 2012-2019 Roger Light and others, Paho MQTT Python
  8  #
  9  # Permission is hereby granted, free of charge, to any person obtaining a copy
 10  # of this software and associated documentation files (the "Software"), to deal
 11  # in the Software without restriction, including without limitation the rights
 12  # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 13  # copies of the Software, and to permit persons to whom the Software is
 14  # furnished to do so, subject to the following conditions:
 15  #
 16  # The above copyright notice and this permission notice shall be included in
 17  # all copies or substantial portions of the Software.
 18  #
 19  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 20  # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 21  # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 22  # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 23  # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 24  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 25  # THE SOFTWARE.
 26  """
 27  `adafruit_minimqtt`
 28  ================================================================================
 29  
 30  MQTT Library for CircuitPython.
 31  
 32  * Author(s): Brent Rubell
 33  
 34  Implementation Notes
 35  --------------------
 36  
 37  **Software and Dependencies:**
 38  
 39  * Adafruit CircuitPython firmware for the supported boards:
 40    https://github.com/adafruit/circuitpython/releases
 41  
 42  """
 43  import struct
 44  import time
 45  from random import randint
 46  from micropython import const
 47  import adafruit_logging as logging
 48  from .matcher import MQTTMatcher
 49  
__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT.git"

# Client-specific variables
# Upper bound (2**28 - 1) that publish() enforces on the payload size.
MQTT_MSG_MAX_SZ = const(268435455)
# Initial value of every client's ``_msg_size_lim`` attribute.
MQTT_MSG_SZ_LIM = const(10000000)
# Upper bound applied to the UTF-8 encoded length of topics and passwords.
MQTT_TOPIC_LENGTH_LIMIT = const(65535)
# Default broker ports for insecure and TLS connections.
MQTT_TCP_PORT = const(1883)
MQTT_TLS_PORT = const(8883)

# MQTT Commands
# PINGREQ and DISCONNECT are complete two-byte packets (type byte + zero length).
MQTT_PINGREQ = b"\xc0\0"
# First byte of a PINGRESP packet, stored as an int.
MQTT_PINGRESP = const(0xD0)
# First byte of the SUBSCRIBE / UNSUBSCRIBE fixed headers.
MQTT_SUB = b"\x82"
MQTT_UNSUB = b"\xA2"
MQTT_DISCONNECT = b"\xe0\0"

# Variable CONNECT header [MQTT 3.1.2]
# Layout: [0] low byte of the protocol-name length, [1:5] "MQTT",
# [5] protocol level 4, [6] connect flags, [7:9] keep-alive (MSB, LSB).
# The high byte of the protocol-name length is sent by connect() as the
# last byte of the fixed header.
MQTT_HDR_CONNECT = bytearray(b"\x04MQTT\x04\x02\0\0")


# Human-readable messages for the non-zero CONNACK return codes.
CONNACK_ERRORS = {
    const(0x01): "Connection Refused - Incorrect Protocol Version",
    const(0x02): "Connection Refused - ID Rejected",
    const(0x03): "Connection Refused - Server unavailable",
    const(0x04): "Connection Refused - Incorrect username/password",
    const(0x05): "Connection Refused - Unauthorized",
}


# Module-wide network interface and socket provider, assigned by set_socket().
_the_interface = None  # pylint: disable=invalid-name
_the_sock = None  # pylint: disable=invalid-name
 82  
 83  
 84  class MMQTTException(Exception):
 85      """MiniMQTT Exception class."""
 86  
 87      # pylint: disable=unnecessary-pass
 88      # pass
 89  
 90  
 91  def set_socket(sock, iface=None):
 92      """Helper to set the global socket and optionally set the global network interface.
 93  
 94      :param sock: socket object.
 95      :param iface: internet interface object
 96      """
 97      global _the_sock  # pylint: disable=invalid-name, global-statement
 98      _the_sock = sock
 99      if iface:
100          global _the_interface  # pylint: disable=invalid-name, global-statement
101          _the_interface = iface
102          _the_sock.set_interface(iface)
103  
104  
105  class MQTT:
106      """MQTT Client for CircuitPython
107  
108      :param str broker: MQTT Broker URL or IP Address.
    :param int port: Optional port definition, defaults to 8883 when ``is_ssl`` is True and 1883 otherwise.
110      :param str username: Username for broker authentication.
111      :param str password: Password for broker authentication.
112      :param network_manager: NetworkManager object, such as WiFiManager from ESPSPI_WiFiManager.
113      :param str client_id: Optional client identifier, defaults to a unique, generated string.
114      :param bool is_ssl: Sets a secure or insecure connection with the broker.
115      :param bool log: Attaches a logger to the MQTT client, defaults to logging level INFO.
116      :param int keep_alive: KeepAlive interval between the broker and the MiniMQTT client.
117  
118      """
119  
120      # pylint: disable=too-many-arguments,too-many-instance-attributes, not-callable, invalid-name, no-member
121      def __init__(
122          self,
123          broker,
124          port=None,
125          username=None,
126          password=None,
127          client_id=None,
128          is_ssl=True,
129          log=False,
130          keep_alive=60,
131      ):
132          self._sock = None
133          self.broker = broker
134          # port/ssl
135          self.port = MQTT_TCP_PORT
136          if is_ssl:
137              self.port = MQTT_TLS_PORT
138          if port is not None:
139              self.port = port
140          # session identifiers
141          self.user = username
142          # [MQTT-3.1.3.5]
143          self.password = password
144          if (
145              self.password is not None
146              and len(password.encode("utf-8")) > MQTT_TOPIC_LENGTH_LIMIT
147          ):
148              raise MMQTTException("Password length is too large.")
149          if client_id is not None:
150              # user-defined client_id MAY allow client_id's > 23 bytes or
151              # non-alpha-numeric characters
152              self.client_id = client_id
153          else:
154              # assign a unique client_id
155              self.client_id = "cpy{0}{1}".format(
156                  randint(0, int(time.monotonic() * 100) % 1000), randint(0, 99)
157              )
158              # generated client_id's enforce spec.'s length rules
159              if len(self.client_id) > 23 or not self.client_id:
160                  raise ValueError("MQTT Client ID must be between 1 and 23 bytes")
161          self.keep_alive = keep_alive
162          self.user_data = None
163          self.logger = None
164          if log is True:
165              self.logger = logging.getLogger("log")
166              self.logger.setLevel(logging.INFO)
167          self._sock = None
168          self._is_connected = False
169          self._msg_size_lim = MQTT_MSG_SZ_LIM
170          self._pid = 0
171          self._timestamp = 0
172          # LWT
173          self._lw_topic = None
174          self._lw_qos = 0
175          self._lw_topic = None
176          self._lw_msg = None
177          self._lw_retain = False
178          # List of subscribed topics, used for tracking
179          self._subscribed_topics = []
180          self._on_message_filtered = MQTTMatcher()
181          # Server callbacks
182          self._on_message = None
183          self.on_connect = None
184          self.on_disconnect = None
185          self.on_publish = None
186          self.on_subscribe = None
187          self.on_unsubscribe = None
188  
189      def __enter__(self):
190          return self
191  
192      def __exit__(self, exception_type, exception_value, traceback):
193          self.deinit()
194  
195      def deinit(self):
196          """De-initializes the MQTT client and disconnects from the mqtt broker."""
197          self.disconnect()
198  
199      def will_set(self, topic=None, payload=None, qos=0, retain=False):
200          """Sets the last will and testament properties. MUST be called before `connect()`.
201  
202          :param str topic: MQTT Broker topic.
203          :param int,float,str payload: Last will disconnection payload.
204              payloads of type int & float are converted to a string.
205          :param int qos: Quality of Service level, defaults to
206              zero. Conventional options are ``0`` (send at most once), ``1``
207              (send at least once), or ``2`` (send exactly once).
208  
209              .. note:: Only options ``1`` or ``0`` are QoS levels supported by this library.
210          :param bool retain: Specifies if the payload is to be retained when
211              it is published.
212          """
213          if self.logger is not None:
214              self.logger.debug("Setting last will properties")
215          self._check_qos(qos)
216          if self._is_connected:
217              raise MMQTTException("Last Will should only be called before connect().")
218          if payload is None:
219              payload = ""
220          if isinstance(payload, (int, float, str)):
221              payload = str(payload).encode()
222          else:
223              raise MMQTTException("Invalid message data type.")
224          self._lw_qos = qos
225          self._lw_topic = topic
226          self._lw_msg = payload
227          self._lw_retain = retain
228  
229      def add_topic_callback(self, mqtt_topic, callback_method):
230          """Registers a callback_method for a specific MQTT topic.
231  
232          :param str mqtt_topic: MQTT topic identifier.
233          :param str callback_method: Name of callback method.
234          """
235          if mqtt_topic is None or callback_method is None:
236              raise ValueError("MQTT topic and callback method must both be defined.")
237          self._on_message_filtered[mqtt_topic] = callback_method
238  
239      def remove_topic_callback(self, mqtt_topic):
240          """Removes a registered callback method.
241  
242          :param str mqtt_topic: MQTT topic identifier string.
243          """
244          if mqtt_topic is None:
245              raise ValueError("MQTT Topic must be defined.")
246          try:
247              del self._on_message_filtered[mqtt_topic]
248          except KeyError:
249              raise KeyError("MQTT topic callback not added with add_topic_callback.")
250  
251      @property
252      def on_message(self):
253          """Called when a new message has been received on a subscribed topic.
254  
255          Expected method signature is ``on_message(client, topic, message)``
256          """
257          return self._on_message
258  
259      @on_message.setter
260      def on_message(self, method):
261          self._on_message = method
262  
263      def _handle_on_message(self, client, topic, message):
264          matched = False
265          if topic is not None:
266              for callback in self._on_message_filtered.iter_match(topic):
267                  callback(client, topic, message)  # on_msg with callback
268                  matched = True
269  
270          if not matched and self.on_message:  # regular on_message
271              self.on_message(client, topic, message)
272  
273      # pylint: disable=too-many-branches, too-many-statements, too-many-locals
274      def connect(self, clean_session=True):
275          """Initiates connection with the MQTT Broker.
276  
277          :param bool clean_session: Establishes a persistent session.
278          """
279          self._sock = _the_sock.socket()
280          self._sock.settimeout(15)
281          if self.port == 8883:
282              try:
283                  if self.logger is not None:
284                      self.logger.debug(
285                          "Attempting to establish secure MQTT connection..."
286                      )
287                  conntype = _the_interface.TLS_MODE
288                  self._sock.connect((self.broker, self.port), conntype)
289              except RuntimeError as e:
290                  raise MMQTTException("Invalid broker address defined.", e)
291          else:
292              try:
293                  if self.logger is not None:
294                      self.logger.debug(
295                          "Attempting to establish insecure MQTT connection..."
296                      )
297                  addr = _the_sock.getaddrinfo(
298                      self.broker, self.port, 0, _the_sock.SOCK_STREAM
299                  )[0]
300                  self._sock.connect(addr[-1], _the_interface.TCP_MODE)
301              except RuntimeError as e:
302                  raise MMQTTException("Invalid broker address defined.", e)
303  
304          # Fixed Header
305          fixed_header = bytearray([0x10])
306  
307          # NOTE: Variable header is
308          # MQTT_HDR_CONNECT = bytearray(b"\x04MQTT\x04\x02\0\0")
309          # because final 4 bytes are 4, 2, 0, 0
310          # Variable Header
311          var_header = MQTT_HDR_CONNECT
312          var_header[6] = clean_session << 1
313  
314          # Set up variable header and remaining_length
315          remaining_length = 12 + len(self.client_id)
316          if self.user is not None:
317              remaining_length += 2 + len(self.user) + 2 + len(self.password)
318              var_header[6] |= 0xC0
319          if self.keep_alive:
320              assert self.keep_alive < MQTT_TOPIC_LENGTH_LIMIT
321              var_header[7] |= self.keep_alive >> 8
322              var_header[8] |= self.keep_alive & 0x00FF
323          if self._lw_topic:
324              remaining_length += 2 + len(self._lw_topic) + 2 + len(self._lw_msg)
325              var_header[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3
326              var_header[6] |= self._lw_retain << 5
327  
328          # Remaining length calculation
329          large_rel_length = False
330          if remaining_length > 0x7F:
331              large_rel_length = True
332              # Calculate Remaining Length [2.2.3]
333              while remaining_length > 0:
334                  encoded_byte = remaining_length % 0x80
335                  remaining_length = remaining_length // 0x80
336                  # if there is more data to encode, set the top bit of the byte
337                  if remaining_length > 0:
338                      encoded_byte |= 0x80
339                  fixed_header.append(encoded_byte)
340          if large_rel_length:
341              fixed_header.append(0x00)
342          else:
343              fixed_header.append(remaining_length)
344              fixed_header.append(0x00)
345  
346          if self.logger is not None:
347              self.logger.debug("Sending CONNECT to broker")
348              self.logger.debug(
349                  "Fixed Header: {}\nVariable Header: {}".format(fixed_header, var_header)
350              )
351          self._sock.send(fixed_header)
352          self._sock.send(var_header)
353          # [MQTT-3.1.3-4]
354          self._send_str(self.client_id)
355          if self._lw_topic:
356              # [MQTT-3.1.3-11]
357              self._send_str(self._lw_topic)
358              self._send_str(self._lw_msg)
359          if self.user is None:
360              self.user = None
361          else:
362              self._send_str(self.user)
363              self._send_str(self.password)
364          if self.logger is not None:
365              self.logger.debug("Receiving CONNACK packet from broker")
366          while True:
367              op = self._wait_for_msg()
368              if op == 32:
369                  rc = self._sock.recv(3)
370                  assert rc[0] == 0x02
371                  if rc[2] != 0x00:
372                      raise MMQTTException(CONNACK_ERRORS[rc[2]])
373                  self._is_connected = True
374                  result = rc[0] & 1
375                  if self.on_connect is not None:
376                      self.on_connect(self, self.user_data, result, rc[2])
377                  return result
378  
379      def disconnect(self):
380          """Disconnects the MiniMQTT client from the MQTT broker.
381          """
382          self.is_connected()
383          if self.logger is not None:
384              self.logger.debug("Sending DISCONNECT packet to broker")
385          self._sock.send(MQTT_DISCONNECT)
386          if self.logger is not None:
387              self.logger.debug("Closing socket")
388          self._sock.close()
389          self._is_connected = False
390          self._subscribed_topics = None
391          if self.on_disconnect is not None:
392              self.on_disconnect(self, self.user_data, 0)
393  
394      def ping(self):
395          """Pings the MQTT Broker to confirm if the broker is alive or if
396          there is an active network connection.
397          """
398          self.is_connected()
399          if self.logger is not None:
400              self.logger.debug("Sending PINGREQ")
401          self._sock.send(MQTT_PINGREQ)
402          if self.logger is not None:
403              self.logger.debug("Checking PINGRESP")
404          while True:
405              op = self._wait_for_msg(0.5)
406              if op == 208:
407                  ping_resp = self._sock.recv(2)
408                  if ping_resp[0] != 0x00:
409                      raise MMQTTException("PINGRESP not returned from broker.")
410              return
411  
412      # pylint: disable=too-many-branches, too-many-statements
413      def publish(self, topic, msg, retain=False, qos=0):
414          """Publishes a message to a topic provided.
415  
416          :param str topic: Unique topic identifier.
417          :param str,int,float msg: Data to send to the broker.
418          :param bool retain: Whether the message is saved by the broker.
419          :param int qos: Quality of Service level for the message, defaults to
420              zero. Conventional options are ``0`` (send at most once), ``1``
421              (send at least once), or ``2`` (send exactly once).
422  
423              .. note:: Only options ``1`` or ``0`` are QoS levels supported by this library.
424  
425          Example of sending an integer, 3, to the broker on topic 'piVal'.
426  
427          .. code-block:: python
428  
429              mqtt_client.publish('topics/piVal', 3)
430  
431          Example of sending a float, 3.14, to the broker on topic 'piVal'.
432  
433          .. code-block:: python
434  
435              mqtt_client.publish('topics/piVal', 3.14)
436  
437          Example of sending a string, 'threepointonefour', to the broker on topic piVal.
438  
439          .. code-block:: python
440  
441              mqtt_client.publish('topics/piVal', 'threepointonefour')
442          """
443          self.is_connected()
444          self._check_topic(topic)
445          if "+" in topic or "#" in topic:
446              raise MMQTTException("Publish topic can not contain wildcards.")
447          # check msg/qos kwargs
448          if msg is None:
449              raise MMQTTException("Message can not be None.")
450          if isinstance(msg, (int, float)):
451              msg = str(msg).encode("ascii")
452          elif isinstance(msg, str):
453              msg = str(msg).encode("utf-8")
454          else:
455              raise MMQTTException("Invalid message data type.")
456          if len(msg) > MQTT_MSG_MAX_SZ:
457              raise MMQTTException("Message size larger than %d bytes." % MQTT_MSG_MAX_SZ)
458          assert (
459              0 <= qos <= 1
460          ), "Quality of Service Level 2 is unsupported by this library."
461  
462          # fixed header. [3.3.1.2], [3.3.1.3]
463          pub_hdr_fixed = bytearray([0x30 | retain | qos << 1])
464  
465          # variable header = 2-byte Topic length (big endian)
466          pub_hdr_var = bytearray(struct.pack(">H", len(topic)))
467          pub_hdr_var.extend(topic.encode("utf-8"))  # Topic name
468  
469          remaining_length = 2 + len(msg) + len(topic)
470          if qos > 0:
471              # packet identifier where QoS level is 1 or 2. [3.3.2.2]
472              pid = self._pid
473              remaining_length += 2
474              pub_hdr_var.append(0x00)
475              pub_hdr_var.append(self._pid)
476              self._pid += 1
477  
478          # Calculate remaining length [2.2.3]
479          if remaining_length > 0x7F:
480              while remaining_length > 0:
481                  encoded_byte = remaining_length % 0x80
482                  remaining_length = remaining_length // 0x80
483                  if remaining_length > 0:
484                      encoded_byte |= 0x80
485                  pub_hdr_fixed.append(encoded_byte)
486          else:
487              pub_hdr_fixed.append(remaining_length)
488  
489          if self.logger is not None:
490              self.logger.debug(
491                  "Sending PUBLISH\nTopic: {0}\nMsg: {1}\
492                                  \nQoS: {2}\nRetain? {3}".format(
493                      topic, msg, qos, retain
494                  )
495              )
496          self._sock.send(pub_hdr_fixed)
497          self._sock.send(pub_hdr_var)
498          self._sock.send(msg)
499          if qos == 0 and self.on_publish is not None:
500              self.on_publish(self, self.user_data, topic, self._pid)
501          if qos == 1:
502              while True:
503                  op = self._wait_for_msg()
504                  if op == 0x40:
505                      sz = self._sock.recv(1)
506                      assert sz == b"\x02"
507                      rcv_pid = self._sock.recv(2)
508                      rcv_pid = rcv_pid[0] << 0x08 | rcv_pid[1]
509                      if pid == rcv_pid:
510                          if self.on_publish is not None:
511                              self.on_publish(self, self.user_data, topic, rcv_pid)
512                          return
513  
514      def subscribe(self, topic, qos=0):
515          """Subscribes to a topic on the MQTT Broker.
516          This method can subscribe to one topics or multiple topics.
517  
518          :param str,tuple,list topic: Unique MQTT topic identifier string. If
519              this is a `tuple`, then the tuple should contain topic identifier
520              string and qos level integer. If this is a `list`, then each list
521              element should be a tuple containing a topic identifier string and
522              qos level integer.
523          :param int qos: Quality of Service level for the topic, defaults to
524              zero. Conventional options are ``0`` (send at most once), ``1``
525              (send at least once), or ``2`` (send exactly once).
526  
527              .. note:: Only options ``1`` or ``0`` are QoS levels supported by this library.
528  
529          Example of subscribing a topic string.
530  
531          .. code-block:: python
532  
533              mqtt_client.subscribe('topics/ledState')
534  
535          Example of subscribing to a topic and setting the qos level to 1.
536  
537          .. code-block:: python
538  
539              mqtt_client.subscribe('topics/ledState', 1)
540  
541          Example of subscribing to topic string and setting qos level to 1, as a tuple.
542  
543          .. code-block:: python
544  
545              mqtt_client.subscribe(('topics/ledState', 1))
546  
547          Example of subscribing to multiple topics with different qos levels.
548  
549          .. code-block:: python
550  
551              mqtt_client.subscribe([('topics/ledState', 1), ('topics/servoAngle', 0)])
552  
553          """
554          self.is_connected()
555          topics = None
556          if isinstance(topic, tuple):
557              topic, qos = topic
558              self._check_topic(topic)
559              self._check_qos(qos)
560          if isinstance(topic, str):
561              self._check_topic(topic)
562              self._check_qos(qos)
563              topics = [(topic, qos)]
564          if isinstance(topic, list):
565              topics = []
566              for t, q in topic:
567                  self._check_qos(q)
568                  self._check_topic(t)
569                  topics.append((t, q))
570          # Assemble packet
571          packet_length = 2 + (2 * len(topics)) + (1 * len(topics))
572          packet_length += sum(len(topic) for topic, qos in topics)
573          packet_length_byte = packet_length.to_bytes(1, "big")
574          self._pid += 1
575          packet_id_bytes = self._pid.to_bytes(2, "big")
576          # Packet with variable and fixed headers
577          packet = MQTT_SUB + packet_length_byte + packet_id_bytes
578          # attaching topic and QOS level to the packet
579          for t, q in topics:
580              topic_size = len(t).to_bytes(2, "big")
581              qos_byte = q.to_bytes(1, "big")
582              packet += topic_size + t + qos_byte
583          if self.logger is not None:
584              for t, q in topics:
585                  self.logger.debug("SUBSCRIBING to topic {0} with QoS {1}".format(t, q))
586          self._sock.send(packet)
587          while True:
588              op = self._wait_for_msg()
589              if op == 0x90:
590                  rc = self._sock.recv(4)
591                  assert rc[1] == packet[2] and rc[2] == packet[3]
592                  if rc[3] == 0x80:
593                      raise MMQTTException("SUBACK Failure!")
594                  for t, q in topics:
595                      if self.on_subscribe is not None:
596                          self.on_subscribe(self, self.user_data, t, q)
597                      self._subscribed_topics.append(t)
598                  return
599  
600      def unsubscribe(self, topic):
601          """Unsubscribes from a MQTT topic.
602  
603          :param str,list topic: Unique MQTT topic identifier string or a list
604              of tuples, where each tuple contains an MQTT topic identier
605              string.
606  
607          Example of unsubscribing from a topic string.
608  
609          .. code-block:: python
610  
611              mqtt_client.unsubscribe('topics/ledState')
612  
613          Example of unsubscribing from multiple topics.
614  
615          .. code-block:: python
616  
617              mqtt_client.unsubscribe([('topics/ledState'), ('topics/servoAngle')])
618  
619          """
620          topics = None
621          if isinstance(topic, str):
622              self._check_topic(topic)
623              topics = [(topic)]
624          if isinstance(topic, list):
625              topics = []
626              for t in topic:
627                  self._check_topic(t)
628                  topics.append((t))
629          for t in topics:
630              if t not in self._subscribed_topics:
631                  raise MMQTTException(
632                      "Topic must be subscribed to before attempting unsubscribe."
633                  )
634          # Assemble packet
635          packet_length = 2 + (2 * len(topics))
636          packet_length += sum(len(topic) for topic in topics)
637          packet_length_byte = packet_length.to_bytes(1, "big")
638          self._pid += 1
639          packet_id_bytes = self._pid.to_bytes(2, "big")
640          packet = MQTT_UNSUB + packet_length_byte + packet_id_bytes
641          for t in topics:
642              topic_size = len(t).to_bytes(2, "big")
643              packet += topic_size + t
644          if self.logger is not None:
645              for t in topics:
646                  self.logger.debug("UNSUBSCRIBING from topic {0}.".format(t))
647          self._sock.send(packet)
648          if self.logger is not None:
649              self.logger.debug("Waiting for UNSUBACK...")
650          while True:
651              op = self._wait_for_msg()
652              if op == 176:
653                  return_code = self._sock.recv(3)
654                  assert return_code[0] == 0x02
655                  # [MQTT-3.32]
656                  assert (
657                      return_code[1] == packet_id_bytes[0]
658                      and return_code[2] == packet_id_bytes[1]
659                  )
660                  for t in topics:
661                      if self.on_unsubscribe is not None:
662                          self.on_unsubscribe(self, self.user_data, t, self._pid)
663                      self._subscribed_topics.remove(t)
664                  return
665  
666      def reconnect(self, resub_topics=True):
667          """Attempts to reconnect to the MQTT broker.
668  
669          :param bool resub_topics: Resubscribe to previously subscribed topics.
670          """
671          if self.logger is not None:
672              self.logger.debug("Attempting to reconnect with MQTT broker")
673          self.connect()
674          if self.logger is not None:
675              self.logger.debug("Reconnected with broker")
676          if resub_topics:
677              if self.logger is not None:
678                  self.logger.debug(
679                      "Attempting to resubscribe to previously subscribed topics."
680                  )
681              subscribed_topics = self._subscribed_topics.copy()
682              self._subscribed_topics = []
683              while subscribed_topics:
684                  feed = subscribed_topics.pop()
685                  self.subscribe(feed)
686  
687      def loop(self):
688          """Non-blocking message loop. Use this method to
689          check incoming subscription messages.
690          """
691          if self._timestamp == 0:
692              self._timestamp = time.monotonic()
693          current_time = time.monotonic()
694          if current_time - self._timestamp >= self.keep_alive:
695              # Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
696              if self.logger is not None:
697                  self.logger.debug(
698                      "KeepAlive period elapsed - \
699                                     requesting a PINGRESP from the server..."
700                  )
701              self.ping()
702              self._timestamp = 0
703          self._sock.settimeout(0.1)
704          return self._wait_for_msg()
705  
    def _wait_for_msg(self, timeout=30):
        """Reads the first byte of one incoming packet and processes it.

        A PUBLISH packet is read completely, handed to
        ``_handle_on_message`` and, when sent with QoS 1, acknowledged with
        a PUBACK. For every other packet type only the first byte is
        consumed and the caller is expected to read the rest.

        :param timeout: Value passed to the socket's ``settimeout`` *after*
            the first byte has been read, so the first read uses whatever
            timeout the socket already had.
        :return: The packet's first byte as an int, or ``None`` when nothing
            was read (or the PINGRESP branch below was taken).
        """
        res = self._sock.recv(1)
        self._sock.settimeout(timeout)
        if res in [None, b""]:
            return None
        # NOTE(review): ``res`` is compared as a whole with MQTT_PINGRESP; if
        # recv() returns bytes and the constant is an int this branch can
        # never run and PINGRESP falls through to the generic return below -
        # confirm before relying on it.
        if res == MQTT_PINGRESP:
            sz = self._sock.recv(1)[0]
            assert sz == 0
            return None
        # Anything whose type nibble is not 0x3 (PUBLISH) is left to the caller.
        if res[0] & 0xF0 != 0x30:
            return res[0]
        # PUBLISH: remaining length, 2-byte topic length, topic, [pid], payload.
        sz = self._recv_len()
        topic_len = self._sock.recv(2)
        topic_len = (topic_len[0] << 8) | topic_len[1]
        topic = self._sock.recv(topic_len)
        topic = str(topic, "utf-8")
        sz -= topic_len + 2
        # Non-zero QoS bits mean a 2-byte packet identifier precedes the payload.
        if res[0] & 0x06:
            pid = self._sock.recv(2)
            pid = pid[0] << 0x08 | pid[1]
            sz -= 0x02
        msg = self._sock.recv(sz)
        # The payload is decoded as UTF-8 text before being dispatched.
        self._handle_on_message(self, topic, str(msg, "utf-8"))
        if res[0] & 0x06 == 0x02:
            # QoS 1: reply with PUBACK carrying the received packet identifier.
            pkt = bytearray(b"\x40\x02\0\0")
            struct.pack_into("!H", pkt, 2, pid)
            self._sock.send(pkt)
        elif res[0] & 6 == 4:
            # QoS 2 is not supported; fail loudly.
            assert 0
        return res[0]
739  
740      def _recv_len(self):
741          n = 0
742          sh = 0
743          while True:
744              b = self._sock.recv(1)[0]
745              n |= (b & 0x7F) << sh
746              if not b & 0x80:
747                  return n
748              sh += 7
749  
750      def _send_str(self, string):
751          """Packs and encodes a string to a socket.
752  
753          :param str string: String to write to the socket.
754          """
755          self._sock.send(struct.pack("!H", len(string)))
756          if isinstance(string, str):
757              self._sock.send(str.encode(string, "utf-8"))
758          else:
759              self._sock.send(string)
760  
761      @staticmethod
762      def _check_topic(topic):
763          """Checks if topic provided is a valid mqtt topic.
764  
765          :param str topic: Topic identifier
766          """
767          if topic is None:
768              raise MMQTTException("Topic may not be NoneType")
769          # [MQTT-4.7.3-1]
770          if not topic:
771              raise MMQTTException("Topic may not be empty.")
772          # [MQTT-4.7.3-3]
773          if len(topic.encode("utf-8")) > MQTT_TOPIC_LENGTH_LIMIT:
774              raise MMQTTException("Topic length is too large.")
775  
776      @staticmethod
777      def _check_qos(qos_level):
778          """Validates the quality of service level.
779  
780          :param int qos_level: Desired QoS level.
781          """
782          if isinstance(qos_level, int):
783              if qos_level < 0 or qos_level > 2:
784                  raise MMQTTException("QoS must be between 1 and 2.")
785          else:
786              raise MMQTTException("QoS must be an integer.")
787  
788      def _set_interface(self):
789          """Sets a desired network hardware interface.
790          The network hardware must be set in init
791          prior to calling this method.
792          """
793          if self._wifi:
794              self._socket.set_interface(self._wifi.esp)
795          else:
796              raise TypeError("Network Manager Required.")
797  
798      def is_connected(self):
799          """Returns MQTT client session status as True if connected, raises
800          a `MMQTTException` if `False`.
801          """
802          if self._sock is None or self._is_connected is False:
803              raise MMQTTException("MiniMQTT is not connected.")
804          return self._is_connected
805  
806      @property
807      def mqtt_msg(self):
808          """Returns maximum MQTT payload and topic size."""
809          return self._msg_size_lim, MQTT_TOPIC_LENGTH_LIMIT
810  
811      @mqtt_msg.setter
812      def mqtt_msg(self, msg_size):
813          """Sets the maximum MQTT message payload size.
814  
815          :param int msg_size: Maximum MQTT payload size.
816          """
817          if msg_size < MQTT_MSG_MAX_SZ:
818              self._msg_size_lim = msg_size
819  
820      ### Logging ###
821      def attach_logger(self, logger_name="log"):
822          """Initializes and attaches a logger to the MQTTClient.
823  
824          :param str logger_name: Name of the logger instance
825          """
826          self.logger = logging.getLogger(logger_name)
827          self.logger.setLevel(logging.INFO)
828  
829      def set_logger_level(self, log_level):
830          """Sets the level of the logger, if defined during init.
831  
832          :param str log_level: Level of logging to output to the REPL.
833              Acceptable options are ``DEBUG``, ``INFO``, ``WARNING``, or
834              ``ERROR``.
835          """
836          if self.logger is None:
837              raise MMQTTException(
838                  "No logger attached - did you create it during initialization?"
839              )
840          if log_level == "DEBUG":
841              self.logger.setLevel(logging.DEBUG)
842          elif log_level == "INFO":
843              self.logger.setLevel(logging.INFO)
844          elif log_level == "WARNING":
845              self.logger.setLevel(logging.WARNING)
846          elif log_level == "ERROR":
847              self.logger.setLevel(logging.CRITICIAL)
848          else:
849              raise MMQTTException("Incorrect logging level provided!")