/ NY_Tower_Light / lambda / paho / mqtt / client.py
client.py
   1  # SPDX-FileCopyrightText: 2014 Roger Light
   2  #
   3  # SPDX-License-Identifier: EPL-1.0
   4  
   5  # Copyright (c) 2012-2014 Roger Light <roger@atchoo.org>
   6  #
   7  # All rights reserved. This program and the accompanying materials
   8  # are made available under the terms of the Eclipse Public License v1.0
   9  # and Eclipse Distribution License v1.0 which accompany this distribution.
  10  #
  11  # The Eclipse Public License is available at
  12  #    http://www.eclipse.org/legal/epl-v10.html
  13  # and the Eclipse Distribution License is available at
  14  #   http://www.eclipse.org/org/documents/edl-v10.php.
  15  #
  16  # Contributors:
  17  #    Roger Light - initial API and implementation
  18  
  19  """
  20  This is an MQTT v3.1 client module. MQTT is a lightweight pub/sub messaging
  21  protocol that is easy to implement and suitable for low powered devices.
  22  """
  23  import errno
  24  import platform
  25  import random
  26  import select
  27  import socket
  28  HAVE_SSL = True
  29  try:
  30      import ssl
  31      cert_reqs = ssl.CERT_REQUIRED
  32      tls_version = ssl.PROTOCOL_TLSv1
  33  except:
  34      HAVE_SSL = False
  35      cert_reqs = None
  36      tls_version = None
  37  import struct
  38  import sys
  39  import threading
  40  import time
  41  HAVE_DNS = True
  42  try:
  43      import dns.resolver
  44  except ImportError:
  45      HAVE_DNS = False
  46  
  47  if platform.system() == 'Windows':
  48      EAGAIN = errno.WSAEWOULDBLOCK
  49  else:
  50      EAGAIN = errno.EAGAIN
  51  
# Library version. VERSION_NUMBER packs the three parts into one integer
# (major*1000000 + minor*1000 + revision) so versions can be compared
# numerically.
VERSION_MAJOR=1
VERSION_MINOR=0
VERSION_REVISION=0
VERSION_NUMBER=(VERSION_MAJOR*1000000+VERSION_MINOR*1000+VERSION_REVISION)

# Protocol selectors accepted by the "protocol" argument of Client().
MQTTv31 = 3
MQTTv311 = 4

# Protocol names: native str on Python 2, bytes on Python 3.
if sys.version_info[0] < 3:
    PROTOCOL_NAMEv31 = "MQIsdp"
    PROTOCOL_NAMEv311 = "MQTT"
else:
    PROTOCOL_NAMEv31 = b"MQIsdp"
    PROTOCOL_NAMEv311 = b"MQTT"

# NOTE(review): same value as MQTTv31; not referenced in the visible part of
# this file - confirm whether it is still used.
PROTOCOL_VERSION = 3

# Message types
# NOTE(review): all values have a zero low nibble; presumably they are the
# command bits of the packet's first byte - confirm against the packet code.
CONNECT = 0x10
CONNACK = 0x20
PUBLISH = 0x30
PUBACK = 0x40
PUBREC = 0x50
PUBREL = 0x60
PUBCOMP = 0x70
SUBSCRIBE = 0x80
SUBACK = 0x90
UNSUBSCRIBE = 0xA0
UNSUBACK = 0xB0
PINGREQ = 0xC0
PINGRESP = 0xD0
DISCONNECT = 0xE0

# Log levels
# Distinct bit values; one of these is passed as "level" to the on_log
# callback (see the Client docstring).
MQTT_LOG_INFO = 0x01
MQTT_LOG_NOTICE = 0x02
MQTT_LOG_WARNING = 0x04
MQTT_LOG_ERR = 0x08
MQTT_LOG_DEBUG = 0x10

# CONNACK codes
# The same numeric values are turned into text by connack_string().
CONNACK_ACCEPTED = 0
CONNACK_REFUSED_PROTOCOL_VERSION = 1
CONNACK_REFUSED_IDENTIFIER_REJECTED = 2
CONNACK_REFUSED_SERVER_UNAVAILABLE = 3
CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4
CONNACK_REFUSED_NOT_AUTHORIZED = 5

# Connection state
# Values stored in Client._state.
mqtt_cs_new = 0
mqtt_cs_connected = 1
mqtt_cs_disconnecting = 2
mqtt_cs_connect_async = 3

# Message state
# Values stored in MQTTMessage.state; a new message starts as
# mqtt_ms_invalid.
mqtt_ms_invalid = 0
mqtt_ms_publish= 1
mqtt_ms_wait_for_puback = 2
mqtt_ms_wait_for_pubrec = 3
mqtt_ms_resend_pubrel = 4
mqtt_ms_wait_for_pubrel = 5
mqtt_ms_resend_pubcomp = 6
mqtt_ms_wait_for_pubcomp = 7
mqtt_ms_send_pubrec = 8
mqtt_ms_queued = 9

# Error values
# error_string() maps these to text. MQTT_ERR_AGAIN has no entry of its own
# there and is reported as "Unknown error.".
MQTT_ERR_AGAIN = -1
MQTT_ERR_SUCCESS = 0
MQTT_ERR_NOMEM = 1
MQTT_ERR_PROTOCOL = 2
MQTT_ERR_INVAL = 3
MQTT_ERR_NO_CONN = 4
MQTT_ERR_CONN_REFUSED = 5
MQTT_ERR_NOT_FOUND = 6
MQTT_ERR_CONN_LOST = 7
MQTT_ERR_TLS = 8
MQTT_ERR_PAYLOAD_SIZE = 9
MQTT_ERR_NOT_SUPPORTED = 10
MQTT_ERR_AUTH = 11
MQTT_ERR_ACL_DENIED = 12
MQTT_ERR_UNKNOWN = 13
MQTT_ERR_ERRNO = 14

# NOTE(review): presumably the single byte written to the internal socket
# pair to wake up select() - the writer is outside this chunk, confirm.
# str on Python 2, bytes on Python 3.
if sys.version_info[0] < 3:
    sockpair_data = "0"
else:
    sockpair_data = b"0"
 140  
 141  def error_string(mqtt_errno):
 142      """Return the error string associated with an mqtt error number."""
 143      if mqtt_errno == MQTT_ERR_SUCCESS:
 144          return "No error."
 145      elif mqtt_errno == MQTT_ERR_NOMEM:
 146          return "Out of memory."
 147      elif mqtt_errno == MQTT_ERR_PROTOCOL:
 148          return "A network protocol error occurred when communicating with the broker."
 149      elif mqtt_errno == MQTT_ERR_INVAL:
 150          return "Invalid function arguments provided."
 151      elif mqtt_errno == MQTT_ERR_NO_CONN:
 152          return "The client is not currently connected."
 153      elif mqtt_errno == MQTT_ERR_CONN_REFUSED:
 154          return "The connection was refused."
 155      elif mqtt_errno == MQTT_ERR_NOT_FOUND:
 156          return "Message not found (internal error)."
 157      elif mqtt_errno == MQTT_ERR_CONN_LOST:
 158          return "The connection was lost."
 159      elif mqtt_errno == MQTT_ERR_TLS:
 160          return "A TLS error occurred."
 161      elif mqtt_errno == MQTT_ERR_PAYLOAD_SIZE:
 162          return "Payload too large."
 163      elif mqtt_errno == MQTT_ERR_NOT_SUPPORTED:
 164          return "This feature is not supported."
 165      elif mqtt_errno == MQTT_ERR_AUTH:
 166          return "Authorisation failed."
 167      elif mqtt_errno == MQTT_ERR_ACL_DENIED:
 168          return "Access denied by ACL."
 169      elif mqtt_errno == MQTT_ERR_UNKNOWN:
 170          return "Unknown error."
 171      elif mqtt_errno == MQTT_ERR_ERRNO:
 172          return "Error defined by errno."
 173      else:
 174          return "Unknown error."
 175  
 176  
 177  def connack_string(connack_code):
 178      """Return the string associated with a CONNACK result."""
 179      if connack_code == 0:
 180          return "Connection Accepted."
 181      elif connack_code == 1:
 182          return "Connection Refused: unacceptable protocol version."
 183      elif connack_code == 2:
 184          return "Connection Refused: identifier rejected."
 185      elif connack_code == 3:
 186          return "Connection Refused: broker unavailable."
 187      elif connack_code == 4:
 188          return "Connection Refused: bad user name or password."
 189      elif connack_code == 5:
 190          return "Connection Refused: not authorised."
 191      else:
 192          return "Connection Refused: unknown reason."
 193  
 194  
 195  def topic_matches_sub(sub, topic):
 196      """Check whether a topic matches a subscription.
 197      For example:
 198      foo/bar would match the subscription foo/# or +/bar
 199      non/matching would not match the subscription non/+/+
 200      """
 201      result = True
 202      multilevel_wildcard = False
 203  
 204      slen = len(sub)
 205      tlen = len(topic)
 206  
 207      if slen > 0 and tlen > 0:
 208          if (sub[0] == '$' and topic[0] != '$') or (topic[0] == '$' and sub[0] != '$'):
 209              return False
 210  
 211      spos = 0
 212      tpos = 0
 213  
 214      while spos < slen and tpos < tlen:
 215          if sub[spos] == topic[tpos]:
 216              if tpos == tlen-1:
 217                  # Check for e.g. foo matching foo/#
 218                  if spos == slen-3 and sub[spos+1] == '/' and sub[spos+2] == '#':
 219                      result = True
 220                      multilevel_wildcard = True
 221                      break
 222  
 223              spos += 1
 224              tpos += 1
 225  
 226              if tpos == tlen and spos == slen-1 and sub[spos] == '+':
 227                  spos += 1
 228                  result = True
 229                  break
 230          else:
 231              if sub[spos] == '+':
 232                  spos += 1
 233                  while tpos < tlen and topic[tpos] != '/':
 234                      tpos += 1
 235                  if tpos == tlen and spos == slen:
 236                      result = True
 237                      break
 238  
 239              elif sub[spos] == '#':
 240                  multilevel_wildcard = True
 241                  if spos+1 != slen:
 242                      result = False
 243                      break
 244                  else:
 245                      result = True
 246                      break
 247  
 248              else:
 249                  result = False
 250                  break
 251  
 252      if not multilevel_wildcard and (tpos < tlen or spos < slen):
 253          result = False
 254  
 255      return result
 256  
 257  
 258  def _socketpair_compat():
 259      """TCP/IP socketpair including Windows support"""
 260      listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP)
 261      listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 262      listensock.bind(("127.0.0.1", 0))
 263      listensock.listen(1)
 264  
 265      iface, port = listensock.getsockname()
 266      sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP)
 267      sock1.setblocking(0)
 268      try:
 269          sock1.connect(("localhost", port))
 270      except socket.error as err:
 271          if err.errno != errno.EINPROGRESS and err.errno != errno.EWOULDBLOCK and err.errno != EAGAIN:
 272              raise
 273      sock2, address = listensock.accept()
 274      sock2.setblocking(0)
 275      listensock.close()
 276      return (sock1, sock2)
 277  
 278  
 279  class MQTTMessage:
 280      """ This is a class that describes an incoming message. It is passed to the
 281      on_message callback as the message parameter.
 282      Members:
 283      topic : String. topic that the message was published on.
 284      payload : String/bytes the message payload.
 285      qos : Integer. The message Quality of Service 0, 1 or 2.
 286      retain : Boolean. If true, the message is a retained message and not fresh.
 287      mid : Integer. The message id.
 288      """
 289      def __init__(self):
 290          self.timestamp = 0
 291          self.state = mqtt_ms_invalid
 292          self.dup = False
 293          self.mid = 0
 294          self.topic = ""
 295          self.payload = None
 296          self.qos = 0
 297          self.retain = False
 298  
 299  
 300  class Client(object):
 301      """MQTT version 3.1/3.1.1 client class.
 302      This is the main class for use communicating with an MQTT broker.
 303      General usage flow:
 304      * Use connect()/connect_async() to connect to a broker
 305      * Call loop() frequently to maintain network traffic flow with the broker
 306      * Or use loop_start() to set a thread running to call loop() for you.
 307      * Or use loop_forever() to handle calling loop() for you in a blocking
 308      * function.
 309      * Use subscribe() to subscribe to a topic and receive messages
 310      * Use publish() to send messages
 311      * Use disconnect() to disconnect from the broker
 312      Data returned from the broker is made available with the use of callback
 313      functions as described below.
 314      Callbacks
 315      =========
 316      A number of callback functions are available to receive data back from the
 317      broker. To use a callback, define a function and then assign it to the
 318      client:
 319      def on_connect(client, userdata, flags, rc):
 320          print("Connection returned " + str(rc))
 321      client.on_connect = on_connect
 322      All of the callbacks as described below have a "client" and an "userdata"
 323      argument. "client" is the Client instance that is calling the callback.
 324      "userdata" is user data of any type and can be set when creating a new client
 325      instance or with user_data_set(userdata).
 326      The callbacks:
 327      on_connect(client, userdata, flags, rc): called when the broker responds to our connection
 328        request.
 329        flags is a dict that contains response flags from the broker:
 330          flags['session present'] - this flag is useful for clients that are
 331              using clean session set to 0 only. If a client with clean
 332              session=0, that reconnects to a broker that it has previously
 333              connected to, this flag indicates whether the broker still has the
 334              session information for the client. If 1, the session still exists.
 335        The value of rc determines success or not:
 336          0: Connection successful
 337          1: Connection refused - incorrect protocol version
 338          2: Connection refused - invalid client identifier
 339          3: Connection refused - server unavailable
 340          4: Connection refused - bad username or password
 341          5: Connection refused - not authorised
 342          6-255: Currently unused.
 343      on_disconnect(client, userdata, rc): called when the client disconnects from the broker.
 344        The rc parameter indicates the disconnection state. If MQTT_ERR_SUCCESS
 345        (0), the callback was called in response to a disconnect() call. If any
 346        other value the disconnection was unexpected, such as might be caused by
 347        a network error.
 348      on_message(client, userdata, message): called when a message has been received on a
 349        topic that the client subscribes to. The message variable is a
 350        MQTTMessage that describes all of the message parameters.
 351      on_publish(client, userdata, mid): called when a message that was to be sent using the
 352        publish() call has completed transmission to the broker. For messages
 353        with QoS levels 1 and 2, this means that the appropriate handshakes have
 354        completed. For QoS 0, this simply means that the message has left the
 355        client. The mid variable matches the mid variable returned from the
 356        corresponding publish() call, to allow outgoing messages to be tracked.
 357        This callback is important because even if the publish() call returns
 358        success, it does not always mean that the message has been sent.
 359      on_subscribe(client, userdata, mid, granted_qos): called when the broker responds to a
 360        subscribe request. The mid variable matches the mid variable returned
 361        from the corresponding subscribe() call. The granted_qos variable is a
 362        list of integers that give the QoS level the broker has granted for each
 363        of the different subscription requests.
 364      on_unsubscribe(client, userdata, mid): called when the broker responds to an unsubscribe
 365        request. The mid variable matches the mid variable returned from the
 366        corresponding unsubscribe() call.
 367      on_log(client, userdata, level, buf): called when the client has log information. Define
 368        to allow debugging. The level variable gives the severity of the message
 369        and will be one of MQTT_LOG_INFO, MQTT_LOG_NOTICE, MQTT_LOG_WARNING,
 370        MQTT_LOG_ERR, and MQTT_LOG_DEBUG. The message itself is in buf.
 371      """
 372      def __init__(self, client_id="", clean_session=True, userdata=None, protocol=MQTTv31):
 373          """client_id is the unique client id string used when connecting to the
 374          broker. If client_id is zero length or None, then one will be randomly
 375          generated. In this case, clean_session must be True. If this is not the
 376          case a ValueError will be raised.
 377          clean_session is a boolean that determines the client type. If True,
 378          the broker will remove all information about this client when it
 379          disconnects. If False, the client is a persistent client and
 380          subscription information and queued messages will be retained when the
 381          client disconnects.
 382          Note that a client will never discard its own outgoing messages on
 383          disconnect. Calling connect() or reconnect() will cause the messages to
 384          be resent.  Use reinitialise() to reset a client to its original state.
 385          userdata is user defined data of any type that is passed as the "userdata"
 386          parameter to callbacks. It may be updated at a later point with the
 387          user_data_set() function.
 388          The protocol argument allows explicit setting of the MQTT version to
 389          use for this client. Can be paho.mqtt.client.MQTTv311 (v3.1.1) or
 390          paho.mqtt.client.MQTTv31 (v3.1), with the default being v3.1. If the
 391          broker reports that the client connected with an invalid protocol
 392          version, the client will automatically attempt to reconnect using v3.1
 393          instead.
 394          """
 395          if not clean_session and (client_id == "" or client_id is None):
 396              raise ValueError('A client id must be provided if clean session is False.')
 397  
 398          self._protocol = protocol
 399          self._userdata = userdata
 400          self._sock = None
 401          self._sockpairR, self._sockpairW = _socketpair_compat()
 402          self._keepalive = 60
 403          self._message_retry = 20
 404          self._last_retry_check = 0
 405          self._clean_session = clean_session
 406          if client_id == "" or client_id is None:
 407              self._client_id = "paho/" + "".join(random.choice("0123456789ADCDEF") for x in range(23-5))
 408          else:
 409              self._client_id = client_id
 410  
 411          self._username = ""
 412          self._password = ""
 413          self._in_packet = {
 414              "command": 0,
 415              "have_remaining": 0,
 416              "remaining_count": [],
 417              "remaining_mult": 1,
 418              "remaining_length": 0,
 419              "packet": b"",
 420              "to_process": 0,
 421              "pos": 0}
 422          self._out_packet = []
 423          self._current_out_packet = None
 424          self._last_msg_in = time.time()
 425          self._last_msg_out = time.time()
 426          self._ping_t = 0
 427          self._last_mid = 0
 428          self._state = mqtt_cs_new
 429          self._out_messages = []
 430          self._in_messages = []
 431          self._max_inflight_messages = 20
 432          self._inflight_messages = 0
 433          self._will = False
 434          self._will_topic = ""
 435          self._will_payload = None
 436          self._will_qos = 0
 437          self._will_retain = False
 438          self.on_disconnect = None
 439          self.on_connect = None
 440          self.on_publish = None
 441          self.on_message = None
 442          self.on_message_filtered = []
 443          self.on_subscribe = None
 444          self.on_unsubscribe = None
 445          self.on_log = None
 446          self._host = ""
 447          self._port = 1883
 448          self._bind_address = ""
 449          self._in_callback = False
 450          self._strict_protocol = False
 451          self._callback_mutex = threading.Lock()
 452          self._state_mutex = threading.Lock()
 453          self._out_packet_mutex = threading.Lock()
 454          self._current_out_packet_mutex = threading.Lock()
 455          self._msgtime_mutex = threading.Lock()
 456          self._out_message_mutex = threading.Lock()
 457          self._in_message_mutex = threading.Lock()
 458          self._thread = None
 459          self._thread_terminate = False
 460          self._ssl = None
 461          self._tls_certfile = None
 462          self._tls_keyfile = None
 463          self._tls_ca_certs = None
 464          self._tls_cert_reqs = None
 465          self._tls_ciphers = None
 466          self._tls_version = tls_version
 467          self._tls_insecure = False
 468  
 469      def __del__(self):
 470          pass
 471  
 472      def reinitialise(self, client_id="", clean_session=True, userdata=None):
 473          if self._ssl:
 474              self._ssl.close()
 475              self._ssl = None
 476              self._sock = None
 477          elif self._sock:
 478              self._sock.close()
 479              self._sock = None
 480          if self._sockpairR:
 481              self._sockpairR.close()
 482              self._sockpairR = None
 483          if self._sockpairW:
 484              self._sockpairW.close()
 485              self._sockpairW = None
 486  
 487          self.__init__(client_id, clean_session, userdata)
 488  
 489      def tls_set(self, ca_certs, certfile=None, keyfile=None, cert_reqs=cert_reqs, tls_version=tls_version, ciphers=None):
 490          """Configure network encryption and authentication options. Enables SSL/TLS support.
 491          ca_certs : a string path to the Certificate Authority certificate files
 492          that are to be treated as trusted by this client. If this is the only
 493          option given then the client will operate in a similar manner to a web
 494          browser. That is to say it will require the broker to have a
 495          certificate signed by the Certificate Authorities in ca_certs and will
 496          communicate using TLS v1, but will not attempt any form of
 497          authentication. This provides basic network encryption but may not be
 498          sufficient depending on how the broker is configured.
 499          certfile and keyfile are strings pointing to the PEM encoded client
 500          certificate and private keys respectively. If these arguments are not
 501          None then they will be used as client information for TLS based
 502          authentication.  Support for this feature is broker dependent. Note
 503          that if either of these files in encrypted and needs a password to
 504          decrypt it, Python will ask for the password at the command line. It is
 505          not currently possible to define a callback to provide the password.
 506          cert_reqs allows the certificate requirements that the client imposes
 507          on the broker to be changed. By default this is ssl.CERT_REQUIRED,
 508          which means that the broker must provide a certificate. See the ssl
 509          pydoc for more information on this parameter.
 510          tls_version allows the version of the SSL/TLS protocol used to be
 511          specified. By default TLS v1 is used. Previous versions (all versions
 512          beginning with SSL) are possible but not recommended due to possible
 513          security problems.
 514          ciphers is a string specifying which encryption ciphers are allowable
 515          for this connection, or None to use the defaults. See the ssl pydoc for
 516          more information.
 517          Must be called before connect() or connect_async()."""
 518          if HAVE_SSL is False:
 519              raise ValueError('This platform has no SSL/TLS.')
 520  
 521          if sys.version < '2.7':
 522              raise ValueError('Python 2.7 is the minimum supported version for TLS.')
 523  
 524          if ca_certs is None:
 525              raise ValueError('ca_certs must not be None.')
 526  
 527          try:
 528              f = open(ca_certs, "r")
 529          except IOError as err:
 530              raise IOError(ca_certs+": "+err.strerror)
 531          else:
 532              f.close()
 533          if certfile is not None:
 534              try:
 535                  f = open(certfile, "r")
 536              except IOError as err:
 537                  raise IOError(certfile+": "+err.strerror)
 538              else:
 539                  f.close()
 540          if keyfile is not None:
 541              try:
 542                  f = open(keyfile, "r")
 543              except IOError as err:
 544                  raise IOError(keyfile+": "+err.strerror)
 545              else:
 546                  f.close()
 547  
 548          self._tls_ca_certs = ca_certs
 549          self._tls_certfile = certfile
 550          self._tls_keyfile = keyfile
 551          self._tls_cert_reqs = cert_reqs
 552          self._tls_version = tls_version
 553          self._tls_ciphers = ciphers
 554  
 555      def tls_insecure_set(self, value):
 556          """Configure verification of the server hostname in the server certificate.
 557          If value is set to true, it is impossible to guarantee that the host
 558          you are connecting to is not impersonating your server. This can be
 559          useful in initial server testing, but makes it possible for a malicious
 560          third party to impersonate your server through DNS spoofing, for
 561          example.
 562          Do not use this function in a real system. Setting value to true means
 563          there is no point using encryption.
 564          Must be called before connect()."""
 565          if HAVE_SSL is False:
 566              raise ValueError('This platform has no SSL/TLS.')
 567  
 568          self._tls_insecure = value
 569  
 570      def connect(self, host, port=1883, keepalive=60, bind_address=""):
 571          """Connect to a remote broker.
 572          host is the hostname or IP address of the remote broker.
 573          port is the network port of the server host to connect to. Defaults to
 574          1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you
 575          are using tls_set() the port may need providing.
 576          keepalive: Maximum period in seconds between communications with the
 577          broker. If no other messages are being exchanged, this controls the
 578          rate at which the client will send ping messages to the broker.
 579          """
 580          self.connect_async(host, port, keepalive, bind_address)
 581          return self.reconnect()
 582  
 583      def connect_srv(self, domain=None, keepalive=60, bind_address=""):
 584          """Connect to a remote broker.
 585          domain is the DNS domain to search for SRV records; if None,
 586          try to determine local domain name.
 587          keepalive and bind_address are as for connect()
 588          """
 589  
 590          if HAVE_DNS is False:
 591              raise ValueError('No DNS resolver library found.')
 592  
 593          if domain is None:
 594              domain = socket.getfqdn()
 595              domain = domain[domain.find('.') + 1:]
 596  
 597          try:
 598              rr = '_mqtt._tcp.%s' % domain
 599              if self._ssl is not None:
 600                  # IANA specifies secure-mqtt (not mqtts) for port 8883
 601                  rr = '_secure-mqtt._tcp.%s' % domain
 602              answers = []
 603              for answer in dns.resolver.query(rr, dns.rdatatype.SRV):
 604                  addr = answer.target.to_text()[:-1]
 605                  answers.append((addr, answer.port, answer.priority, answer.weight))
 606          except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.NoNameservers):
 607              raise ValueError("No answer/NXDOMAIN for SRV in %s" % (domain))
 608  
 609          # FIXME: doesn't account for weight
 610          for answer in answers:
 611              host, port, prio, weight = answer
 612  
 613              try:
 614                  return self.connect(host, port, keepalive, bind_address)
 615              except:
 616                  pass
 617  
 618          raise ValueError("No SRV hosts responded")
 619  
 620      def connect_async(self, host, port=1883, keepalive=60, bind_address=""):
 621          """Connect to a remote broker asynchronously. This is a non-blocking
 622          connect call that can be used with loop_start() to provide very quick
 623          start.
 624          host is the hostname or IP address of the remote broker.
 625          port is the network port of the server host to connect to. Defaults to
 626          1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you
 627          are using tls_set() the port may need providing.
 628          keepalive: Maximum period in seconds between communications with the
 629          broker. If no other messages are being exchanged, this controls the
 630          rate at which the client will send ping messages to the broker.
 631          """
 632          if host is None or len(host) == 0:
 633              raise ValueError('Invalid host.')
 634          if port <= 0:
 635              raise ValueError('Invalid port number.')
 636          if keepalive < 0:
 637              raise ValueError('Keepalive must be >=0.')
 638          if bind_address != "" and bind_address is not None:
 639              if (sys.version_info[0] == 2 and sys.version_info[1] < 7) or (sys.version_info[0] == 3 and sys.version_info[1] < 2):
 640                  raise ValueError('bind_address requires Python 2.7 or 3.2.')
 641  
 642          self._host = host
 643          self._port = port
 644          self._keepalive = keepalive
 645          self._bind_address = bind_address
 646  
 647          self._state_mutex.acquire()
 648          self._state = mqtt_cs_connect_async
 649          self._state_mutex.release()
 650  
 651      def reconnect(self):
 652          """Reconnect the client after a disconnect. Can only be called after
 653          connect()/connect_async()."""
 654          if len(self._host) == 0:
 655              raise ValueError('Invalid host.')
 656          if self._port <= 0:
 657              raise ValueError('Invalid port number.')
 658  
 659          self._in_packet = {
 660              "command": 0,
 661              "have_remaining": 0,
 662              "remaining_count": [],
 663              "remaining_mult": 1,
 664              "remaining_length": 0,
 665              "packet": b"",
 666              "to_process": 0,
 667              "pos": 0}
 668  
 669          self._out_packet_mutex.acquire()
 670          self._out_packet = []
 671          self._out_packet_mutex.release()
 672  
 673          self._current_out_packet_mutex.acquire()
 674          self._current_out_packet = None
 675          self._current_out_packet_mutex.release()
 676  
 677          self._msgtime_mutex.acquire()
 678          self._last_msg_in = time.time()
 679          self._last_msg_out = time.time()
 680          self._msgtime_mutex.release()
 681  
 682          self._ping_t = 0
 683          self._state_mutex.acquire()
 684          self._state = mqtt_cs_new
 685          self._state_mutex.release()
 686          if self._ssl:
 687              self._ssl.close()
 688              self._ssl = None
 689              self._sock = None
 690          elif self._sock:
 691              self._sock.close()
 692              self._sock = None
 693  
 694          # Put messages in progress in a valid state.
 695          self._messages_reconnect_reset()
 696  
 697          try:
 698              if (sys.version_info[0] == 2 and sys.version_info[1] < 7) or (sys.version_info[0] == 3 and sys.version_info[1] < 2):
 699                  sock = socket.create_connection((self._host, self._port))
 700              else:
 701                  sock = socket.create_connection((self._host, self._port), source_address=(self._bind_address, 0))
 702          except socket.error as err:
 703              if err.errno != errno.EINPROGRESS and err.errno != errno.EWOULDBLOCK and err.errno != EAGAIN:
 704                  raise
 705  
 706          if self._tls_ca_certs is not None:
 707              self._ssl = ssl.wrap_socket(
 708                  sock,
 709                  certfile=self._tls_certfile,
 710                  keyfile=self._tls_keyfile,
 711                  ca_certs=self._tls_ca_certs,
 712                  cert_reqs=self._tls_cert_reqs,
 713                  ssl_version=self._tls_version,
 714                  ciphers=self._tls_ciphers)
 715  
 716              if self._tls_insecure is False:
 717                  if sys.version_info[0] < 3 or (sys.version_info[0] == 3 and sys.version_info[1] < 2):
 718                      self._tls_match_hostname()
 719                  else:
 720                      ssl.match_hostname(self._ssl.getpeercert(), self._host)
 721  
 722          self._sock = sock
 723          self._sock.setblocking(0)
 724  
 725          return self._send_connect(self._keepalive, self._clean_session)
 726  
 727      def loop(self, timeout=1.0, max_packets=1):
 728          """Process network events.
 729          This function must be called regularly to ensure communication with the
 730          broker is carried out. It calls select() on the network socket to wait
 731          for network events. If incoming data is present it will then be
 732          processed. Outgoing commands, from e.g. publish(), are normally sent
 733          immediately that their function is called, but this is not always
 734          possible. loop() will also attempt to send any remaining outgoing
 735          messages, which also includes commands that are part of the flow for
 736          messages with QoS>0.
 737          timeout: The time in seconds to wait for incoming/outgoing network
 738            traffic before timing out and returning.
 739          max_packets: Not currently used.
 740          Returns MQTT_ERR_SUCCESS on success.
 741          Returns >0 on error.
 742          A ValueError will be raised if timeout < 0"""
 743          if timeout < 0.0:
 744              raise ValueError('Invalid timeout.')
 745  
 746          self._current_out_packet_mutex.acquire()
 747          self._out_packet_mutex.acquire()
 748          if self._current_out_packet is None and len(self._out_packet) > 0:
 749              self._current_out_packet = self._out_packet.pop(0)
 750  
 751          if self._current_out_packet:
 752              wlist = [self.socket()]
 753          else:
 754              wlist = []
 755          self._out_packet_mutex.release()
 756          self._current_out_packet_mutex.release()
 757  
 758          # sockpairR is used to break out of select() before the timeout, on a
 759          # call to publish() etc.
 760          rlist = [self.socket(), self._sockpairR]
 761          try:
 762              socklist = select.select(rlist, wlist, [], timeout)
 763          except TypeError:
 764              # Socket isn't correct type, in likelihood connection is lost
 765              return MQTT_ERR_CONN_LOST
 766          except ValueError:
 767              # Can occur if we just reconnected but rlist/wlist contain a -1 for
 768              # some reason.
 769              return MQTT_ERR_CONN_LOST
 770          except:
 771              return MQTT_ERR_UNKNOWN
 772  
 773          if self.socket() in socklist[0]:
 774              rc = self.loop_read(max_packets)
 775              if rc or (self._ssl is None and self._sock is None):
 776                  return rc
 777  
 778          if self._sockpairR in socklist[0]:
 779              # Stimulate output write even though we didn't ask for it, because
 780              # at that point the publish or other command wasn't present.
 781              socklist[1].insert(0, self.socket())
 782              # Clear sockpairR - only ever a single byte written.
 783              try:
 784                  self._sockpairR.recv(1)
 785              except socket.error as err:
 786                  if err.errno != EAGAIN:
 787                      raise
 788  
 789          if self.socket() in socklist[1]:
 790              rc = self.loop_write(max_packets)
 791              if rc or (self._ssl is None and self._sock is None):
 792                  return rc
 793  
 794          return self.loop_misc()
 795  
 796      def publish(self, topic, payload=None, qos=0, retain=False):
 797          """Publish a message on a topic.
 798          This causes a message to be sent to the broker and subsequently from
 799          the broker to any clients subscribing to matching topics.
 800          topic: The topic that the message should be published on.
 801          payload: The actual message to send. If not given, or set to None a
 802          zero length message will be used. Passing an int or float will result
 803          in the payload being converted to a string representing that number. If
 804          you wish to send a true int/float, use struct.pack() to create the
 805          payload you require.
 806          qos: The quality of service level to use.
 807          retain: If set to true, the message will be set as the "last known
 808          good"/retained message for the topic.
 809          Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS to
 810          indicate success or MQTT_ERR_NO_CONN if the client is not currently
 811          connected.  mid is the message ID for the publish request. The mid
 812          value can be used to track the publish request by checking against the
 813          mid argument in the on_publish() callback if it is defined.
 814          A ValueError will be raised if topic is None, has zero length or is
 815          invalid (contains a wildcard), if qos is not one of 0, 1 or 2, or if
 816          the length of the payload is greater than 268435455 bytes."""
 817          if topic is None or len(topic) == 0:
 818              raise ValueError('Invalid topic.')
 819          if qos<0 or qos>2:
 820              raise ValueError('Invalid QoS level.')
 821          if isinstance(payload, str) or isinstance(payload, bytearray):
 822              local_payload = payload
 823          elif sys.version_info[0] < 3 and isinstance(payload, unicode):
 824              local_payload = payload
 825          elif isinstance(payload, int) or isinstance(payload, float):
 826              local_payload = str(payload)
 827          elif payload is None:
 828              local_payload = None
 829          else:
 830              raise TypeError('payload must be a string, bytearray, int, float or None.')
 831  
 832          if local_payload is not None and len(local_payload) > 268435455:
 833              raise ValueError('Payload too large.')
 834  
 835          if self._topic_wildcard_len_check(topic) != MQTT_ERR_SUCCESS:
 836              raise ValueError('Publish topic cannot contain wildcards.')
 837  
 838          local_mid = self._mid_generate()
 839  
 840          if qos == 0:
 841              rc = self._send_publish(local_mid, topic, local_payload, qos, retain, False)
 842              return (rc, local_mid)
 843          else:
 844              message = MQTTMessage()
 845              message.timestamp = time.time()
 846  
 847              message.mid = local_mid
 848              message.topic = topic
 849              if local_payload is None or len(local_payload) == 0:
 850                  message.payload = None
 851              else:
 852                  message.payload = local_payload
 853  
 854              message.qos = qos
 855              message.retain = retain
 856              message.dup = False
 857  
 858              self._out_message_mutex.acquire()
 859              self._out_messages.append(message)
 860              if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages:
 861                  self._inflight_messages = self._inflight_messages+1
 862                  if qos == 1:
 863                      message.state = mqtt_ms_wait_for_puback
 864                  elif qos == 2:
 865                      message.state = mqtt_ms_wait_for_pubrec
 866                  self._out_message_mutex.release()
 867  
 868                  rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain, message.dup)
 869  
 870                  # remove from inflight messages so it will be send after a connection is made
 871                  if rc is MQTT_ERR_NO_CONN:
 872                      with self._out_message_mutex:
 873                          self._inflight_messages -= 1
 874                          message.state = mqtt_ms_publish
 875  
 876                  return (rc, local_mid)
 877              else:
 878                  message.state = mqtt_ms_queued;
 879                  self._out_message_mutex.release()
 880                  return (MQTT_ERR_SUCCESS, local_mid)
 881  
 882      def username_pw_set(self, username, password=None):
 883          """Set a username and optionally a password for broker authentication.
 884          Must be called before connect() to have any effect.
 885          Requires a broker that supports MQTT v3.1.
 886          username: The username to authenticate with. Need have no relationship to the client id.
 887          password: The password to authenticate with. Optional, set to None if not required.
 888          """
 889          self._username = username.encode('utf-8')
 890          self._password = password
 891  
 892      def disconnect(self):
 893          """Disconnect a connected client from the broker."""
 894          self._state_mutex.acquire()
 895          self._state = mqtt_cs_disconnecting
 896          self._state_mutex.release()
 897  
 898          if self._sock is None and self._ssl is None:
 899              return MQTT_ERR_NO_CONN
 900  
 901          return self._send_disconnect()
 902  
 903      def subscribe(self, topic, qos=0):
 904          """Subscribe the client to one or more topics.
 905          This function may be called in three different ways:
 906          Simple string and integer
 907          -------------------------
 908          e.g. subscribe("my/topic", 2)
 909          topic: A string specifying the subscription topic to subscribe to.
 910          qos: The desired quality of service level for the subscription.
 911               Defaults to 0.
 912          String and integer tuple
 913          ------------------------
 914          e.g. subscribe(("my/topic", 1))
 915          topic: A tuple of (topic, qos). Both topic and qos must be present in
 916                 the tuple.
 917          qos: Not used.
 918          List of string and integer tuples
 919          ------------------------
 920          e.g. subscribe([("my/topic", 0), ("another/topic", 2)])
 921          This allows multiple topic subscriptions in a single SUBSCRIPTION
 922          command, which is more efficient than using multiple calls to
 923          subscribe().
 924          topic: A list of tuple of format (topic, qos). Both topic and qos must
 925                 be present in all of the tuples.
 926          qos: Not used.
 927          The function returns a tuple (result, mid), where result is
 928          MQTT_ERR_SUCCESS to indicate success or (MQTT_ERR_NO_CONN, None) if the
 929          client is not currently connected.  mid is the message ID for the
 930          subscribe request. The mid value can be used to track the subscribe
 931          request by checking against the mid argument in the on_subscribe()
 932          callback if it is defined.
 933          Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has
 934          zero string length, or if topic is not a string, tuple or list.
 935          """
 936          topic_qos_list = None
 937          if isinstance(topic, str):
 938              if qos<0 or qos>2:
 939                  raise ValueError('Invalid QoS level.')
 940              if topic is None or len(topic) == 0:
 941                  raise ValueError('Invalid topic.')
 942              topic_qos_list = [(topic.encode('utf-8'), qos)]
 943          elif isinstance(topic, tuple):
 944              if topic[1]<0 or topic[1]>2:
 945                  raise ValueError('Invalid QoS level.')
 946              if topic[0] is None or len(topic[0]) == 0 or not isinstance(topic[0], str):
 947                  raise ValueError('Invalid topic.')
 948              topic_qos_list = [(topic[0].encode('utf-8'), topic[1])]
 949          elif isinstance(topic, list):
 950              topic_qos_list = []
 951              for t in topic:
 952                  if t[1]<0 or t[1]>2:
 953                      raise ValueError('Invalid QoS level.')
 954                  if t[0] is None or len(t[0]) == 0 or not isinstance(t[0], str):
 955                      raise ValueError('Invalid topic.')
 956                  topic_qos_list.append((t[0].encode('utf-8'), t[1]))
 957  
 958          if topic_qos_list is None:
 959              raise ValueError("No topic specified, or incorrect topic type.")
 960  
 961          if self._sock is None and self._ssl is None:
 962              return (MQTT_ERR_NO_CONN, None)
 963  
 964          return self._send_subscribe(False, topic_qos_list)
 965  
 966      def unsubscribe(self, topic):
 967          """Unsubscribe the client from one or more topics.
 968          topic: A single string, or list of strings that are the subscription
 969                 topics to unsubscribe from.
 970          Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS
 971          to indicate success or (MQTT_ERR_NO_CONN, None) if the client is not
 972          currently connected.
 973          mid is the message ID for the unsubscribe request. The mid value can be
 974          used to track the unsubscribe request by checking against the mid
 975          argument in the on_unsubscribe() callback if it is defined.
 976          Raises a ValueError if topic is None or has zero string length, or is
 977          not a string or list.
 978          """
 979          topic_list = None
 980          if topic is None:
 981              raise ValueError('Invalid topic.')
 982          if isinstance(topic, str):
 983              if len(topic) == 0:
 984                  raise ValueError('Invalid topic.')
 985              topic_list = [topic.encode('utf-8')]
 986          elif isinstance(topic, list):
 987              topic_list = []
 988              for t in topic:
 989                  if len(t) == 0 or not isinstance(t, str):
 990                      raise ValueError('Invalid topic.')
 991                  topic_list.append(t.encode('utf-8'))
 992  
 993          if topic_list is None:
 994              raise ValueError("No topic specified, or incorrect topic type.")
 995  
 996          if self._sock is None and self._ssl is None:
 997              return (MQTT_ERR_NO_CONN, None)
 998  
 999          return self._send_unsubscribe(False, topic_list)
1000  
1001      def loop_read(self, max_packets=1):
1002          """Process read network events. Use in place of calling loop() if you
1003          wish to handle your client reads as part of your own application.
1004          Use socket() to obtain the client socket to call select() or equivalent
1005          on.
1006          Do not use if you are using the threaded interface loop_start()."""
1007          if self._sock is None and self._ssl is None:
1008              return MQTT_ERR_NO_CONN
1009  
1010          max_packets = len(self._out_messages) + len(self._in_messages)
1011          if max_packets < 1:
1012              max_packets = 1
1013  
1014          for i in range(0, max_packets):
1015              rc = self._packet_read()
1016              if rc > 0:
1017                  return self._loop_rc_handle(rc)
1018              elif rc == MQTT_ERR_AGAIN:
1019                  return MQTT_ERR_SUCCESS
1020          return MQTT_ERR_SUCCESS
1021  
1022      def loop_write(self, max_packets=1):
1023          """Process read network events. Use in place of calling loop() if you
1024          wish to handle your client reads as part of your own application.
1025          Use socket() to obtain the client socket to call select() or equivalent
1026          on.
1027          Use want_write() to determine if there is data waiting to be written.
1028          Do not use if you are using the threaded interface loop_start()."""
1029          if self._sock is None and self._ssl is None:
1030              return MQTT_ERR_NO_CONN
1031  
1032          max_packets = len(self._out_packet) + 1
1033          if max_packets < 1:
1034              max_packets = 1
1035  
1036          for i in range(0, max_packets):
1037              rc = self._packet_write()
1038              if rc > 0:
1039                  return self._loop_rc_handle(rc)
1040              elif rc == MQTT_ERR_AGAIN:
1041                  return MQTT_ERR_SUCCESS
1042          return MQTT_ERR_SUCCESS
1043  
1044      def want_write(self):
1045          """Call to determine if there is network data waiting to be written.
1046          Useful if you are calling select() yourself rather than using loop().
1047          """
1048          if self._current_out_packet or len(self._out_packet) > 0:
1049              return True
1050          else:
1051              return False
1052  
1053      def loop_misc(self):
1054          """Process miscellaneous network events. Use in place of calling loop() if you
1055          wish to call select() or equivalent on.
1056          Do not use if you are using the threaded interface loop_start()."""
1057          if self._sock is None and self._ssl is None:
1058              return MQTT_ERR_NO_CONN
1059  
1060          now = time.time()
1061          self._check_keepalive()
1062          if self._last_retry_check+1 < now:
1063              # Only check once a second at most
1064              self._message_retry_check()
1065              self._last_retry_check = now
1066  
1067          if self._ping_t > 0 and now - self._ping_t >= self._keepalive:
1068              # client->ping_t != 0 means we are waiting for a pingresp.
1069              # This hasn't happened in the keepalive time so we should disconnect.
1070              if self._ssl:
1071                  self._ssl.close()
1072                  self._ssl = None
1073              elif self._sock:
1074                  self._sock.close()
1075                  self._sock = None
1076  
1077              self._callback_mutex.acquire()
1078              if self._state == mqtt_cs_disconnecting:
1079                  rc = MQTT_ERR_SUCCESS
1080              else:
1081                  rc = 1
1082              if self.on_disconnect:
1083                  self._in_callback = True
1084                  self.on_disconnect(self, self._userdata, rc)
1085                  self._in_callback = False
1086              self._callback_mutex.release()
1087              return MQTT_ERR_CONN_LOST
1088  
1089          return MQTT_ERR_SUCCESS
1090  
1091      def max_inflight_messages_set(self, inflight):
1092          """Set the maximum number of messages with QoS>0 that can be part way
1093          through their network flow at once. Defaults to 20."""
1094          if inflight < 0:
1095              raise ValueError('Invalid inflight.')
1096          self._max_inflight_messages = inflight
1097  
1098      def message_retry_set(self, retry):
1099          """Set the timeout in seconds before a message with QoS>0 is retried.
1100          20 seconds by default."""
1101          if retry < 0:
1102              raise ValueError('Invalid retry.')
1103  
1104          self._message_retry = retry
1105  
1106      def user_data_set(self, userdata):
1107          """Set the user data variable passed to callbacks. May be any data type."""
1108          self._userdata = userdata
1109  
1110      def will_set(self, topic, payload=None, qos=0, retain=False):
1111          """Set a Will to be sent by the broker in case the client disconnects unexpectedly.
1112          This must be called before connect() to have any effect.
1113          topic: The topic that the will message should be published on.
1114          payload: The message to send as a will. If not given, or set to None a
1115          zero length message will be used as the will. Passing an int or float
1116          will result in the payload being converted to a string representing
1117          that number. If you wish to send a true int/float, use struct.pack() to
1118          create the payload you require.
1119          qos: The quality of service level to use for the will.
1120          retain: If set to true, the will message will be set as the "last known
1121          good"/retained message for the topic.
1122          Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has
1123          zero string length.
1124          """
1125          if topic is None or len(topic) == 0:
1126              raise ValueError('Invalid topic.')
1127          if qos<0 or qos>2:
1128              raise ValueError('Invalid QoS level.')
1129          if isinstance(payload, str):
1130              self._will_payload = payload.encode('utf-8')
1131          elif isinstance(payload, bytearray):
1132              self._will_payload = payload
1133          elif isinstance(payload, int) or isinstance(payload, float):
1134              self._will_payload = str(payload)
1135          elif payload is None:
1136              self._will_payload = None
1137          else:
1138              raise TypeError('payload must be a string, bytearray, int, float or None.')
1139  
1140          self._will = True
1141          self._will_topic = topic.encode('utf-8')
1142          self._will_qos = qos
1143          self._will_retain = retain
1144  
1145      def will_clear(self):
1146          """ Removes a will that was previously configured with will_set().
1147          Must be called before connect() to have any effect."""
1148          self._will = False
1149          self._will_topic = ""
1150          self._will_payload = None
1151          self._will_qos = 0
1152          self._will_retain = False
1153  
1154      def socket(self):
1155          """Return the socket or ssl object for this client."""
1156          if self._ssl:
1157              return self._ssl
1158          else:
1159              return self._sock
1160  
1161      def loop_forever(self, timeout=1.0, max_packets=1, retry_first_connection=False):
1162          """This function call loop() for you in an infinite blocking loop. It
1163          is useful for the case where you only want to run the MQTT client loop
1164          in your program.
1165          loop_forever() will handle reconnecting for you. If you call
1166          disconnect() in a callback it will return.
1167          timeout: The time in seconds to wait for incoming/outgoing network
1168            traffic before timing out and returning.
1169          max_packets: Not currently used.
1170          retry_first_connection: Should the first connection attempt be retried on failure.
1171          Raises socket.error on first connection failures unless retry_first_connection=True
1172          """
1173  
1174          run = True
1175  
1176          while run:
1177              if self._state == mqtt_cs_connect_async:
1178                  try:
1179                      self.reconnect()
1180                  except socket.error:
1181                      if not retry_first_connection:
1182                          raise
1183                      self._easy_log(MQTT_LOG_DEBUG, "Connection failed, retrying")
1184                      time.sleep(1)
1185              else:
1186                  break
1187  
1188          while run:
1189              rc = MQTT_ERR_SUCCESS
1190              while rc == MQTT_ERR_SUCCESS:
1191                  rc = self.loop(timeout, max_packets)
1192                  # We don't need to worry about locking here, because we've
1193                  # either called loop_forever() when in single threaded mode, or
1194                  # in multi threaded mode when loop_stop() has been called and
1195                  # so no other threads can access _current_out_packet,
1196                  # _out_packet or _messages.
1197                  if (self._thread_terminate is True
1198                          and self._current_out_packet is None
1199                          and len(self._out_packet) == 0
1200                          and len(self._out_messages) == 0):
1201  
1202                      rc = 1
1203                      run = False
1204  
1205              self._state_mutex.acquire()
1206              if self._state == mqtt_cs_disconnecting or run is False or self._thread_terminate is True:
1207                  run = False
1208                  self._state_mutex.release()
1209              else:
1210                  self._state_mutex.release()
1211                  time.sleep(1)
1212  
1213                  self._state_mutex.acquire()
1214                  if self._state == mqtt_cs_disconnecting or run is False or self._thread_terminate is True:
1215                      run = False
1216                      self._state_mutex.release()
1217                  else:
1218                      self._state_mutex.release()
1219                      try:
1220                          self.reconnect()
1221                      except socket.error as err:
1222                          pass
1223  
1224          return rc
1225  
1226      def loop_start(self):
1227          """This is part of the threaded client interface. Call this once to
1228          start a new thread to process network traffic. This provides an
1229          alternative to repeatedly calling loop() yourself.
1230          """
1231          if self._thread is not None:
1232              return MQTT_ERR_INVAL
1233  
1234          self._thread_terminate = False
1235          self._thread = threading.Thread(target=self._thread_main)
1236          self._thread.daemon = True
1237          self._thread.start()
1238  
1239      def loop_stop(self, force=False):
1240          """This is part of the threaded client interface. Call this once to
1241          stop the network thread previously created with loop_start(). This call
1242          will block until the network thread finishes.
1243          The force parameter is currently ignored.
1244          """
1245          if self._thread is None:
1246              return MQTT_ERR_INVAL
1247  
1248          self._thread_terminate = True
1249          self._thread.join()
1250          self._thread = None
1251  
1252      def message_callback_add(self, sub, callback):
1253          """Register a message callback for a specific topic.
1254          Messages that match 'sub' will be passed to 'callback'. Any
1255          non-matching messages will be passed to the default on_message
1256          callback.
1257          
1258          Call multiple times with different 'sub' to define multiple topic
1259          specific callbacks.
1260          
1261          Topic specific callbacks may be removed with
1262          message_callback_remove()."""
1263          if callback is None or sub is None:
1264              raise ValueError("sub and callback must both be defined.")
1265  
1266          self._callback_mutex.acquire()
1267  
1268          for i in range(0, len(self.on_message_filtered)):
1269              if self.on_message_filtered[i][0] == sub:
1270                  self.on_message_filtered[i] = (sub, callback)
1271                  self._callback_mutex.release()
1272                  return
1273  
1274          self.on_message_filtered.append((sub, callback))
1275          self._callback_mutex.release()
1276  
1277      def message_callback_remove(self, sub):
1278          """Remove a message callback previously registered with
1279          message_callback_add()."""
1280          if sub is None:
1281              raise ValueError("sub must defined.")
1282  
1283          self._callback_mutex.acquire()
1284          for i in range(0, len(self.on_message_filtered)):
1285              if self.on_message_filtered[i][0] == sub:
1286                  self.on_message_filtered.pop(i)
1287                  self._callback_mutex.release()
1288                  return
1289          self._callback_mutex.release()
1290  
1291      # ============================================================
1292      # Private functions
1293      # ============================================================
1294  
1295      def _loop_rc_handle(self, rc):
1296          if rc:
1297              if self._ssl:
1298                  self._ssl.close()
1299                  self._ssl = None
1300              elif self._sock:
1301                  self._sock.close()
1302                  self._sock = None
1303  
1304              self._state_mutex.acquire()
1305              if self._state == mqtt_cs_disconnecting:
1306                  rc = MQTT_ERR_SUCCESS
1307              self._state_mutex.release()
1308              self._callback_mutex.acquire()
1309              if self.on_disconnect:
1310                  self._in_callback = True
1311                  self.on_disconnect(self, self._userdata, rc)
1312                  self._in_callback = False
1313  
1314              self._callback_mutex.release()
1315          return rc
1316  
1317      def _packet_read(self):
1318          # This gets called if pselect() indicates that there is network data
1319          # available - ie. at least one byte.  What we do depends on what data we
1320          # already have.
1321          # If we've not got a command, attempt to read one and save it. This should
1322          # always work because it's only a single byte.
1323          # Then try to read the remaining length. This may fail because it is may
1324          # be more than one byte - will need to save data pending next read if it
1325          # does fail.
1326          # Then try to read the remaining payload, where 'payload' here means the
1327          # combined variable header and actual payload. This is the most likely to
1328          # fail due to longer length, so save current data and current position.
1329          # After all data is read, send to _mqtt_handle_packet() to deal with.
1330          # Finally, free the memory and reset everything to starting conditions.
1331          if self._in_packet['command'] == 0:
1332              try:
1333                  if self._ssl:
1334                      command = self._ssl.read(1)
1335                  else:
1336                      command = self._sock.recv(1)
1337              except socket.error as err:
1338                  if self._ssl and (err.errno == ssl.SSL_ERROR_WANT_READ or err.errno == ssl.SSL_ERROR_WANT_WRITE):
1339                      return MQTT_ERR_AGAIN
1340                  if err.errno == EAGAIN:
1341                      return MQTT_ERR_AGAIN
1342                  print(err)
1343                  return 1
1344              else:
1345                  if len(command) == 0:
1346                      return 1
1347                  command = struct.unpack("!B", command)
1348                  self._in_packet['command'] = command[0]
1349  
1350          if self._in_packet['have_remaining'] == 0:
1351              # Read remaining
1352              # Algorithm for decoding taken from pseudo code at
1353              # http://publib.boulder.ibm.com/infocenter/wmbhelp/v6r0m0/topic/com.ibm.etools.mft.doc/ac10870_.htm
1354              while True:
1355                  try:
1356                      if self._ssl:
1357                          byte = self._ssl.read(1)
1358                      else:
1359                          byte = self._sock.recv(1)
1360                  except socket.error as err:
1361                      if self._ssl and (err.errno == ssl.SSL_ERROR_WANT_READ or err.errno == ssl.SSL_ERROR_WANT_WRITE):
1362                          return MQTT_ERR_AGAIN
1363                      if err.errno == EAGAIN:
1364                          return MQTT_ERR_AGAIN
1365                      print(err)
1366                      return 1
1367                  else:
1368                      byte = struct.unpack("!B", byte)
1369                      byte = byte[0]
1370                      self._in_packet['remaining_count'].append(byte)
1371                      # Max 4 bytes length for remaining length as defined by protocol.
1372                       # Anything more likely means a broken/malicious client.
1373                      if len(self._in_packet['remaining_count']) > 4:
1374                          return MQTT_ERR_PROTOCOL
1375  
1376                      self._in_packet['remaining_length'] = self._in_packet['remaining_length'] + (byte & 127)*self._in_packet['remaining_mult']
1377                      self._in_packet['remaining_mult'] = self._in_packet['remaining_mult'] * 128
1378  
1379                  if (byte & 128) == 0:
1380                      break
1381  
1382              self._in_packet['have_remaining'] = 1
1383              self._in_packet['to_process'] = self._in_packet['remaining_length']
1384  
1385          while self._in_packet['to_process'] > 0:
1386              try:
1387                  if self._ssl:
1388                      data = self._ssl.read(self._in_packet['to_process'])
1389                  else:
1390                      data = self._sock.recv(self._in_packet['to_process'])
1391              except socket.error as err:
1392                  if self._ssl and (err.errno == ssl.SSL_ERROR_WANT_READ or err.errno == ssl.SSL_ERROR_WANT_WRITE):
1393                      return MQTT_ERR_AGAIN
1394                  if err.errno == EAGAIN:
1395                      return MQTT_ERR_AGAIN
1396                  print(err)
1397                  return 1
1398              else:
1399                  self._in_packet['to_process'] = self._in_packet['to_process'] - len(data)
1400                  self._in_packet['packet'] = self._in_packet['packet'] + data
1401  
1402          # All data for this packet is read.
1403          self._in_packet['pos'] = 0
1404          rc = self._packet_handle()
1405  
1406          # Free data and reset values
1407          self._in_packet = dict(
1408              command=0,
1409              have_remaining=0,
1410              remaining_count=[],
1411              remaining_mult=1,
1412              remaining_length=0,
1413              packet=b"",
1414              to_process=0,
1415              pos=0)
1416  
1417          self._msgtime_mutex.acquire()
1418          self._last_msg_in = time.time()
1419          self._msgtime_mutex.release()
1420          return rc
1421  
1422      def _packet_write(self):
1423          self._current_out_packet_mutex.acquire()
1424  
1425          while self._current_out_packet:
1426              packet = self._current_out_packet
1427  
1428              try:
1429                  if self._ssl:
1430                      write_length = self._ssl.write(packet['packet'][packet['pos']:])
1431                  else:
1432                      write_length = self._sock.send(packet['packet'][packet['pos']:])
1433              except AttributeError:
1434                  self._current_out_packet_mutex.release()
1435                  return MQTT_ERR_SUCCESS
1436              except socket.error as err:
1437                  self._current_out_packet_mutex.release()
1438                  if self._ssl and (err.errno == ssl.SSL_ERROR_WANT_READ or err.errno == ssl.SSL_ERROR_WANT_WRITE):
1439                      return MQTT_ERR_AGAIN
1440                  if err.errno == EAGAIN:
1441                      return MQTT_ERR_AGAIN
1442                  print(err)
1443                  return 1
1444  
1445              if write_length > 0:
1446                  packet['to_process'] = packet['to_process'] - write_length
1447                  packet['pos'] = packet['pos'] + write_length
1448  
1449                  if packet['to_process'] == 0:
1450                      if (packet['command'] & 0xF0) == PUBLISH and packet['qos'] == 0:
1451                          self._callback_mutex.acquire()
1452                          if self.on_publish:
1453                              self._in_callback = True
1454                              self.on_publish(self, self._userdata, packet['mid'])
1455                              self._in_callback = False
1456  
1457                          self._callback_mutex.release()
1458  
1459                      if (packet['command'] & 0xF0) == DISCONNECT:
1460                          self._current_out_packet_mutex.release()
1461  
1462                          self._msgtime_mutex.acquire()
1463                          self._last_msg_out = time.time()
1464                          self._msgtime_mutex.release()
1465  
1466                          self._callback_mutex.acquire()
1467                          if self.on_disconnect:
1468                              self._in_callback = True
1469                              self.on_disconnect(self, self._userdata, 0)
1470                              self._in_callback = False
1471                          self._callback_mutex.release()
1472  
1473                          if self._ssl:
1474                              self._ssl.close()
1475                              self._ssl = None
1476                          if self._sock:
1477                              self._sock.close()
1478                              self._sock = None
1479                          return MQTT_ERR_SUCCESS
1480  
1481                      self._out_packet_mutex.acquire()
1482                      if len(self._out_packet) > 0:
1483                          self._current_out_packet = self._out_packet.pop(0)
1484                      else:
1485                          self._current_out_packet = None
1486                      self._out_packet_mutex.release()
1487              else:
1488                  pass  # FIXME
1489  
1490          self._current_out_packet_mutex.release()
1491  
1492          self._msgtime_mutex.acquire()
1493          self._last_msg_out = time.time()
1494          self._msgtime_mutex.release()
1495  
1496          return MQTT_ERR_SUCCESS
1497  
1498      def _easy_log(self, level, buf):
1499          if self.on_log:
1500              self.on_log(self, self._userdata, level, buf)
1501  
1502      def _check_keepalive(self):
1503          now = time.time()
1504          self._msgtime_mutex.acquire()
1505          last_msg_out = self._last_msg_out
1506          last_msg_in = self._last_msg_in
1507          self._msgtime_mutex.release()
1508          if (self._sock is not None or self._ssl is not None) and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive):
1509              if self._state == mqtt_cs_connected and self._ping_t == 0:
1510                  self._send_pingreq()
1511                  self._msgtime_mutex.acquire()
1512                  self._last_msg_out = now
1513                  self._last_msg_in = now
1514                  self._msgtime_mutex.release()
1515              else:
1516                  if self._ssl:
1517                      self._ssl.close()
1518                      self._ssl = None
1519                  elif self._sock:
1520                      self._sock.close()
1521                      self._sock = None
1522  
1523                  if self._state == mqtt_cs_disconnecting:
1524                      rc = MQTT_ERR_SUCCESS
1525                  else:
1526                      rc = 1
1527                  self._callback_mutex.acquire()
1528                  if self.on_disconnect:
1529                      self._in_callback = True
1530                      self.on_disconnect(self, self._userdata, rc)
1531                      self._in_callback = False
1532                  self._callback_mutex.release()
1533  
1534      def _mid_generate(self):
1535          self._last_mid = self._last_mid + 1
1536          if self._last_mid == 65536:
1537              self._last_mid = 1
1538          return self._last_mid
1539  
1540      def _topic_wildcard_len_check(self, topic):
1541          # Search for + or # in a topic. Return MQTT_ERR_INVAL if found.
1542           # Also returns MQTT_ERR_INVAL if the topic string is too long.
1543           # Returns MQTT_ERR_SUCCESS if everything is fine.
1544          if '+' in topic or '#' in topic or len(topic) == 0 or len(topic) > 65535:
1545              return MQTT_ERR_INVAL
1546          else:
1547              return MQTT_ERR_SUCCESS
1548  
1549      def _send_pingreq(self):
1550          self._easy_log(MQTT_LOG_DEBUG, "Sending PINGREQ")
1551          rc = self._send_simple_command(PINGREQ)
1552          if rc == MQTT_ERR_SUCCESS:
1553              self._ping_t = time.time()
1554          return rc
1555  
1556      def _send_pingresp(self):
1557          self._easy_log(MQTT_LOG_DEBUG, "Sending PINGRESP")
1558          return self._send_simple_command(PINGRESP)
1559  
1560      def _send_puback(self, mid):
1561          self._easy_log(MQTT_LOG_DEBUG, "Sending PUBACK (Mid: "+str(mid)+")")
1562          return self._send_command_with_mid(PUBACK, mid, False)
1563  
1564      def _send_pubcomp(self, mid):
1565          self._easy_log(MQTT_LOG_DEBUG, "Sending PUBCOMP (Mid: "+str(mid)+")")
1566          return self._send_command_with_mid(PUBCOMP, mid, False)
1567  
1568      def _pack_remaining_length(self, packet, remaining_length):
1569          remaining_bytes = []
1570          while True:
1571              byte = remaining_length % 128
1572              remaining_length = remaining_length // 128
1573              # If there are more digits to encode, set the top bit of this digit
1574              if remaining_length > 0:
1575                  byte = byte | 0x80
1576  
1577              remaining_bytes.append(byte)
1578              packet.extend(struct.pack("!B", byte))
1579              if remaining_length == 0:
1580                  # FIXME - this doesn't deal with incorrectly large payloads
1581                  return packet
1582  
1583      def _pack_str16(self, packet, data):
1584          if sys.version_info[0] < 3:
1585              if isinstance(data, bytearray):
1586                  packet.extend(struct.pack("!H", len(data)))
1587                  packet.extend(data)
1588              elif isinstance(data, str):
1589                  udata = data.encode('utf-8')
1590                  pack_format = "!H" + str(len(udata)) + "s"
1591                  packet.extend(struct.pack(pack_format, len(udata), udata))
1592              elif isinstance(data, unicode):
1593                  udata = data.encode('utf-8')
1594                  pack_format = "!H" + str(len(udata)) + "s"
1595                  packet.extend(struct.pack(pack_format, len(udata), udata))
1596              else:
1597                  raise TypeError
1598          else:
1599              if isinstance(data, bytearray) or isinstance(data, bytes):
1600                  packet.extend(struct.pack("!H", len(data)))
1601                  packet.extend(data)
1602              elif isinstance(data, str):
1603                  udata = data.encode('utf-8')
1604                  pack_format = "!H" + str(len(udata)) + "s"
1605                  packet.extend(struct.pack(pack_format, len(udata), udata))
1606              else:
1607                  raise TypeError
1608  
1609      def _send_publish(self, mid, topic, payload=None, qos=0, retain=False, dup=False):
1610          if self._sock is None and self._ssl is None:
1611              return MQTT_ERR_NO_CONN
1612  
1613          utopic = topic.encode('utf-8')
1614          command = PUBLISH | ((dup&0x1)<<3) | (qos<<1) | retain
1615          packet = bytearray()
1616          packet.extend(struct.pack("!B", command))
1617          if payload is None:
1618              remaining_length = 2+len(utopic)
1619              self._easy_log(MQTT_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(int(retain))+", m"+str(mid)+", '"+topic+"' (NULL payload)")
1620          else:
1621              if isinstance(payload, str):
1622                  upayload = payload.encode('utf-8')
1623                  payloadlen = len(upayload)
1624              elif isinstance(payload, bytearray):
1625                  payloadlen = len(payload)
1626              elif isinstance(payload, unicode):
1627                  upayload = payload.encode('utf-8')
1628                  payloadlen = len(upayload)
1629  
1630              remaining_length = 2+len(utopic) + payloadlen
1631              self._easy_log(MQTT_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(int(retain))+", m"+str(mid)+", '"+topic+"', ... ("+str(payloadlen)+" bytes)")
1632  
1633          if qos > 0:
1634              # For message id
1635              remaining_length = remaining_length + 2
1636  
1637          self._pack_remaining_length(packet, remaining_length)
1638          self._pack_str16(packet, topic)
1639  
1640          if qos > 0:
1641              # For message id
1642              packet.extend(struct.pack("!H", mid))
1643  
1644          if payload is not None:
1645              if isinstance(payload, str):
1646                  pack_format = str(payloadlen) + "s"
1647                  packet.extend(struct.pack(pack_format, upayload))
1648              elif isinstance(payload, bytearray):
1649                  packet.extend(payload)
1650              elif isinstance(payload, unicode):
1651                  pack_format = str(payloadlen) + "s"
1652                  packet.extend(struct.pack(pack_format, upayload))
1653              else:
1654                  raise TypeError('payload must be a string, unicode or a bytearray.')
1655  
1656          return self._packet_queue(PUBLISH, packet, mid, qos)
1657  
1658      def _send_pubrec(self, mid):
1659          self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREC (Mid: "+str(mid)+")")
1660          return self._send_command_with_mid(PUBREC, mid, False)
1661  
1662      def _send_pubrel(self, mid, dup=False):
1663          self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREL (Mid: "+str(mid)+")")
1664          return self._send_command_with_mid(PUBREL|2, mid, dup)
1665  
1666      def _send_command_with_mid(self, command, mid, dup):
1667          # For PUBACK, PUBCOMP, PUBREC, and PUBREL
1668          if dup:
1669              command = command | 8
1670  
1671          remaining_length = 2
1672          packet = struct.pack('!BBH', command, remaining_length, mid)
1673          return self._packet_queue(command, packet, mid, 1)
1674  
1675      def _send_simple_command(self, command):
1676          # For DISCONNECT, PINGREQ and PINGRESP
1677          remaining_length = 0
1678          packet = struct.pack('!BB', command, remaining_length)
1679          return self._packet_queue(command, packet, 0, 0)
1680  
1681      def _send_connect(self, keepalive, clean_session):
1682          if self._protocol == MQTTv31:
1683              protocol = PROTOCOL_NAMEv31
1684              proto_ver = 3
1685          else:
1686              protocol = PROTOCOL_NAMEv311
1687              proto_ver = 4
1688          remaining_length = 2+len(protocol) + 1+1+2 + 2+len(self._client_id)
1689          connect_flags = 0
1690          if clean_session:
1691              connect_flags = connect_flags | 0x02
1692  
1693          if self._will:
1694              if self._will_payload is not None:
1695                  remaining_length = remaining_length + 2+len(self._will_topic) + 2+len(self._will_payload)
1696              else:
1697                  remaining_length = remaining_length + 2+len(self._will_topic) + 2
1698  
1699              connect_flags = connect_flags | 0x04 | ((self._will_qos&0x03) << 3) | ((self._will_retain&0x01) << 5)
1700  
1701          if self._username:
1702              remaining_length = remaining_length + 2+len(self._username)
1703              connect_flags = connect_flags | 0x80
1704              if self._password:
1705                  connect_flags = connect_flags | 0x40
1706                  remaining_length = remaining_length + 2+len(self._password)
1707  
1708          command = CONNECT
1709          packet = bytearray()
1710          packet.extend(struct.pack("!B", command))
1711  
1712          self._pack_remaining_length(packet, remaining_length)
1713          packet.extend(struct.pack("!H"+str(len(protocol))+"sBBH", len(protocol), protocol, proto_ver, connect_flags, keepalive))
1714  
1715          self._pack_str16(packet, self._client_id)
1716  
1717          if self._will:
1718              self._pack_str16(packet, self._will_topic)
1719              if self._will_payload is None or len(self._will_payload) == 0:
1720                  packet.extend(struct.pack("!H", 0))
1721              else:
1722                  self._pack_str16(packet, self._will_payload)
1723  
1724          if self._username:
1725              self._pack_str16(packet, self._username)
1726  
1727              if self._password:
1728                  self._pack_str16(packet, self._password)
1729  
1730          self._keepalive = keepalive
1731          return self._packet_queue(command, packet, 0, 0)
1732  
1733      def _send_disconnect(self):
1734          return self._send_simple_command(DISCONNECT)
1735  
1736      def _send_subscribe(self, dup, topics):
1737          remaining_length = 2
1738          for t in topics:
1739              remaining_length = remaining_length + 2+len(t[0])+1
1740  
1741          command = SUBSCRIBE | (dup<<3) | (1<<1)
1742          packet = bytearray()
1743          packet.extend(struct.pack("!B", command))
1744          self._pack_remaining_length(packet, remaining_length)
1745          local_mid = self._mid_generate()
1746          packet.extend(struct.pack("!H", local_mid))
1747          for t in topics:
1748              self._pack_str16(packet, t[0])
1749              packet.extend(struct.pack("B", t[1]))
1750          return (self._packet_queue(command, packet, local_mid, 1), local_mid)
1751  
1752      def _send_unsubscribe(self, dup, topics):
1753          remaining_length = 2
1754          for t in topics:
1755              remaining_length = remaining_length + 2+len(t)
1756  
1757          command = UNSUBSCRIBE | (dup<<3) | (1<<1)
1758          packet = bytearray()
1759          packet.extend(struct.pack("!B", command))
1760          self._pack_remaining_length(packet, remaining_length)
1761          local_mid = self._mid_generate()
1762          packet.extend(struct.pack("!H", local_mid))
1763          for t in topics:
1764              self._pack_str16(packet, t)
1765          return (self._packet_queue(command, packet, local_mid, 1), local_mid)
1766  
1767      def _message_retry_check_actual(self, messages, mutex):
1768          mutex.acquire()
1769          now = time.time()
1770          for m in messages:
1771              if m.timestamp + self._message_retry < now:
1772                  if m.state == mqtt_ms_wait_for_puback or m.state == mqtt_ms_wait_for_pubrec:
1773                      m.timestamp = now
1774                      m.dup = True
1775                      self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
1776                  elif m.state == mqtt_ms_wait_for_pubrel:
1777                      m.timestamp = now
1778                      m.dup = True
1779                      self._send_pubrec(m.mid)
1780                  elif m.state == mqtt_ms_wait_for_pubcomp:
1781                      m.timestamp = now
1782                      m.dup = True
1783                      self._send_pubrel(m.mid, True)
1784          mutex.release()
1785  
1786      def _message_retry_check(self):
1787          self._message_retry_check_actual(self._out_messages, self._out_message_mutex)
1788          self._message_retry_check_actual(self._in_messages, self._in_message_mutex)
1789  
1790      def _messages_reconnect_reset_out(self):
1791          self._out_message_mutex.acquire()
1792          self._inflight_messages = 0
1793          for m in self._out_messages:
1794              m.timestamp = 0
1795              if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages:
1796                  if m.qos == 0:
1797                      m.state = mqtt_ms_publish
1798                  elif m.qos == 1:
1799                      #self._inflight_messages = self._inflight_messages + 1
1800                      if m.state == mqtt_ms_wait_for_puback:
1801                          m.dup = True
1802                      m.state = mqtt_ms_publish
1803                  elif m.qos == 2:
1804                      #self._inflight_messages = self._inflight_messages + 1
1805                      if m.state == mqtt_ms_wait_for_pubcomp:
1806                          m.state = mqtt_ms_resend_pubrel
1807                          m.dup = True
1808                      else:
1809                          if m.state == mqtt_ms_wait_for_pubrec:
1810                              m.dup = True
1811                          m.state = mqtt_ms_publish
1812              else:
1813                  m.state = mqtt_ms_queued
1814          self._out_message_mutex.release()
1815  
1816      def _messages_reconnect_reset_in(self):
1817          self._in_message_mutex.acquire()
1818          for m in self._in_messages:
1819              m.timestamp = 0
1820              if m.qos != 2:
1821                  self._in_messages.pop(self._in_messages.index(m))
1822              else:
1823                  # Preserve current state
1824                  pass
1825          self._in_message_mutex.release()
1826  
1827      def _messages_reconnect_reset(self):
1828          self._messages_reconnect_reset_out()
1829          self._messages_reconnect_reset_in()
1830  
1831      def _packet_queue(self, command, packet, mid, qos):
1832          mpkt = dict(
1833              command = command,
1834              mid = mid,
1835              qos = qos,
1836              pos = 0,
1837              to_process = len(packet),
1838              packet = packet)
1839  
1840          self._out_packet_mutex.acquire()
1841          self._out_packet.append(mpkt)
1842          if self._current_out_packet_mutex.acquire(False):
1843              if self._current_out_packet is None and len(self._out_packet) > 0:
1844                  self._current_out_packet = self._out_packet.pop(0)
1845              self._current_out_packet_mutex.release()
1846          self._out_packet_mutex.release()
1847  
1848          # Write a single byte to sockpairW (connected to sockpairR) to break
1849          # out of select() if in threaded mode.
1850          try:
1851              self._sockpairW.send(sockpair_data)
1852          except socket.error as err:
1853              if err.errno != EAGAIN:
1854                  raise
1855  
1856          if not self._in_callback and self._thread is None:
1857              return self.loop_write()
1858          else:
1859              return MQTT_ERR_SUCCESS
1860  
1861      def _packet_handle(self):
1862          cmd = self._in_packet['command']&0xF0
1863          if cmd == PINGREQ:
1864              return self._handle_pingreq()
1865          elif cmd == PINGRESP:
1866              return self._handle_pingresp()
1867          elif cmd == PUBACK:
1868              return self._handle_pubackcomp("PUBACK")
1869          elif cmd == PUBCOMP:
1870              return self._handle_pubackcomp("PUBCOMP")
1871          elif cmd == PUBLISH:
1872              return self._handle_publish()
1873          elif cmd == PUBREC:
1874              return self._handle_pubrec()
1875          elif cmd == PUBREL:
1876              return self._handle_pubrel()
1877          elif cmd == CONNACK:
1878              return self._handle_connack()
1879          elif cmd == SUBACK:
1880              return self._handle_suback()
1881          elif cmd == UNSUBACK:
1882              return self._handle_unsuback()
1883          else:
1884              # If we don't recognise the command, return an error straight away.
1885              self._easy_log(MQTT_LOG_ERR, "Error: Unrecognised command "+str(cmd))
1886              return MQTT_ERR_PROTOCOL
1887  
1888      def _handle_pingreq(self):
1889          if self._strict_protocol:
1890              if self._in_packet['remaining_length'] != 0:
1891                  return MQTT_ERR_PROTOCOL
1892  
1893          self._easy_log(MQTT_LOG_DEBUG, "Received PINGREQ")
1894          return self._send_pingresp()
1895  
1896      def _handle_pingresp(self):
1897          if self._strict_protocol:
1898              if self._in_packet['remaining_length'] != 0:
1899                  return MQTT_ERR_PROTOCOL
1900  
1901          # No longer waiting for a PINGRESP.
1902          self._ping_t = 0
1903          self._easy_log(MQTT_LOG_DEBUG, "Received PINGRESP")
1904          return MQTT_ERR_SUCCESS
1905  
1906      def _handle_connack(self):
1907          if self._strict_protocol:
1908              if self._in_packet['remaining_length'] != 2:
1909                  return MQTT_ERR_PROTOCOL
1910  
1911          if len(self._in_packet['packet']) != 2:
1912              return MQTT_ERR_PROTOCOL
1913  
1914          (flags, result) = struct.unpack("!BB", self._in_packet['packet'])
1915          if result == CONNACK_REFUSED_PROTOCOL_VERSION and self._protocol == MQTTv311:
1916              self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+"), attempting downgrade to MQTT v3.1.")
1917              # Downgrade to MQTT v3.1
1918              self._protocol = MQTTv31
1919              return self.reconnect()
1920  
1921          if result == 0:
1922              self._state = mqtt_cs_connected
1923  
1924          self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+")")
1925          self._callback_mutex.acquire()
1926          if self.on_connect:
1927              self._in_callback = True
1928  
1929              if sys.version_info[0] < 3:
1930                  argcount = self.on_connect.func_code.co_argcount
1931              else:
1932                  argcount = self.on_connect.__code__.co_argcount
1933  
1934              if argcount == 3:
1935                  self.on_connect(self, self._userdata, result)
1936              else:
1937                  flags_dict = dict()
1938                  flags_dict['session present'] = flags & 0x01
1939                  self.on_connect(self, self._userdata, flags_dict, result)
1940              self._in_callback = False
1941          self._callback_mutex.release()
1942          if result == 0:
1943              rc = 0
1944              self._out_message_mutex.acquire()
1945              for m in self._out_messages:
1946                  m.timestamp = time.time()
1947                  if m.state == mqtt_ms_queued:
1948                      self.loop_write() # Process outgoing messages that have just been queued up
1949                      self._out_message_mutex.release()
1950                      return MQTT_ERR_SUCCESS
1951  
1952                  if m.qos == 0:
1953                      self._in_callback = True # Don't call loop_write after _send_publish()
1954                      rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
1955                      self._in_callback = False
1956                      if rc != 0:
1957                          self._out_message_mutex.release()
1958                          return rc
1959                  elif m.qos == 1:
1960                      if m.state == mqtt_ms_publish:
1961                          self._inflight_messages = self._inflight_messages + 1
1962                          m.state = mqtt_ms_wait_for_puback
1963                          self._in_callback = True # Don't call loop_write after _send_publish()
1964                          rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
1965                          self._in_callback = False
1966                          if rc != 0:
1967                              self._out_message_mutex.release()
1968                              return rc
1969                  elif m.qos == 2:
1970                      if m.state == mqtt_ms_publish:
1971                          self._inflight_messages = self._inflight_messages + 1
1972                          m.state = mqtt_ms_wait_for_pubrec
1973                          self._in_callback = True # Don't call loop_write after _send_publish()
1974                          rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
1975                          self._in_callback = False
1976                          if rc != 0:
1977                              self._out_message_mutex.release()
1978                              return rc
1979                      elif m.state == mqtt_ms_resend_pubrel:
1980                          self._inflight_messages = self._inflight_messages + 1
1981                          m.state = mqtt_ms_wait_for_pubcomp
1982                          self._in_callback = True # Don't call loop_write after _send_pubrel()
1983                          rc = self._send_pubrel(m.mid, m.dup)
1984                          self._in_callback = False
1985                          if rc != 0:
1986                              self._out_message_mutex.release()
1987                              return rc
1988                  self.loop_write() # Process outgoing messages that have just been queued up
1989              self._out_message_mutex.release()
1990              return rc
1991          elif result > 0 and result < 6:
1992              return MQTT_ERR_CONN_REFUSED
1993          else:
1994              return MQTT_ERR_PROTOCOL
1995  
1996      def _handle_suback(self):
1997          self._easy_log(MQTT_LOG_DEBUG, "Received SUBACK")
1998          pack_format = "!H" + str(len(self._in_packet['packet'])-2) + 's'
1999          (mid, packet) = struct.unpack(pack_format, self._in_packet['packet'])
2000          pack_format = "!" + "B"*len(packet)
2001          granted_qos = struct.unpack(pack_format, packet)
2002  
2003          self._callback_mutex.acquire()
2004          if self.on_subscribe:
2005              self._in_callback = True
2006              self.on_subscribe(self, self._userdata, mid, granted_qos)
2007              self._in_callback = False
2008          self._callback_mutex.release()
2009  
2010          return MQTT_ERR_SUCCESS
2011  
2012      def _handle_publish(self):
2013          rc = 0
2014  
2015          header = self._in_packet['command']
2016          message = MQTTMessage()
2017          message.dup = (header & 0x08)>>3
2018          message.qos = (header & 0x06)>>1
2019          message.retain = (header & 0x01)
2020  
2021          pack_format = "!H" + str(len(self._in_packet['packet'])-2) + 's'
2022          (slen, packet) = struct.unpack(pack_format, self._in_packet['packet'])
2023          pack_format = '!' + str(slen) + 's' + str(len(packet)-slen) + 's'
2024          (message.topic, packet) = struct.unpack(pack_format, packet)
2025  
2026          if len(message.topic) == 0:
2027              return MQTT_ERR_PROTOCOL
2028  
2029          if sys.version_info[0] >= 3:
2030              message.topic = message.topic.decode('utf-8')
2031  
2032          if message.qos > 0:
2033              pack_format = "!H" + str(len(packet)-2) + 's'
2034              (message.mid, packet) = struct.unpack(pack_format, packet)
2035  
2036          message.payload = packet
2037  
2038          self._easy_log(
2039              MQTT_LOG_DEBUG,
2040              "Received PUBLISH (d"+str(message.dup)+
2041              ", q"+str(message.qos)+", r"+str(message.retain)+
2042              ", m"+str(message.mid)+", '"+message.topic+
2043              "', ...  ("+str(len(message.payload))+" bytes)")
2044  
2045          message.timestamp = time.time()
2046          if message.qos == 0:
2047              self._handle_on_message(message)
2048              return MQTT_ERR_SUCCESS
2049          elif message.qos == 1:
2050              rc = self._send_puback(message.mid)
2051              self._handle_on_message(message)
2052              return rc
2053          elif message.qos == 2:
2054              rc = self._send_pubrec(message.mid)
2055              message.state = mqtt_ms_wait_for_pubrel
2056              self._in_message_mutex.acquire()
2057              self._in_messages.append(message)
2058              self._in_message_mutex.release()
2059              return rc
2060          else:
2061              return MQTT_ERR_PROTOCOL
2062  
2063      def _handle_pubrel(self):
2064          if self._strict_protocol:
2065              if self._in_packet['remaining_length'] != 2:
2066                  return MQTT_ERR_PROTOCOL
2067  
2068          if len(self._in_packet['packet']) != 2:
2069              return MQTT_ERR_PROTOCOL
2070  
2071          mid = struct.unpack("!H", self._in_packet['packet'])
2072          mid = mid[0]
2073          self._easy_log(MQTT_LOG_DEBUG, "Received PUBREL (Mid: "+str(mid)+")")
2074  
2075          self._in_message_mutex.acquire()
2076          for i in range(len(self._in_messages)):
2077              if self._in_messages[i].mid == mid:
2078  
2079                  # Only pass the message on if we have removed it from the queue - this
2080                  # prevents multiple callbacks for the same message.
2081                  self._handle_on_message(self._in_messages[i])
2082                  self._in_messages.pop(i)
2083                  self._inflight_messages = self._inflight_messages - 1
2084                  if self._max_inflight_messages > 0:
2085                      self._out_message_mutex.acquire()
2086                      rc = self._update_inflight()
2087                      self._out_message_mutex.release()
2088                      if rc != MQTT_ERR_SUCCESS:
2089                          self._in_message_mutex.release()
2090                          return rc
2091  
2092                  self._in_message_mutex.release()
2093                  return self._send_pubcomp(mid)
2094  
2095          self._in_message_mutex.release()
2096          return MQTT_ERR_SUCCESS
2097  
2098      def _update_inflight(self):
2099          # Dont lock message_mutex here
2100          for m in self._out_messages:
2101              if self._inflight_messages < self._max_inflight_messages:
2102                  if m.qos > 0 and m.state == mqtt_ms_queued:
2103                      self._inflight_messages = self._inflight_messages + 1
2104                      if m.qos == 1:
2105                          m.state = mqtt_ms_wait_for_puback
2106                      elif m.qos == 2:
2107                          m.state = mqtt_ms_wait_for_pubrec
2108                      rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
2109                      if rc != 0:
2110                          return rc
2111              else:
2112                  return MQTT_ERR_SUCCESS
2113          return MQTT_ERR_SUCCESS
2114  
2115      def _handle_pubrec(self):
2116          if self._strict_protocol:
2117              if self._in_packet['remaining_length'] != 2:
2118                  return MQTT_ERR_PROTOCOL
2119  
2120          mid = struct.unpack("!H", self._in_packet['packet'])
2121          mid = mid[0]
2122          self._easy_log(MQTT_LOG_DEBUG, "Received PUBREC (Mid: "+str(mid)+")")
2123  
2124          self._out_message_mutex.acquire()
2125          for m in self._out_messages:
2126              if m.mid == mid:
2127                  m.state = mqtt_ms_wait_for_pubcomp
2128                  m.timestamp = time.time()
2129                  self._out_message_mutex.release()
2130                  return self._send_pubrel(mid, False)
2131  
2132          self._out_message_mutex.release()
2133          return MQTT_ERR_SUCCESS
2134  
2135      def _handle_unsuback(self):
2136          if self._strict_protocol:
2137              if self._in_packet['remaining_length'] != 2:
2138                  return MQTT_ERR_PROTOCOL
2139  
2140          mid = struct.unpack("!H", self._in_packet['packet'])
2141          mid = mid[0]
2142          self._easy_log(MQTT_LOG_DEBUG, "Received UNSUBACK (Mid: "+str(mid)+")")
2143          self._callback_mutex.acquire()
2144          if self.on_unsubscribe:
2145              self._in_callback = True
2146              self.on_unsubscribe(self, self._userdata, mid)
2147              self._in_callback = False
2148          self._callback_mutex.release()
2149          return MQTT_ERR_SUCCESS
2150  
    def _handle_pubackcomp(self, cmd):
        """Handle an acknowledgement that completes an outgoing message.

        cmd is a string that is only used in the debug log line (presumably
        "PUBACK" or "PUBCOMP", judging by the method name - confirm against
        the caller).

        For the first entry of _out_messages whose mid matches the packet:
        calls on_publish(client, userdata, mid) if it is set, removes the
        entry, decrements _inflight_messages and, when
        _max_inflight_messages > 0, calls _update_inflight().

        Returns MQTT_ERR_PROTOCOL for a wrong remaining length in strict
        mode, a non-success result of _update_inflight(), or
        MQTT_ERR_SUCCESS (also when no entry matches).
        """
        if self._strict_protocol:
            if self._in_packet['remaining_length'] != 2:
                return MQTT_ERR_PROTOCOL

        # NOTE(review): unlike _handle_pubrel there is no unconditional
        # length check here; struct.unpack raises struct.error when the
        # buffer is not exactly two bytes.
        # "!H": one big-endian unsigned 16 bit integer, the message id.
        mid = struct.unpack("!H", self._in_packet['packet'])
        mid = mid[0]
        self._easy_log(MQTT_LOG_DEBUG, "Received "+cmd+" (Mid: "+str(mid)+")")

        self._out_message_mutex.acquire()
        for i in range(len(self._out_messages)):
            try:
                if self._out_messages[i].mid == mid:
                    # Only inform the client the message has been sent once.
                    self._callback_mutex.acquire()
                    if self.on_publish:
                        # The outgoing-queue mutex is dropped while the user
                        # callback runs and re-taken once it returns.
                        self._out_message_mutex.release()
                        self._in_callback = True
                        self.on_publish(self, self._userdata, mid)
                        self._in_callback = False
                        self._out_message_mutex.acquire()

                    self._callback_mutex.release()
                    # NOTE(review): index i was computed before the mutex was
                    # dropped above - confirm the queue cannot have shifted
                    # so that a different entry is removed here.
                    self._out_messages.pop(i)
                    self._inflight_messages = self._inflight_messages - 1
                    if self._max_inflight_messages > 0:
                        # Called with _out_message_mutex held.
                        rc = self._update_inflight()
                        if rc != MQTT_ERR_SUCCESS:
                            self._out_message_mutex.release()
                            return rc
                    self._out_message_mutex.release()
                    return MQTT_ERR_SUCCESS
            except IndexError:
                # Have removed item so i>count.
                # Not really an error.
                # NOTE(review): this also swallows an IndexError raised from
                # inside on_publish; in that case neither mutex is in the
                # state the remaining code expects - confirm.
                pass

        self._out_message_mutex.release()
        return MQTT_ERR_SUCCESS
2190  
2191      def _handle_on_message(self, message):
2192          self._callback_mutex.acquire()
2193          matched = False
2194          for t in self.on_message_filtered:
2195              if topic_matches_sub(t[0], message.topic):
2196                  self._in_callback = True
2197                  t[1](self, self._userdata, message)
2198                  self._in_callback = False
2199                  matched = True
2200  
2201          if matched == False and self.on_message:
2202              self._in_callback = True
2203              self.on_message(self, self._userdata, message)
2204              self._in_callback = False
2205  
2206          self._callback_mutex.release()
2207  
2208      def _thread_main(self):
2209          self._state_mutex.acquire()
2210          if self._state == mqtt_cs_connect_async:
2211              self._state_mutex.release()
2212              self.reconnect()
2213          else:
2214              self._state_mutex.release()
2215  
2216          self.loop_forever()
2217  
2218      def _host_matches_cert(self, host, cert_host):
2219          if cert_host[0:2] == "*.":
2220              if cert_host.count("*") != 1:
2221                  return False
2222  
2223              host_match = host.split(".", 1)[1]
2224              cert_match = cert_host.split(".", 1)[1]
2225              if host_match == cert_match:
2226                  return True
2227              else:
2228                  return False
2229          else:
2230              if host == cert_host:
2231                  return True
2232              else:
2233                  return False
2234  
2235      def _tls_match_hostname(self):
2236          cert = self._ssl.getpeercert()
2237          san = cert.get('subjectAltName')
2238          if san:
2239              have_san_dns = False
2240              for (key, value) in san:
2241                  if key == 'DNS':
2242                      have_san_dns = True
2243                      if self._host_matches_cert(self._host.lower(), value.lower()) == True:
2244                          return
2245                  if key == 'IP Address':
2246                      have_san_dns = True
2247                      if value.lower() == self._host.lower():
2248                          return
2249  
2250              if have_san_dns:
2251                  # Only check subject if subjectAltName dns not found.
2252                  raise ssl.SSLError('Certificate subject does not match remote hostname.')
2253          subject = cert.get('subject')
2254          if subject:
2255              for ((key, value),) in subject:
2256                  if key == 'commonName':
2257                      if self._host_matches_cert(self._host.lower(), value.lower()) == True:
2258                          return
2259  
2260          raise ssl.SSLError('Certificate subject does not match remote hostname.')
2261  
2262  
2263  # Compatibility class for easy porting from mosquitto.py. 
class Mosquitto(Client):
    """Client subclass kept for code that is being ported from mosquitto.py.

    The constructor shown here adds nothing of its own: it passes its
    arguments straight on to Client.__init__.
    """
    def __init__(self, client_id="", clean_session=True, userdata=None):
        # Forward all three arguments positionally to the Client constructor.
        super(Mosquitto, self).__init__(client_id, clean_session, userdata)