client.py
1 # SPDX-FileCopyrightText: 2014 Roger Light 2 # 3 # SPDX-License-Identifier: EPL-1.0 4 5 # Copyright (c) 2012-2014 Roger Light <roger@atchoo.org> 6 # 7 # All rights reserved. This program and the accompanying materials 8 # are made available under the terms of the Eclipse Public License v1.0 9 # and Eclipse Distribution License v1.0 which accompany this distribution. 10 # 11 # The Eclipse Public License is available at 12 # http://www.eclipse.org/legal/epl-v10.html 13 # and the Eclipse Distribution License is available at 14 # http://www.eclipse.org/org/documents/edl-v10.php. 15 # 16 # Contributors: 17 # Roger Light - initial API and implementation 18 19 """ 20 This is an MQTT v3.1 client module. MQTT is a lightweight pub/sub messaging 21 protocol that is easy to implement and suitable for low powered devices. 22 """ 23 import errno 24 import platform 25 import random 26 import select 27 import socket 28 HAVE_SSL = True 29 try: 30 import ssl 31 cert_reqs = ssl.CERT_REQUIRED 32 tls_version = ssl.PROTOCOL_TLSv1 33 except: 34 HAVE_SSL = False 35 cert_reqs = None 36 tls_version = None 37 import struct 38 import sys 39 import threading 40 import time 41 HAVE_DNS = True 42 try: 43 import dns.resolver 44 except ImportError: 45 HAVE_DNS = False 46 47 if platform.system() == 'Windows': 48 EAGAIN = errno.WSAEWOULDBLOCK 49 else: 50 EAGAIN = errno.EAGAIN 51 52 VERSION_MAJOR=1 53 VERSION_MINOR=0 54 VERSION_REVISION=0 55 VERSION_NUMBER=(VERSION_MAJOR*1000000+VERSION_MINOR*1000+VERSION_REVISION) 56 57 MQTTv31 = 3 58 MQTTv311 = 4 59 60 if sys.version_info[0] < 3: 61 PROTOCOL_NAMEv31 = "MQIsdp" 62 PROTOCOL_NAMEv311 = "MQTT" 63 else: 64 PROTOCOL_NAMEv31 = b"MQIsdp" 65 PROTOCOL_NAMEv311 = b"MQTT" 66 67 PROTOCOL_VERSION = 3 68 69 # Message types 70 CONNECT = 0x10 71 CONNACK = 0x20 72 PUBLISH = 0x30 73 PUBACK = 0x40 74 PUBREC = 0x50 75 PUBREL = 0x60 76 PUBCOMP = 0x70 77 SUBSCRIBE = 0x80 78 SUBACK = 0x90 79 UNSUBSCRIBE = 0xA0 80 UNSUBACK = 0xB0 81 PINGREQ = 0xC0 82 PINGRESP = 0xD0 83 DISCONNECT = 0xE0 84 85 # Log levels 86 MQTT_LOG_INFO = 0x01 87 MQTT_LOG_NOTICE = 0x02 88 MQTT_LOG_WARNING = 0x04 89 MQTT_LOG_ERR = 0x08 90 MQTT_LOG_DEBUG = 0x10 91 92 # CONNACK codes 93 CONNACK_ACCEPTED = 0 94 CONNACK_REFUSED_PROTOCOL_VERSION = 1 95 CONNACK_REFUSED_IDENTIFIER_REJECTED = 2 96 CONNACK_REFUSED_SERVER_UNAVAILABLE = 3 97 CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4 98 CONNACK_REFUSED_NOT_AUTHORIZED = 5 99 100 # Connection state 101 mqtt_cs_new = 0 102 mqtt_cs_connected = 1 103 mqtt_cs_disconnecting = 2 104 mqtt_cs_connect_async = 3 105 106 # Message state 107 mqtt_ms_invalid = 0 108 mqtt_ms_publish= 1 109 mqtt_ms_wait_for_puback = 2 110 mqtt_ms_wait_for_pubrec = 3 111 mqtt_ms_resend_pubrel = 4 112 mqtt_ms_wait_for_pubrel = 5 113 mqtt_ms_resend_pubcomp = 6 114 mqtt_ms_wait_for_pubcomp = 7 115 mqtt_ms_send_pubrec = 8 116 mqtt_ms_queued = 9 117 118 # Error values 119 MQTT_ERR_AGAIN = -1 120 MQTT_ERR_SUCCESS = 0 121 MQTT_ERR_NOMEM = 1 122 MQTT_ERR_PROTOCOL = 2 123 MQTT_ERR_INVAL = 3 124 MQTT_ERR_NO_CONN = 4 125 MQTT_ERR_CONN_REFUSED = 5 126 MQTT_ERR_NOT_FOUND = 6 127 MQTT_ERR_CONN_LOST = 7 128 MQTT_ERR_TLS = 8 129 MQTT_ERR_PAYLOAD_SIZE = 9 130 MQTT_ERR_NOT_SUPPORTED = 10 131 MQTT_ERR_AUTH = 11 132 MQTT_ERR_ACL_DENIED = 12 133 MQTT_ERR_UNKNOWN = 13 134 MQTT_ERR_ERRNO = 14 135 136 if sys.version_info[0] < 3: 137 sockpair_data = "0" 138 else: 139 sockpair_data = b"0" 140 141 def error_string(mqtt_errno): 142 """Return the error string associated with an mqtt error number.""" 143 if mqtt_errno == MQTT_ERR_SUCCESS: 144 return "No error." 145 elif mqtt_errno == MQTT_ERR_NOMEM: 146 return "Out of memory." 147 elif mqtt_errno == MQTT_ERR_PROTOCOL: 148 return "A network protocol error occurred when communicating with the broker." 149 elif mqtt_errno == MQTT_ERR_INVAL: 150 return "Invalid function arguments provided." 151 elif mqtt_errno == MQTT_ERR_NO_CONN: 152 return "The client is not currently connected." 153 elif mqtt_errno == MQTT_ERR_CONN_REFUSED: 154 return "The connection was refused." 155 elif mqtt_errno == MQTT_ERR_NOT_FOUND: 156 return "Message not found (internal error)." 157 elif mqtt_errno == MQTT_ERR_CONN_LOST: 158 return "The connection was lost." 159 elif mqtt_errno == MQTT_ERR_TLS: 160 return "A TLS error occurred." 161 elif mqtt_errno == MQTT_ERR_PAYLOAD_SIZE: 162 return "Payload too large." 163 elif mqtt_errno == MQTT_ERR_NOT_SUPPORTED: 164 return "This feature is not supported." 165 elif mqtt_errno == MQTT_ERR_AUTH: 166 return "Authorisation failed." 167 elif mqtt_errno == MQTT_ERR_ACL_DENIED: 168 return "Access denied by ACL." 169 elif mqtt_errno == MQTT_ERR_UNKNOWN: 170 return "Unknown error." 171 elif mqtt_errno == MQTT_ERR_ERRNO: 172 return "Error defined by errno." 173 else: 174 return "Unknown error." 175 176 177 def connack_string(connack_code): 178 """Return the string associated with a CONNACK result.""" 179 if connack_code == 0: 180 return "Connection Accepted." 181 elif connack_code == 1: 182 return "Connection Refused: unacceptable protocol version." 183 elif connack_code == 2: 184 return "Connection Refused: identifier rejected." 185 elif connack_code == 3: 186 return "Connection Refused: broker unavailable." 187 elif connack_code == 4: 188 return "Connection Refused: bad user name or password." 189 elif connack_code == 5: 190 return "Connection Refused: not authorised." 191 else: 192 return "Connection Refused: unknown reason." 193 194 195 def topic_matches_sub(sub, topic): 196 """Check whether a topic matches a subscription. 197 For example: 198 foo/bar would match the subscription foo/# or +/bar 199 non/matching would not match the subscription non/+/+ 200 """ 201 result = True 202 multilevel_wildcard = False 203 204 slen = len(sub) 205 tlen = len(topic) 206 207 if slen > 0 and tlen > 0: 208 if (sub[0] == '$' and topic[0] != '$') or (topic[0] == '$' and sub[0] != '$'): 209 return False 210 211 spos = 0 212 tpos = 0 213 214 while spos < slen and tpos < tlen: 215 if sub[spos] == topic[tpos]: 216 if tpos == tlen-1: 217 # Check for e.g. foo matching foo/# 218 if spos == slen-3 and sub[spos+1] == '/' and sub[spos+2] == '#': 219 result = True 220 multilevel_wildcard = True 221 break 222 223 spos += 1 224 tpos += 1 225 226 if tpos == tlen and spos == slen-1 and sub[spos] == '+': 227 spos += 1 228 result = True 229 break 230 else: 231 if sub[spos] == '+': 232 spos += 1 233 while tpos < tlen and topic[tpos] != '/': 234 tpos += 1 235 if tpos == tlen and spos == slen: 236 result = True 237 break 238 239 elif sub[spos] == '#': 240 multilevel_wildcard = True 241 if spos+1 != slen: 242 result = False 243 break 244 else: 245 result = True 246 break 247 248 else: 249 result = False 250 break 251 252 if not multilevel_wildcard and (tpos < tlen or spos < slen): 253 result = False 254 255 return result 256 257 258 def _socketpair_compat(): 259 """TCP/IP socketpair including Windows support""" 260 listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP) 261 listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 262 listensock.bind(("127.0.0.1", 0)) 263 listensock.listen(1) 264 265 iface, port = listensock.getsockname() 266 sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP) 267 sock1.setblocking(0) 268 try: 269 sock1.connect(("localhost", port)) 270 except socket.error as err: 271 if err.errno != errno.EINPROGRESS and err.errno != errno.EWOULDBLOCK and err.errno != EAGAIN: 272 raise 273 sock2, address = listensock.accept() 274 sock2.setblocking(0) 275 listensock.close() 276 return (sock1, sock2) 277 278 279 class MQTTMessage: 280 """ This is a class that describes an incoming message. It is passed to the 281 on_message callback as the message parameter. 282 Members: 283 topic : String. topic that the message was published on. 284 payload : String/bytes the message payload. 285 qos : Integer. The message Quality of Service 0, 1 or 2. 286 retain : Boolean. If true, the message is a retained message and not fresh. 287 mid : Integer. The message id. 288 """ 289 def __init__(self): 290 self.timestamp = 0 291 self.state = mqtt_ms_invalid 292 self.dup = False 293 self.mid = 0 294 self.topic = "" 295 self.payload = None 296 self.qos = 0 297 self.retain = False 298 299 300 class Client(object): 301 """MQTT version 3.1/3.1.1 client class. 302 This is the main class for use communicating with an MQTT broker. 303 General usage flow: 304 * Use connect()/connect_async() to connect to a broker 305 * Call loop() frequently to maintain network traffic flow with the broker 306 * Or use loop_start() to set a thread running to call loop() for you. 307 * Or use loop_forever() to handle calling loop() for you in a blocking 308 * function. 309 * Use subscribe() to subscribe to a topic and receive messages 310 * Use publish() to send messages 311 * Use disconnect() to disconnect from the broker 312 Data returned from the broker is made available with the use of callback 313 functions as described below. 314 Callbacks 315 ========= 316 A number of callback functions are available to receive data back from the 317 broker. To use a callback, define a function and then assign it to the 318 client: 319 def on_connect(client, userdata, flags, rc): 320 print("Connection returned " + str(rc)) 321 client.on_connect = on_connect 322 All of the callbacks as described below have a "client" and an "userdata" 323 argument. "client" is the Client instance that is calling the callback. 324 "userdata" is user data of any type and can be set when creating a new client 325 instance or with user_data_set(userdata). 326 The callbacks: 327 on_connect(client, userdata, flags, rc): called when the broker responds to our connection 328 request. 329 flags is a dict that contains response flags from the broker: 330 flags['session present'] - this flag is useful for clients that are 331 using clean session set to 0 only. If a client with clean 332 session=0, that reconnects to a broker that it has previously 333 connected to, this flag indicates whether the broker still has the 334 session information for the client. If 1, the session still exists. 335 The value of rc determines success or not: 336 0: Connection successful 337 1: Connection refused - incorrect protocol version 338 2: Connection refused - invalid client identifier 339 3: Connection refused - server unavailable 340 4: Connection refused - bad username or password 341 5: Connection refused - not authorised 342 6-255: Currently unused. 343 on_disconnect(client, userdata, rc): called when the client disconnects from the broker. 344 The rc parameter indicates the disconnection state. If MQTT_ERR_SUCCESS 345 (0), the callback was called in response to a disconnect() call. If any 346 other value the disconnection was unexpected, such as might be caused by 347 a network error. 348 on_message(client, userdata, message): called when a message has been received on a 349 topic that the client subscribes to. The message variable is a 350 MQTTMessage that describes all of the message parameters. 351 on_publish(client, userdata, mid): called when a message that was to be sent using the 352 publish() call has completed transmission to the broker. For messages 353 with QoS levels 1 and 2, this means that the appropriate handshakes have 354 completed. For QoS 0, this simply means that the message has left the 355 client. The mid variable matches the mid variable returned from the 356 corresponding publish() call, to allow outgoing messages to be tracked. 357 This callback is important because even if the publish() call returns 358 success, it does not always mean that the message has been sent. 359 on_subscribe(client, userdata, mid, granted_qos): called when the broker responds to a 360 subscribe request. The mid variable matches the mid variable returned 361 from the corresponding subscribe() call. The granted_qos variable is a 362 list of integers that give the QoS level the broker has granted for each 363 of the different subscription requests. 364 on_unsubscribe(client, userdata, mid): called when the broker responds to an unsubscribe 365 request. The mid variable matches the mid variable returned from the 366 corresponding unsubscribe() call. 367 on_log(client, userdata, level, buf): called when the client has log information. Define 368 to allow debugging. The level variable gives the severity of the message 369 and will be one of MQTT_LOG_INFO, MQTT_LOG_NOTICE, MQTT_LOG_WARNING, 370 MQTT_LOG_ERR, and MQTT_LOG_DEBUG. The message itself is in buf. 371 """ 372 def __init__(self, client_id="", clean_session=True, userdata=None, protocol=MQTTv31): 373 """client_id is the unique client id string used when connecting to the 374 broker. If client_id is zero length or None, then one will be randomly 375 generated. In this case, clean_session must be True. If this is not the 376 case a ValueError will be raised. 377 clean_session is a boolean that determines the client type. If True, 378 the broker will remove all information about this client when it 379 disconnects. If False, the client is a persistent client and 380 subscription information and queued messages will be retained when the 381 client disconnects. 382 Note that a client will never discard its own outgoing messages on 383 disconnect. Calling connect() or reconnect() will cause the messages to 384 be resent. Use reinitialise() to reset a client to its original state. 385 userdata is user defined data of any type that is passed as the "userdata" 386 parameter to callbacks. It may be updated at a later point with the 387 user_data_set() function. 388 The protocol argument allows explicit setting of the MQTT version to 389 use for this client. Can be paho.mqtt.client.MQTTv311 (v3.1.1) or 390 paho.mqtt.client.MQTTv31 (v3.1), with the default being v3.1. If the 391 broker reports that the client connected with an invalid protocol 392 version, the client will automatically attempt to reconnect using v3.1 393 instead. 394 """ 395 if not clean_session and (client_id == "" or client_id is None): 396 raise ValueError('A client id must be provided if clean session is False.') 397 398 self._protocol = protocol 399 self._userdata = userdata 400 self._sock = None 401 self._sockpairR, self._sockpairW = _socketpair_compat() 402 self._keepalive = 60 403 self._message_retry = 20 404 self._last_retry_check = 0 405 self._clean_session = clean_session 406 if client_id == "" or client_id is None: 407 self._client_id = "paho/" + "".join(random.choice("0123456789ADCDEF") for x in range(23-5)) 408 else: 409 self._client_id = client_id 410 411 self._username = "" 412 self._password = "" 413 self._in_packet = { 414 "command": 0, 415 "have_remaining": 0, 416 "remaining_count": [], 417 "remaining_mult": 1, 418 "remaining_length": 0, 419 "packet": b"", 420 "to_process": 0, 421 "pos": 0} 422 self._out_packet = [] 423 self._current_out_packet = None 424 self._last_msg_in = time.time() 425 self._last_msg_out = time.time() 426 self._ping_t = 0 427 self._last_mid = 0 428 self._state = mqtt_cs_new 429 self._out_messages = [] 430 self._in_messages = [] 431 self._max_inflight_messages = 20 432 self._inflight_messages = 0 433 self._will = False 434 self._will_topic = "" 435 self._will_payload = None 436 self._will_qos = 0 437 self._will_retain = False 438 self.on_disconnect = None 439 self.on_connect = None 440 self.on_publish = None 441 self.on_message = None 442 self.on_message_filtered = [] 443 self.on_subscribe = None 444 self.on_unsubscribe = None 445 self.on_log = None 446 self._host = "" 447 self._port = 1883 448 self._bind_address = "" 449 self._in_callback = False 450 self._strict_protocol = False 451 self._callback_mutex = threading.Lock() 452 self._state_mutex = threading.Lock() 453 self._out_packet_mutex = threading.Lock() 454 self._current_out_packet_mutex = threading.Lock() 455 self._msgtime_mutex = threading.Lock() 456 self._out_message_mutex = threading.Lock() 457 self._in_message_mutex = threading.Lock() 458 self._thread = None 459 self._thread_terminate = False 460 self._ssl = None 461 self._tls_certfile = None 462 self._tls_keyfile = None 463 self._tls_ca_certs = None 464 self._tls_cert_reqs = None 465 self._tls_ciphers = None 466 self._tls_version = tls_version 467 self._tls_insecure = False 468 469 def __del__(self): 470 pass 471 472 def reinitialise(self, client_id="", clean_session=True, userdata=None): 473 if self._ssl: 474 self._ssl.close() 475 self._ssl = None 476 self._sock = None 477 elif self._sock: 478 self._sock.close() 479 self._sock = None 480 if self._sockpairR: 481 self._sockpairR.close() 482 self._sockpairR = None 483 if self._sockpairW: 484 self._sockpairW.close() 485 self._sockpairW = None 486 487 self.__init__(client_id, clean_session, userdata) 488 489 def tls_set(self, ca_certs, certfile=None, keyfile=None, cert_reqs=cert_reqs, tls_version=tls_version, ciphers=None): 490 """Configure network encryption and authentication options. Enables SSL/TLS support. 491 ca_certs : a string path to the Certificate Authority certificate files 492 that are to be treated as trusted by this client. If this is the only 493 option given then the client will operate in a similar manner to a web 494 browser. That is to say it will require the broker to have a 495 certificate signed by the Certificate Authorities in ca_certs and will 496 communicate using TLS v1, but will not attempt any form of 497 authentication. This provides basic network encryption but may not be 498 sufficient depending on how the broker is configured. 499 certfile and keyfile are strings pointing to the PEM encoded client 500 certificate and private keys respectively. If these arguments are not 501 None then they will be used as client information for TLS based 502 authentication. Support for this feature is broker dependent. Note 503 that if either of these files in encrypted and needs a password to 504 decrypt it, Python will ask for the password at the command line. It is 505 not currently possible to define a callback to provide the password. 506 cert_reqs allows the certificate requirements that the client imposes 507 on the broker to be changed. By default this is ssl.CERT_REQUIRED, 508 which means that the broker must provide a certificate. See the ssl 509 pydoc for more information on this parameter. 510 tls_version allows the version of the SSL/TLS protocol used to be 511 specified. By default TLS v1 is used. Previous versions (all versions 512 beginning with SSL) are possible but not recommended due to possible 513 security problems. 514 ciphers is a string specifying which encryption ciphers are allowable 515 for this connection, or None to use the defaults. See the ssl pydoc for 516 more information. 517 Must be called before connect() or connect_async().""" 518 if HAVE_SSL is False: 519 raise ValueError('This platform has no SSL/TLS.') 520 521 if sys.version < '2.7': 522 raise ValueError('Python 2.7 is the minimum supported version for TLS.') 523 524 if ca_certs is None: 525 raise ValueError('ca_certs must not be None.') 526 527 try: 528 f = open(ca_certs, "r") 529 except IOError as err: 530 raise IOError(ca_certs+": "+err.strerror) 531 else: 532 f.close() 533 if certfile is not None: 534 try: 535 f = open(certfile, "r") 536 except IOError as err: 537 raise IOError(certfile+": "+err.strerror) 538 else: 539 f.close() 540 if keyfile is not None: 541 try: 542 f = open(keyfile, "r") 543 except IOError as err: 544 raise IOError(keyfile+": "+err.strerror) 545 else: 546 f.close() 547 548 self._tls_ca_certs = ca_certs 549 self._tls_certfile = certfile 550 self._tls_keyfile = keyfile 551 self._tls_cert_reqs = cert_reqs 552 self._tls_version = tls_version 553 self._tls_ciphers = ciphers 554 555 def tls_insecure_set(self, value): 556 """Configure verification of the server hostname in the server certificate. 557 If value is set to true, it is impossible to guarantee that the host 558 you are connecting to is not impersonating your server. This can be 559 useful in initial server testing, but makes it possible for a malicious 560 third party to impersonate your server through DNS spoofing, for 561 example. 562 Do not use this function in a real system. Setting value to true means 563 there is no point using encryption. 564 Must be called before connect().""" 565 if HAVE_SSL is False: 566 raise ValueError('This platform has no SSL/TLS.') 567 568 self._tls_insecure = value 569 570 def connect(self, host, port=1883, keepalive=60, bind_address=""): 571 """Connect to a remote broker. 572 host is the hostname or IP address of the remote broker. 573 port is the network port of the server host to connect to. Defaults to 574 1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you 575 are using tls_set() the port may need providing. 576 keepalive: Maximum period in seconds between communications with the 577 broker. If no other messages are being exchanged, this controls the 578 rate at which the client will send ping messages to the broker. 579 """ 580 self.connect_async(host, port, keepalive, bind_address) 581 return self.reconnect() 582 583 def connect_srv(self, domain=None, keepalive=60, bind_address=""): 584 """Connect to a remote broker. 585 domain is the DNS domain to search for SRV records; if None, 586 try to determine local domain name. 587 keepalive and bind_address are as for connect() 588 """ 589 590 if HAVE_DNS is False: 591 raise ValueError('No DNS resolver library found.') 592 593 if domain is None: 594 domain = socket.getfqdn() 595 domain = domain[domain.find('.') + 1:] 596 597 try: 598 rr = '_mqtt._tcp.%s' % domain 599 if self._ssl is not None: 600 # IANA specifies secure-mqtt (not mqtts) for port 8883 601 rr = '_secure-mqtt._tcp.%s' % domain 602 answers = [] 603 for answer in dns.resolver.query(rr, dns.rdatatype.SRV): 604 addr = answer.target.to_text()[:-1] 605 answers.append((addr, answer.port, answer.priority, answer.weight)) 606 except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.NoNameservers): 607 raise ValueError("No answer/NXDOMAIN for SRV in %s" % (domain)) 608 609 # FIXME: doesn't account for weight 610 for answer in answers: 611 host, port, prio, weight = answer 612 613 try: 614 return self.connect(host, port, keepalive, bind_address) 615 except: 616 pass 617 618 raise ValueError("No SRV hosts responded") 619 620 def connect_async(self, host, port=1883, keepalive=60, bind_address=""): 621 """Connect to a remote broker asynchronously. This is a non-blocking 622 connect call that can be used with loop_start() to provide very quick 623 start. 624 host is the hostname or IP address of the remote broker. 625 port is the network port of the server host to connect to. Defaults to 626 1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you 627 are using tls_set() the port may need providing. 628 keepalive: Maximum period in seconds between communications with the 629 broker. If no other messages are being exchanged, this controls the 630 rate at which the client will send ping messages to the broker. 631 """ 632 if host is None or len(host) == 0: 633 raise ValueError('Invalid host.') 634 if port <= 0: 635 raise ValueError('Invalid port number.') 636 if keepalive < 0: 637 raise ValueError('Keepalive must be >=0.') 638 if bind_address != "" and bind_address is not None: 639 if (sys.version_info[0] == 2 and sys.version_info[1] < 7) or (sys.version_info[0] == 3 and sys.version_info[1] < 2): 640 raise ValueError('bind_address requires Python 2.7 or 3.2.') 641 642 self._host = host 643 self._port = port 644 self._keepalive = keepalive 645 self._bind_address = bind_address 646 647 self._state_mutex.acquire() 648 self._state = mqtt_cs_connect_async 649 self._state_mutex.release() 650 651 def reconnect(self): 652 """Reconnect the client after a disconnect. Can only be called after 653 connect()/connect_async().""" 654 if len(self._host) == 0: 655 raise ValueError('Invalid host.') 656 if self._port <= 0: 657 raise ValueError('Invalid port number.') 658 659 self._in_packet = { 660 "command": 0, 661 "have_remaining": 0, 662 "remaining_count": [], 663 "remaining_mult": 1, 664 "remaining_length": 0, 665 "packet": b"", 666 "to_process": 0, 667 "pos": 0} 668 669 self._out_packet_mutex.acquire() 670 self._out_packet = [] 671 self._out_packet_mutex.release() 672 673 self._current_out_packet_mutex.acquire() 674 self._current_out_packet = None 675 self._current_out_packet_mutex.release() 676 677 self._msgtime_mutex.acquire() 678 self._last_msg_in = time.time() 679 self._last_msg_out = time.time() 680 self._msgtime_mutex.release() 681 682 self._ping_t = 0 683 self._state_mutex.acquire() 684 self._state = mqtt_cs_new 685 self._state_mutex.release() 686 if self._ssl: 687 self._ssl.close() 688 self._ssl = None 689 self._sock = None 690 elif self._sock: 691 self._sock.close() 692 self._sock = None 693 694 # Put messages in progress in a valid state. 695 self._messages_reconnect_reset() 696 697 try: 698 if (sys.version_info[0] == 2 and sys.version_info[1] < 7) or (sys.version_info[0] == 3 and sys.version_info[1] < 2): 699 sock = socket.create_connection((self._host, self._port)) 700 else: 701 sock = socket.create_connection((self._host, self._port), source_address=(self._bind_address, 0)) 702 except socket.error as err: 703 if err.errno != errno.EINPROGRESS and err.errno != errno.EWOULDBLOCK and err.errno != EAGAIN: 704 raise 705 706 if self._tls_ca_certs is not None: 707 self._ssl = ssl.wrap_socket( 708 sock, 709 certfile=self._tls_certfile, 710 keyfile=self._tls_keyfile, 711 ca_certs=self._tls_ca_certs, 712 cert_reqs=self._tls_cert_reqs, 713 ssl_version=self._tls_version, 714 ciphers=self._tls_ciphers) 715 716 if self._tls_insecure is False: 717 if sys.version_info[0] < 3 or (sys.version_info[0] == 3 and sys.version_info[1] < 2): 718 self._tls_match_hostname() 719 else: 720 ssl.match_hostname(self._ssl.getpeercert(), self._host) 721 722 self._sock = sock 723 self._sock.setblocking(0) 724 725 return self._send_connect(self._keepalive, self._clean_session) 726 727 def loop(self, timeout=1.0, max_packets=1): 728 """Process network events. 729 This function must be called regularly to ensure communication with the 730 broker is carried out. It calls select() on the network socket to wait 731 for network events. If incoming data is present it will then be 732 processed. Outgoing commands, from e.g. publish(), are normally sent 733 immediately that their function is called, but this is not always 734 possible. loop() will also attempt to send any remaining outgoing 735 messages, which also includes commands that are part of the flow for 736 messages with QoS>0. 737 timeout: The time in seconds to wait for incoming/outgoing network 738 traffic before timing out and returning. 739 max_packets: Not currently used. 740 Returns MQTT_ERR_SUCCESS on success. 741 Returns >0 on error. 742 A ValueError will be raised if timeout < 0""" 743 if timeout < 0.0: 744 raise ValueError('Invalid timeout.') 745 746 self._current_out_packet_mutex.acquire() 747 self._out_packet_mutex.acquire() 748 if self._current_out_packet is None and len(self._out_packet) > 0: 749 self._current_out_packet = self._out_packet.pop(0) 750 751 if self._current_out_packet: 752 wlist = [self.socket()] 753 else: 754 wlist = [] 755 self._out_packet_mutex.release() 756 self._current_out_packet_mutex.release() 757 758 # sockpairR is used to break out of select() before the timeout, on a 759 # call to publish() etc. 760 rlist = [self.socket(), self._sockpairR] 761 try: 762 socklist = select.select(rlist, wlist, [], timeout) 763 except TypeError: 764 # Socket isn't correct type, in likelihood connection is lost 765 return MQTT_ERR_CONN_LOST 766 except ValueError: 767 # Can occur if we just reconnected but rlist/wlist contain a -1 for 768 # some reason. 769 return MQTT_ERR_CONN_LOST 770 except: 771 return MQTT_ERR_UNKNOWN 772 773 if self.socket() in socklist[0]: 774 rc = self.loop_read(max_packets) 775 if rc or (self._ssl is None and self._sock is None): 776 return rc 777 778 if self._sockpairR in socklist[0]: 779 # Stimulate output write even though we didn't ask for it, because 780 # at that point the publish or other command wasn't present. 781 socklist[1].insert(0, self.socket()) 782 # Clear sockpairR - only ever a single byte written. 783 try: 784 self._sockpairR.recv(1) 785 except socket.error as err: 786 if err.errno != EAGAIN: 787 raise 788 789 if self.socket() in socklist[1]: 790 rc = self.loop_write(max_packets) 791 if rc or (self._ssl is None and self._sock is None): 792 return rc 793 794 return self.loop_misc() 795 796 def publish(self, topic, payload=None, qos=0, retain=False): 797 """Publish a message on a topic. 798 This causes a message to be sent to the broker and subsequently from 799 the broker to any clients subscribing to matching topics. 800 topic: The topic that the message should be published on. 801 payload: The actual message to send. If not given, or set to None a 802 zero length message will be used. Passing an int or float will result 803 in the payload being converted to a string representing that number. If 804 you wish to send a true int/float, use struct.pack() to create the 805 payload you require. 806 qos: The quality of service level to use. 807 retain: If set to true, the message will be set as the "last known 808 good"/retained message for the topic. 809 Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS to 810 indicate success or MQTT_ERR_NO_CONN if the client is not currently 811 connected. mid is the message ID for the publish request. The mid 812 value can be used to track the publish request by checking against the 813 mid argument in the on_publish() callback if it is defined. 814 A ValueError will be raised if topic is None, has zero length or is 815 invalid (contains a wildcard), if qos is not one of 0, 1 or 2, or if 816 the length of the payload is greater than 268435455 bytes.""" 817 if topic is None or len(topic) == 0: 818 raise ValueError('Invalid topic.') 819 if qos<0 or qos>2: 820 raise ValueError('Invalid QoS level.') 821 if isinstance(payload, str) or isinstance(payload, bytearray): 822 local_payload = payload 823 elif sys.version_info[0] < 3 and isinstance(payload, unicode): 824 local_payload = payload 825 elif isinstance(payload, int) or isinstance(payload, float): 826 local_payload = str(payload) 827 elif payload is None: 828 local_payload = None 829 else: 830 raise TypeError('payload must be a string, bytearray, int, float or None.') 831 832 if local_payload is not None and len(local_payload) > 268435455: 833 raise ValueError('Payload too large.') 834 835 if self._topic_wildcard_len_check(topic) != MQTT_ERR_SUCCESS: 836 raise ValueError('Publish topic cannot contain wildcards.') 837 838 local_mid = self._mid_generate() 839 840 if qos == 0: 841 rc = self._send_publish(local_mid, topic, local_payload, qos, retain, False) 842 return (rc, local_mid) 843 else: 844 message = MQTTMessage() 845 message.timestamp = time.time() 846 847 message.mid = local_mid 848 message.topic = topic 849 if local_payload is None or len(local_payload) == 0: 850 message.payload = None 851 else: 852 message.payload = local_payload 853 854 message.qos = qos 855 message.retain = retain 856 message.dup = False 857 858 self._out_message_mutex.acquire() 859 self._out_messages.append(message) 860 if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages: 861 self._inflight_messages = self._inflight_messages+1 862 if qos == 1: 863 message.state = mqtt_ms_wait_for_puback 864 elif qos == 2: 865 message.state = mqtt_ms_wait_for_pubrec 866 self._out_message_mutex.release() 867 868 rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain, message.dup) 869 870 # remove from inflight messages so it will be send after a connection is made 871 if rc is MQTT_ERR_NO_CONN: 872 with self._out_message_mutex: 873 self._inflight_messages -= 1 874 message.state = mqtt_ms_publish 875 876 return (rc, local_mid) 877 else: 878 message.state = mqtt_ms_queued; 879 self._out_message_mutex.release() 880 return (MQTT_ERR_SUCCESS, local_mid) 881 882 def username_pw_set(self, username, password=None): 883 """Set a username and optionally a password for broker authentication. 884 Must be called before connect() to have any effect. 885 Requires a broker that supports MQTT v3.1. 886 username: The username to authenticate with. Need have no relationship to the client id. 887 password: The password to authenticate with. Optional, set to None if not required. 888 """ 889 self._username = username.encode('utf-8') 890 self._password = password 891 892 def disconnect(self): 893 """Disconnect a connected client from the broker.""" 894 self._state_mutex.acquire() 895 self._state = mqtt_cs_disconnecting 896 self._state_mutex.release() 897 898 if self._sock is None and self._ssl is None: 899 return MQTT_ERR_NO_CONN 900 901 return self._send_disconnect() 902 903 def subscribe(self, topic, qos=0): 904 """Subscribe the client to one or more topics. 905 This function may be called in three different ways: 906 Simple string and integer 907 ------------------------- 908 e.g. subscribe("my/topic", 2) 909 topic: A string specifying the subscription topic to subscribe to. 910 qos: The desired quality of service level for the subscription. 911 Defaults to 0. 912 String and integer tuple 913 ------------------------ 914 e.g. subscribe(("my/topic", 1)) 915 topic: A tuple of (topic, qos). Both topic and qos must be present in 916 the tuple. 917 qos: Not used. 918 List of string and integer tuples 919 ------------------------ 920 e.g. subscribe([("my/topic", 0), ("another/topic", 2)]) 921 This allows multiple topic subscriptions in a single SUBSCRIPTION 922 command, which is more efficient than using multiple calls to 923 subscribe(). 924 topic: A list of tuple of format (topic, qos). Both topic and qos must 925 be present in all of the tuples. 926 qos: Not used. 927 The function returns a tuple (result, mid), where result is 928 MQTT_ERR_SUCCESS to indicate success or (MQTT_ERR_NO_CONN, None) if the 929 client is not currently connected. mid is the message ID for the 930 subscribe request. The mid value can be used to track the subscribe 931 request by checking against the mid argument in the on_subscribe() 932 callback if it is defined. 933 Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has 934 zero string length, or if topic is not a string, tuple or list. 935 """ 936 topic_qos_list = None 937 if isinstance(topic, str): 938 if qos<0 or qos>2: 939 raise ValueError('Invalid QoS level.') 940 if topic is None or len(topic) == 0: 941 raise ValueError('Invalid topic.') 942 topic_qos_list = [(topic.encode('utf-8'), qos)] 943 elif isinstance(topic, tuple): 944 if topic[1]<0 or topic[1]>2: 945 raise ValueError('Invalid QoS level.') 946 if topic[0] is None or len(topic[0]) == 0 or not isinstance(topic[0], str): 947 raise ValueError('Invalid topic.') 948 topic_qos_list = [(topic[0].encode('utf-8'), topic[1])] 949 elif isinstance(topic, list): 950 topic_qos_list = [] 951 for t in topic: 952 if t[1]<0 or t[1]>2: 953 raise ValueError('Invalid QoS level.') 954 if t[0] is None or len(t[0]) == 0 or not isinstance(t[0], str): 955 raise ValueError('Invalid topic.') 956 topic_qos_list.append((t[0].encode('utf-8'), t[1])) 957 958 if topic_qos_list is None: 959 raise ValueError("No topic specified, or incorrect topic type.") 960 961 if self._sock is None and self._ssl is None: 962 return (MQTT_ERR_NO_CONN, None) 963 964 return self._send_subscribe(False, topic_qos_list) 965 966 def unsubscribe(self, topic): 967 """Unsubscribe the client from one or more topics. 968 topic: A single string, or list of strings that are the subscription 969 topics to unsubscribe from. 970 Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS 971 to indicate success or (MQTT_ERR_NO_CONN, None) if the client is not 972 currently connected. 973 mid is the message ID for the unsubscribe request. The mid value can be 974 used to track the unsubscribe request by checking against the mid 975 argument in the on_unsubscribe() callback if it is defined. 976 Raises a ValueError if topic is None or has zero string length, or is 977 not a string or list. 978 """ 979 topic_list = None 980 if topic is None: 981 raise ValueError('Invalid topic.') 982 if isinstance(topic, str): 983 if len(topic) == 0: 984 raise ValueError('Invalid topic.') 985 topic_list = [topic.encode('utf-8')] 986 elif isinstance(topic, list): 987 topic_list = [] 988 for t in topic: 989 if len(t) == 0 or not isinstance(t, str): 990 raise ValueError('Invalid topic.') 991 topic_list.append(t.encode('utf-8')) 992 993 if topic_list is None: 994 raise ValueError("No topic specified, or incorrect topic type.") 995 996 if self._sock is None and self._ssl is None: 997 return (MQTT_ERR_NO_CONN, None) 998 999 return self._send_unsubscribe(False, topic_list) 1000 1001 def loop_read(self, max_packets=1): 1002 """Process read network events. Use in place of calling loop() if you 1003 wish to handle your client reads as part of your own application. 1004 Use socket() to obtain the client socket to call select() or equivalent 1005 on. 1006 Do not use if you are using the threaded interface loop_start().""" 1007 if self._sock is None and self._ssl is None: 1008 return MQTT_ERR_NO_CONN 1009 1010 max_packets = len(self._out_messages) + len(self._in_messages) 1011 if max_packets < 1: 1012 max_packets = 1 1013 1014 for i in range(0, max_packets): 1015 rc = self._packet_read() 1016 if rc > 0: 1017 return self._loop_rc_handle(rc) 1018 elif rc == MQTT_ERR_AGAIN: 1019 return MQTT_ERR_SUCCESS 1020 return MQTT_ERR_SUCCESS 1021 1022 def loop_write(self, max_packets=1): 1023 """Process read network events. Use in place of calling loop() if you 1024 wish to handle your client reads as part of your own application. 1025 Use socket() to obtain the client socket to call select() or equivalent 1026 on. 1027 Use want_write() to determine if there is data waiting to be written. 1028 Do not use if you are using the threaded interface loop_start().""" 1029 if self._sock is None and self._ssl is None: 1030 return MQTT_ERR_NO_CONN 1031 1032 max_packets = len(self._out_packet) + 1 1033 if max_packets < 1: 1034 max_packets = 1 1035 1036 for i in range(0, max_packets): 1037 rc = self._packet_write() 1038 if rc > 0: 1039 return self._loop_rc_handle(rc) 1040 elif rc == MQTT_ERR_AGAIN: 1041 return MQTT_ERR_SUCCESS 1042 return MQTT_ERR_SUCCESS 1043 1044 def want_write(self): 1045 """Call to determine if there is network data waiting to be written. 1046 Useful if you are calling select() yourself rather than using loop(). 1047 """ 1048 if self._current_out_packet or len(self._out_packet) > 0: 1049 return True 1050 else: 1051 return False 1052 1053 def loop_misc(self): 1054 """Process miscellaneous network events. Use in place of calling loop() if you 1055 wish to call select() or equivalent on. 1056 Do not use if you are using the threaded interface loop_start().""" 1057 if self._sock is None and self._ssl is None: 1058 return MQTT_ERR_NO_CONN 1059 1060 now = time.time() 1061 self._check_keepalive() 1062 if self._last_retry_check+1 < now: 1063 # Only check once a second at most 1064 self._message_retry_check() 1065 self._last_retry_check = now 1066 1067 if self._ping_t > 0 and now - self._ping_t >= self._keepalive: 1068 # client->ping_t != 0 means we are waiting for a pingresp. 1069 # This hasn't happened in the keepalive time so we should disconnect. 1070 if self._ssl: 1071 self._ssl.close() 1072 self._ssl = None 1073 elif self._sock: 1074 self._sock.close() 1075 self._sock = None 1076 1077 self._callback_mutex.acquire() 1078 if self._state == mqtt_cs_disconnecting: 1079 rc = MQTT_ERR_SUCCESS 1080 else: 1081 rc = 1 1082 if self.on_disconnect: 1083 self._in_callback = True 1084 self.on_disconnect(self, self._userdata, rc) 1085 self._in_callback = False 1086 self._callback_mutex.release() 1087 return MQTT_ERR_CONN_LOST 1088 1089 return MQTT_ERR_SUCCESS 1090 1091 def max_inflight_messages_set(self, inflight): 1092 """Set the maximum number of messages with QoS>0 that can be part way 1093 through their network flow at once. Defaults to 20.""" 1094 if inflight < 0: 1095 raise ValueError('Invalid inflight.') 1096 self._max_inflight_messages = inflight 1097 1098 def message_retry_set(self, retry): 1099 """Set the timeout in seconds before a message with QoS>0 is retried. 1100 20 seconds by default.""" 1101 if retry < 0: 1102 raise ValueError('Invalid retry.') 1103 1104 self._message_retry = retry 1105 1106 def user_data_set(self, userdata): 1107 """Set the user data variable passed to callbacks. May be any data type.""" 1108 self._userdata = userdata 1109 1110 def will_set(self, topic, payload=None, qos=0, retain=False): 1111 """Set a Will to be sent by the broker in case the client disconnects unexpectedly. 1112 This must be called before connect() to have any effect. 1113 topic: The topic that the will message should be published on. 1114 payload: The message to send as a will. If not given, or set to None a 1115 zero length message will be used as the will. Passing an int or float 1116 will result in the payload being converted to a string representing 1117 that number. If you wish to send a true int/float, use struct.pack() to 1118 create the payload you require. 1119 qos: The quality of service level to use for the will. 1120 retain: If set to true, the will message will be set as the "last known 1121 good"/retained message for the topic. 1122 Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has 1123 zero string length. 1124 """ 1125 if topic is None or len(topic) == 0: 1126 raise ValueError('Invalid topic.') 1127 if qos<0 or qos>2: 1128 raise ValueError('Invalid QoS level.') 1129 if isinstance(payload, str): 1130 self._will_payload = payload.encode('utf-8') 1131 elif isinstance(payload, bytearray): 1132 self._will_payload = payload 1133 elif isinstance(payload, int) or isinstance(payload, float): 1134 self._will_payload = str(payload) 1135 elif payload is None: 1136 self._will_payload = None 1137 else: 1138 raise TypeError('payload must be a string, bytearray, int, float or None.') 1139 1140 self._will = True 1141 self._will_topic = topic.encode('utf-8') 1142 self._will_qos = qos 1143 self._will_retain = retain 1144 1145 def will_clear(self): 1146 """ Removes a will that was previously configured with will_set(). 1147 Must be called before connect() to have any effect.""" 1148 self._will = False 1149 self._will_topic = "" 1150 self._will_payload = None 1151 self._will_qos = 0 1152 self._will_retain = False 1153 1154 def socket(self): 1155 """Return the socket or ssl object for this client.""" 1156 if self._ssl: 1157 return self._ssl 1158 else: 1159 return self._sock 1160 1161 def loop_forever(self, timeout=1.0, max_packets=1, retry_first_connection=False): 1162 """This function call loop() for you in an infinite blocking loop. It 1163 is useful for the case where you only want to run the MQTT client loop 1164 in your program. 1165 loop_forever() will handle reconnecting for you. If you call 1166 disconnect() in a callback it will return. 1167 timeout: The time in seconds to wait for incoming/outgoing network 1168 traffic before timing out and returning. 1169 max_packets: Not currently used. 1170 retry_first_connection: Should the first connection attempt be retried on failure. 1171 Raises socket.error on first connection failures unless retry_first_connection=True 1172 """ 1173 1174 run = True 1175 1176 while run: 1177 if self._state == mqtt_cs_connect_async: 1178 try: 1179 self.reconnect() 1180 except socket.error: 1181 if not retry_first_connection: 1182 raise 1183 self._easy_log(MQTT_LOG_DEBUG, "Connection failed, retrying") 1184 time.sleep(1) 1185 else: 1186 break 1187 1188 while run: 1189 rc = MQTT_ERR_SUCCESS 1190 while rc == MQTT_ERR_SUCCESS: 1191 rc = self.loop(timeout, max_packets) 1192 # We don't need to worry about locking here, because we've 1193 # either called loop_forever() when in single threaded mode, or 1194 # in multi threaded mode when loop_stop() has been called and 1195 # so no other threads can access _current_out_packet, 1196 # _out_packet or _messages. 1197 if (self._thread_terminate is True 1198 and self._current_out_packet is None 1199 and len(self._out_packet) == 0 1200 and len(self._out_messages) == 0): 1201 1202 rc = 1 1203 run = False 1204 1205 self._state_mutex.acquire() 1206 if self._state == mqtt_cs_disconnecting or run is False or self._thread_terminate is True: 1207 run = False 1208 self._state_mutex.release() 1209 else: 1210 self._state_mutex.release() 1211 time.sleep(1) 1212 1213 self._state_mutex.acquire() 1214 if self._state == mqtt_cs_disconnecting or run is False or self._thread_terminate is True: 1215 run = False 1216 self._state_mutex.release() 1217 else: 1218 self._state_mutex.release() 1219 try: 1220 self.reconnect() 1221 except socket.error as err: 1222 pass 1223 1224 return rc 1225 1226 def loop_start(self): 1227 """This is part of the threaded client interface. Call this once to 1228 start a new thread to process network traffic. This provides an 1229 alternative to repeatedly calling loop() yourself. 1230 """ 1231 if self._thread is not None: 1232 return MQTT_ERR_INVAL 1233 1234 self._thread_terminate = False 1235 self._thread = threading.Thread(target=self._thread_main) 1236 self._thread.daemon = True 1237 self._thread.start() 1238 1239 def loop_stop(self, force=False): 1240 """This is part of the threaded client interface. Call this once to 1241 stop the network thread previously created with loop_start(). This call 1242 will block until the network thread finishes. 1243 The force parameter is currently ignored. 1244 """ 1245 if self._thread is None: 1246 return MQTT_ERR_INVAL 1247 1248 self._thread_terminate = True 1249 self._thread.join() 1250 self._thread = None 1251 1252 def message_callback_add(self, sub, callback): 1253 """Register a message callback for a specific topic. 1254 Messages that match 'sub' will be passed to 'callback'. Any 1255 non-matching messages will be passed to the default on_message 1256 callback. 1257 1258 Call multiple times with different 'sub' to define multiple topic 1259 specific callbacks. 1260 1261 Topic specific callbacks may be removed with 1262 message_callback_remove().""" 1263 if callback is None or sub is None: 1264 raise ValueError("sub and callback must both be defined.") 1265 1266 self._callback_mutex.acquire() 1267 1268 for i in range(0, len(self.on_message_filtered)): 1269 if self.on_message_filtered[i][0] == sub: 1270 self.on_message_filtered[i] = (sub, callback) 1271 self._callback_mutex.release() 1272 return 1273 1274 self.on_message_filtered.append((sub, callback)) 1275 self._callback_mutex.release() 1276 1277 def message_callback_remove(self, sub): 1278 """Remove a message callback previously registered with 1279 message_callback_add().""" 1280 if sub is None: 1281 raise ValueError("sub must defined.") 1282 1283 self._callback_mutex.acquire() 1284 for i in range(0, len(self.on_message_filtered)): 1285 if self.on_message_filtered[i][0] == sub: 1286 self.on_message_filtered.pop(i) 1287 self._callback_mutex.release() 1288 return 1289 self._callback_mutex.release() 1290 1291 # ============================================================ 1292 # Private functions 1293 # ============================================================ 1294 1295 def _loop_rc_handle(self, rc): 1296 if rc: 1297 if self._ssl: 1298 self._ssl.close() 1299 self._ssl = None 1300 elif self._sock: 1301 self._sock.close() 1302 self._sock = None 1303 1304 self._state_mutex.acquire() 1305 if self._state == mqtt_cs_disconnecting: 1306 rc = MQTT_ERR_SUCCESS 1307 self._state_mutex.release() 1308 self._callback_mutex.acquire() 1309 if self.on_disconnect: 1310 self._in_callback = True 1311 self.on_disconnect(self, self._userdata, rc) 1312 self._in_callback = False 1313 1314 self._callback_mutex.release() 1315 return rc 1316 1317 def _packet_read(self): 1318 # This gets called if pselect() indicates that there is network data 1319 # available - ie. at least one byte. What we do depends on what data we 1320 # already have. 1321 # If we've not got a command, attempt to read one and save it. This should 1322 # always work because it's only a single byte. 1323 # Then try to read the remaining length. This may fail because it is may 1324 # be more than one byte - will need to save data pending next read if it 1325 # does fail. 1326 # Then try to read the remaining payload, where 'payload' here means the 1327 # combined variable header and actual payload. This is the most likely to 1328 # fail due to longer length, so save current data and current position. 1329 # After all data is read, send to _mqtt_handle_packet() to deal with. 1330 # Finally, free the memory and reset everything to starting conditions. 1331 if self._in_packet['command'] == 0: 1332 try: 1333 if self._ssl: 1334 command = self._ssl.read(1) 1335 else: 1336 command = self._sock.recv(1) 1337 except socket.error as err: 1338 if self._ssl and (err.errno == ssl.SSL_ERROR_WANT_READ or err.errno == ssl.SSL_ERROR_WANT_WRITE): 1339 return MQTT_ERR_AGAIN 1340 if err.errno == EAGAIN: 1341 return MQTT_ERR_AGAIN 1342 print(err) 1343 return 1 1344 else: 1345 if len(command) == 0: 1346 return 1 1347 command = struct.unpack("!B", command) 1348 self._in_packet['command'] = command[0] 1349 1350 if self._in_packet['have_remaining'] == 0: 1351 # Read remaining 1352 # Algorithm for decoding taken from pseudo code at 1353 # http://publib.boulder.ibm.com/infocenter/wmbhelp/v6r0m0/topic/com.ibm.etools.mft.doc/ac10870_.htm 1354 while True: 1355 try: 1356 if self._ssl: 1357 byte = self._ssl.read(1) 1358 else: 1359 byte = self._sock.recv(1) 1360 except socket.error as err: 1361 if self._ssl and (err.errno == ssl.SSL_ERROR_WANT_READ or err.errno == ssl.SSL_ERROR_WANT_WRITE): 1362 return MQTT_ERR_AGAIN 1363 if err.errno == EAGAIN: 1364 return MQTT_ERR_AGAIN 1365 print(err) 1366 return 1 1367 else: 1368 byte = struct.unpack("!B", byte) 1369 byte = byte[0] 1370 self._in_packet['remaining_count'].append(byte) 1371 # Max 4 bytes length for remaining length as defined by protocol. 1372 # Anything more likely means a broken/malicious client. 1373 if len(self._in_packet['remaining_count']) > 4: 1374 return MQTT_ERR_PROTOCOL 1375 1376 self._in_packet['remaining_length'] = self._in_packet['remaining_length'] + (byte & 127)*self._in_packet['remaining_mult'] 1377 self._in_packet['remaining_mult'] = self._in_packet['remaining_mult'] * 128 1378 1379 if (byte & 128) == 0: 1380 break 1381 1382 self._in_packet['have_remaining'] = 1 1383 self._in_packet['to_process'] = self._in_packet['remaining_length'] 1384 1385 while self._in_packet['to_process'] > 0: 1386 try: 1387 if self._ssl: 1388 data = self._ssl.read(self._in_packet['to_process']) 1389 else: 1390 data = self._sock.recv(self._in_packet['to_process']) 1391 except socket.error as err: 1392 if self._ssl and (err.errno == ssl.SSL_ERROR_WANT_READ or err.errno == ssl.SSL_ERROR_WANT_WRITE): 1393 return MQTT_ERR_AGAIN 1394 if err.errno == EAGAIN: 1395 return MQTT_ERR_AGAIN 1396 print(err) 1397 return 1 1398 else: 1399 self._in_packet['to_process'] = self._in_packet['to_process'] - len(data) 1400 self._in_packet['packet'] = self._in_packet['packet'] + data 1401 1402 # All data for this packet is read. 1403 self._in_packet['pos'] = 0 1404 rc = self._packet_handle() 1405 1406 # Free data and reset values 1407 self._in_packet = dict( 1408 command=0, 1409 have_remaining=0, 1410 remaining_count=[], 1411 remaining_mult=1, 1412 remaining_length=0, 1413 packet=b"", 1414 to_process=0, 1415 pos=0) 1416 1417 self._msgtime_mutex.acquire() 1418 self._last_msg_in = time.time() 1419 self._msgtime_mutex.release() 1420 return rc 1421 1422 def _packet_write(self): 1423 self._current_out_packet_mutex.acquire() 1424 1425 while self._current_out_packet: 1426 packet = self._current_out_packet 1427 1428 try: 1429 if self._ssl: 1430 write_length = self._ssl.write(packet['packet'][packet['pos']:]) 1431 else: 1432 write_length = self._sock.send(packet['packet'][packet['pos']:]) 1433 except AttributeError: 1434 self._current_out_packet_mutex.release() 1435 return MQTT_ERR_SUCCESS 1436 except socket.error as err: 1437 self._current_out_packet_mutex.release() 1438 if self._ssl and (err.errno == ssl.SSL_ERROR_WANT_READ or err.errno == ssl.SSL_ERROR_WANT_WRITE): 1439 return MQTT_ERR_AGAIN 1440 if err.errno == EAGAIN: 1441 return MQTT_ERR_AGAIN 1442 print(err) 1443 return 1 1444 1445 if write_length > 0: 1446 packet['to_process'] = packet['to_process'] - write_length 1447 packet['pos'] = packet['pos'] + write_length 1448 1449 if packet['to_process'] == 0: 1450 if (packet['command'] & 0xF0) == PUBLISH and packet['qos'] == 0: 1451 self._callback_mutex.acquire() 1452 if self.on_publish: 1453 self._in_callback = True 1454 self.on_publish(self, self._userdata, packet['mid']) 1455 self._in_callback = False 1456 1457 self._callback_mutex.release() 1458 1459 if (packet['command'] & 0xF0) == DISCONNECT: 1460 self._current_out_packet_mutex.release() 1461 1462 self._msgtime_mutex.acquire() 1463 self._last_msg_out = time.time() 1464 self._msgtime_mutex.release() 1465 1466 self._callback_mutex.acquire() 1467 if self.on_disconnect: 1468 self._in_callback = True 1469 self.on_disconnect(self, self._userdata, 0) 1470 self._in_callback = False 1471 self._callback_mutex.release() 1472 1473 if self._ssl: 1474 self._ssl.close() 1475 self._ssl = None 1476 if self._sock: 1477 self._sock.close() 1478 self._sock = None 1479 return MQTT_ERR_SUCCESS 1480 1481 self._out_packet_mutex.acquire() 1482 if len(self._out_packet) > 0: 1483 self._current_out_packet = self._out_packet.pop(0) 1484 else: 1485 self._current_out_packet = None 1486 self._out_packet_mutex.release() 1487 else: 1488 pass # FIXME 1489 1490 self._current_out_packet_mutex.release() 1491 1492 self._msgtime_mutex.acquire() 1493 self._last_msg_out = time.time() 1494 self._msgtime_mutex.release() 1495 1496 return MQTT_ERR_SUCCESS 1497 1498 def _easy_log(self, level, buf): 1499 if self.on_log: 1500 self.on_log(self, self._userdata, level, buf) 1501 1502 def _check_keepalive(self): 1503 now = time.time() 1504 self._msgtime_mutex.acquire() 1505 last_msg_out = self._last_msg_out 1506 last_msg_in = self._last_msg_in 1507 self._msgtime_mutex.release() 1508 if (self._sock is not None or self._ssl is not None) and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive): 1509 if self._state == mqtt_cs_connected and self._ping_t == 0: 1510 self._send_pingreq() 1511 self._msgtime_mutex.acquire() 1512 self._last_msg_out = now 1513 self._last_msg_in = now 1514 self._msgtime_mutex.release() 1515 else: 1516 if self._ssl: 1517 self._ssl.close() 1518 self._ssl = None 1519 elif self._sock: 1520 self._sock.close() 1521 self._sock = None 1522 1523 if self._state == mqtt_cs_disconnecting: 1524 rc = MQTT_ERR_SUCCESS 1525 else: 1526 rc = 1 1527 self._callback_mutex.acquire() 1528 if self.on_disconnect: 1529 self._in_callback = True 1530 self.on_disconnect(self, self._userdata, rc) 1531 self._in_callback = False 1532 self._callback_mutex.release() 1533 1534 def _mid_generate(self): 1535 self._last_mid = self._last_mid + 1 1536 if self._last_mid == 65536: 1537 self._last_mid = 1 1538 return self._last_mid 1539 1540 def _topic_wildcard_len_check(self, topic): 1541 # Search for + or # in a topic. Return MQTT_ERR_INVAL if found. 1542 # Also returns MQTT_ERR_INVAL if the topic string is too long. 1543 # Returns MQTT_ERR_SUCCESS if everything is fine. 1544 if '+' in topic or '#' in topic or len(topic) == 0 or len(topic) > 65535: 1545 return MQTT_ERR_INVAL 1546 else: 1547 return MQTT_ERR_SUCCESS 1548 1549 def _send_pingreq(self): 1550 self._easy_log(MQTT_LOG_DEBUG, "Sending PINGREQ") 1551 rc = self._send_simple_command(PINGREQ) 1552 if rc == MQTT_ERR_SUCCESS: 1553 self._ping_t = time.time() 1554 return rc 1555 1556 def _send_pingresp(self): 1557 self._easy_log(MQTT_LOG_DEBUG, "Sending PINGRESP") 1558 return self._send_simple_command(PINGRESP) 1559 1560 def _send_puback(self, mid): 1561 self._easy_log(MQTT_LOG_DEBUG, "Sending PUBACK (Mid: "+str(mid)+")") 1562 return self._send_command_with_mid(PUBACK, mid, False) 1563 1564 def _send_pubcomp(self, mid): 1565 self._easy_log(MQTT_LOG_DEBUG, "Sending PUBCOMP (Mid: "+str(mid)+")") 1566 return self._send_command_with_mid(PUBCOMP, mid, False) 1567 1568 def _pack_remaining_length(self, packet, remaining_length): 1569 remaining_bytes = [] 1570 while True: 1571 byte = remaining_length % 128 1572 remaining_length = remaining_length // 128 1573 # If there are more digits to encode, set the top bit of this digit 1574 if remaining_length > 0: 1575 byte = byte | 0x80 1576 1577 remaining_bytes.append(byte) 1578 packet.extend(struct.pack("!B", byte)) 1579 if remaining_length == 0: 1580 # FIXME - this doesn't deal with incorrectly large payloads 1581 return packet 1582 1583 def _pack_str16(self, packet, data): 1584 if sys.version_info[0] < 3: 1585 if isinstance(data, bytearray): 1586 packet.extend(struct.pack("!H", len(data))) 1587 packet.extend(data) 1588 elif isinstance(data, str): 1589 udata = data.encode('utf-8') 1590 pack_format = "!H" + str(len(udata)) + "s" 1591 packet.extend(struct.pack(pack_format, len(udata), udata)) 1592 elif isinstance(data, unicode): 1593 udata = data.encode('utf-8') 1594 pack_format = "!H" + str(len(udata)) + "s" 1595 packet.extend(struct.pack(pack_format, len(udata), udata)) 1596 else: 1597 raise TypeError 1598 else: 1599 if isinstance(data, bytearray) or isinstance(data, bytes): 1600 packet.extend(struct.pack("!H", len(data))) 1601 packet.extend(data) 1602 elif isinstance(data, str): 1603 udata = data.encode('utf-8') 1604 pack_format = "!H" + str(len(udata)) + "s" 1605 packet.extend(struct.pack(pack_format, len(udata), udata)) 1606 else: 1607 raise TypeError 1608 1609 def _send_publish(self, mid, topic, payload=None, qos=0, retain=False, dup=False): 1610 if self._sock is None and self._ssl is None: 1611 return MQTT_ERR_NO_CONN 1612 1613 utopic = topic.encode('utf-8') 1614 command = PUBLISH | ((dup&0x1)<<3) | (qos<<1) | retain 1615 packet = bytearray() 1616 packet.extend(struct.pack("!B", command)) 1617 if payload is None: 1618 remaining_length = 2+len(utopic) 1619 self._easy_log(MQTT_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(int(retain))+", m"+str(mid)+", '"+topic+"' (NULL payload)") 1620 else: 1621 if isinstance(payload, str): 1622 upayload = payload.encode('utf-8') 1623 payloadlen = len(upayload) 1624 elif isinstance(payload, bytearray): 1625 payloadlen = len(payload) 1626 elif isinstance(payload, unicode): 1627 upayload = payload.encode('utf-8') 1628 payloadlen = len(upayload) 1629 1630 remaining_length = 2+len(utopic) + payloadlen 1631 self._easy_log(MQTT_LOG_DEBUG, "Sending PUBLISH (d"+str(dup)+", q"+str(qos)+", r"+str(int(retain))+", m"+str(mid)+", '"+topic+"', ... ("+str(payloadlen)+" bytes)") 1632 1633 if qos > 0: 1634 # For message id 1635 remaining_length = remaining_length + 2 1636 1637 self._pack_remaining_length(packet, remaining_length) 1638 self._pack_str16(packet, topic) 1639 1640 if qos > 0: 1641 # For message id 1642 packet.extend(struct.pack("!H", mid)) 1643 1644 if payload is not None: 1645 if isinstance(payload, str): 1646 pack_format = str(payloadlen) + "s" 1647 packet.extend(struct.pack(pack_format, upayload)) 1648 elif isinstance(payload, bytearray): 1649 packet.extend(payload) 1650 elif isinstance(payload, unicode): 1651 pack_format = str(payloadlen) + "s" 1652 packet.extend(struct.pack(pack_format, upayload)) 1653 else: 1654 raise TypeError('payload must be a string, unicode or a bytearray.') 1655 1656 return self._packet_queue(PUBLISH, packet, mid, qos) 1657 1658 def _send_pubrec(self, mid): 1659 self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREC (Mid: "+str(mid)+")") 1660 return self._send_command_with_mid(PUBREC, mid, False) 1661 1662 def _send_pubrel(self, mid, dup=False): 1663 self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREL (Mid: "+str(mid)+")") 1664 return self._send_command_with_mid(PUBREL|2, mid, dup) 1665 1666 def _send_command_with_mid(self, command, mid, dup): 1667 # For PUBACK, PUBCOMP, PUBREC, and PUBREL 1668 if dup: 1669 command = command | 8 1670 1671 remaining_length = 2 1672 packet = struct.pack('!BBH', command, remaining_length, mid) 1673 return self._packet_queue(command, packet, mid, 1) 1674 1675 def _send_simple_command(self, command): 1676 # For DISCONNECT, PINGREQ and PINGRESP 1677 remaining_length = 0 1678 packet = struct.pack('!BB', command, remaining_length) 1679 return self._packet_queue(command, packet, 0, 0) 1680 1681 def _send_connect(self, keepalive, clean_session): 1682 if self._protocol == MQTTv31: 1683 protocol = PROTOCOL_NAMEv31 1684 proto_ver = 3 1685 else: 1686 protocol = PROTOCOL_NAMEv311 1687 proto_ver = 4 1688 remaining_length = 2+len(protocol) + 1+1+2 + 2+len(self._client_id) 1689 connect_flags = 0 1690 if clean_session: 1691 connect_flags = connect_flags | 0x02 1692 1693 if self._will: 1694 if self._will_payload is not None: 1695 remaining_length = remaining_length + 2+len(self._will_topic) + 2+len(self._will_payload) 1696 else: 1697 remaining_length = remaining_length + 2+len(self._will_topic) + 2 1698 1699 connect_flags = connect_flags | 0x04 | ((self._will_qos&0x03) << 3) | ((self._will_retain&0x01) << 5) 1700 1701 if self._username: 1702 remaining_length = remaining_length + 2+len(self._username) 1703 connect_flags = connect_flags | 0x80 1704 if self._password: 1705 connect_flags = connect_flags | 0x40 1706 remaining_length = remaining_length + 2+len(self._password) 1707 1708 command = CONNECT 1709 packet = bytearray() 1710 packet.extend(struct.pack("!B", command)) 1711 1712 self._pack_remaining_length(packet, remaining_length) 1713 packet.extend(struct.pack("!H"+str(len(protocol))+"sBBH", len(protocol), protocol, proto_ver, connect_flags, keepalive)) 1714 1715 self._pack_str16(packet, self._client_id) 1716 1717 if self._will: 1718 self._pack_str16(packet, self._will_topic) 1719 if self._will_payload is None or len(self._will_payload) == 0: 1720 packet.extend(struct.pack("!H", 0)) 1721 else: 1722 self._pack_str16(packet, self._will_payload) 1723 1724 if self._username: 1725 self._pack_str16(packet, self._username) 1726 1727 if self._password: 1728 self._pack_str16(packet, self._password) 1729 1730 self._keepalive = keepalive 1731 return self._packet_queue(command, packet, 0, 0) 1732 1733 def _send_disconnect(self): 1734 return self._send_simple_command(DISCONNECT) 1735 1736 def _send_subscribe(self, dup, topics): 1737 remaining_length = 2 1738 for t in topics: 1739 remaining_length = remaining_length + 2+len(t[0])+1 1740 1741 command = SUBSCRIBE | (dup<<3) | (1<<1) 1742 packet = bytearray() 1743 packet.extend(struct.pack("!B", command)) 1744 self._pack_remaining_length(packet, remaining_length) 1745 local_mid = self._mid_generate() 1746 packet.extend(struct.pack("!H", local_mid)) 1747 for t in topics: 1748 self._pack_str16(packet, t[0]) 1749 packet.extend(struct.pack("B", t[1])) 1750 return (self._packet_queue(command, packet, local_mid, 1), local_mid) 1751 1752 def _send_unsubscribe(self, dup, topics): 1753 remaining_length = 2 1754 for t in topics: 1755 remaining_length = remaining_length + 2+len(t) 1756 1757 command = UNSUBSCRIBE | (dup<<3) | (1<<1) 1758 packet = bytearray() 1759 packet.extend(struct.pack("!B", command)) 1760 self._pack_remaining_length(packet, remaining_length) 1761 local_mid = self._mid_generate() 1762 packet.extend(struct.pack("!H", local_mid)) 1763 for t in topics: 1764 self._pack_str16(packet, t) 1765 return (self._packet_queue(command, packet, local_mid, 1), local_mid) 1766 1767 def _message_retry_check_actual(self, messages, mutex): 1768 mutex.acquire() 1769 now = time.time() 1770 for m in messages: 1771 if m.timestamp + self._message_retry < now: 1772 if m.state == mqtt_ms_wait_for_puback or m.state == mqtt_ms_wait_for_pubrec: 1773 m.timestamp = now 1774 m.dup = True 1775 self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) 1776 elif m.state == mqtt_ms_wait_for_pubrel: 1777 m.timestamp = now 1778 m.dup = True 1779 self._send_pubrec(m.mid) 1780 elif m.state == mqtt_ms_wait_for_pubcomp: 1781 m.timestamp = now 1782 m.dup = True 1783 self._send_pubrel(m.mid, True) 1784 mutex.release() 1785 1786 def _message_retry_check(self): 1787 self._message_retry_check_actual(self._out_messages, self._out_message_mutex) 1788 self._message_retry_check_actual(self._in_messages, self._in_message_mutex) 1789 1790 def _messages_reconnect_reset_out(self): 1791 self._out_message_mutex.acquire() 1792 self._inflight_messages = 0 1793 for m in self._out_messages: 1794 m.timestamp = 0 1795 if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages: 1796 if m.qos == 0: 1797 m.state = mqtt_ms_publish 1798 elif m.qos == 1: 1799 #self._inflight_messages = self._inflight_messages + 1 1800 if m.state == mqtt_ms_wait_for_puback: 1801 m.dup = True 1802 m.state = mqtt_ms_publish 1803 elif m.qos == 2: 1804 #self._inflight_messages = self._inflight_messages + 1 1805 if m.state == mqtt_ms_wait_for_pubcomp: 1806 m.state = mqtt_ms_resend_pubrel 1807 m.dup = True 1808 else: 1809 if m.state == mqtt_ms_wait_for_pubrec: 1810 m.dup = True 1811 m.state = mqtt_ms_publish 1812 else: 1813 m.state = mqtt_ms_queued 1814 self._out_message_mutex.release() 1815 1816 def _messages_reconnect_reset_in(self): 1817 self._in_message_mutex.acquire() 1818 for m in self._in_messages: 1819 m.timestamp = 0 1820 if m.qos != 2: 1821 self._in_messages.pop(self._in_messages.index(m)) 1822 else: 1823 # Preserve current state 1824 pass 1825 self._in_message_mutex.release() 1826 1827 def _messages_reconnect_reset(self): 1828 self._messages_reconnect_reset_out() 1829 self._messages_reconnect_reset_in() 1830 1831 def _packet_queue(self, command, packet, mid, qos): 1832 mpkt = dict( 1833 command = command, 1834 mid = mid, 1835 qos = qos, 1836 pos = 0, 1837 to_process = len(packet), 1838 packet = packet) 1839 1840 self._out_packet_mutex.acquire() 1841 self._out_packet.append(mpkt) 1842 if self._current_out_packet_mutex.acquire(False): 1843 if self._current_out_packet is None and len(self._out_packet) > 0: 1844 self._current_out_packet = self._out_packet.pop(0) 1845 self._current_out_packet_mutex.release() 1846 self._out_packet_mutex.release() 1847 1848 # Write a single byte to sockpairW (connected to sockpairR) to break 1849 # out of select() if in threaded mode. 1850 try: 1851 self._sockpairW.send(sockpair_data) 1852 except socket.error as err: 1853 if err.errno != EAGAIN: 1854 raise 1855 1856 if not self._in_callback and self._thread is None: 1857 return self.loop_write() 1858 else: 1859 return MQTT_ERR_SUCCESS 1860 1861 def _packet_handle(self): 1862 cmd = self._in_packet['command']&0xF0 1863 if cmd == PINGREQ: 1864 return self._handle_pingreq() 1865 elif cmd == PINGRESP: 1866 return self._handle_pingresp() 1867 elif cmd == PUBACK: 1868 return self._handle_pubackcomp("PUBACK") 1869 elif cmd == PUBCOMP: 1870 return self._handle_pubackcomp("PUBCOMP") 1871 elif cmd == PUBLISH: 1872 return self._handle_publish() 1873 elif cmd == PUBREC: 1874 return self._handle_pubrec() 1875 elif cmd == PUBREL: 1876 return self._handle_pubrel() 1877 elif cmd == CONNACK: 1878 return self._handle_connack() 1879 elif cmd == SUBACK: 1880 return self._handle_suback() 1881 elif cmd == UNSUBACK: 1882 return self._handle_unsuback() 1883 else: 1884 # If we don't recognise the command, return an error straight away. 1885 self._easy_log(MQTT_LOG_ERR, "Error: Unrecognised command "+str(cmd)) 1886 return MQTT_ERR_PROTOCOL 1887 1888 def _handle_pingreq(self): 1889 if self._strict_protocol: 1890 if self._in_packet['remaining_length'] != 0: 1891 return MQTT_ERR_PROTOCOL 1892 1893 self._easy_log(MQTT_LOG_DEBUG, "Received PINGREQ") 1894 return self._send_pingresp() 1895 1896 def _handle_pingresp(self): 1897 if self._strict_protocol: 1898 if self._in_packet['remaining_length'] != 0: 1899 return MQTT_ERR_PROTOCOL 1900 1901 # No longer waiting for a PINGRESP. 1902 self._ping_t = 0 1903 self._easy_log(MQTT_LOG_DEBUG, "Received PINGRESP") 1904 return MQTT_ERR_SUCCESS 1905 1906 def _handle_connack(self): 1907 if self._strict_protocol: 1908 if self._in_packet['remaining_length'] != 2: 1909 return MQTT_ERR_PROTOCOL 1910 1911 if len(self._in_packet['packet']) != 2: 1912 return MQTT_ERR_PROTOCOL 1913 1914 (flags, result) = struct.unpack("!BB", self._in_packet['packet']) 1915 if result == CONNACK_REFUSED_PROTOCOL_VERSION and self._protocol == MQTTv311: 1916 self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+"), attempting downgrade to MQTT v3.1.") 1917 # Downgrade to MQTT v3.1 1918 self._protocol = MQTTv31 1919 return self.reconnect() 1920 1921 if result == 0: 1922 self._state = mqtt_cs_connected 1923 1924 self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+")") 1925 self._callback_mutex.acquire() 1926 if self.on_connect: 1927 self._in_callback = True 1928 1929 if sys.version_info[0] < 3: 1930 argcount = self.on_connect.func_code.co_argcount 1931 else: 1932 argcount = self.on_connect.__code__.co_argcount 1933 1934 if argcount == 3: 1935 self.on_connect(self, self._userdata, result) 1936 else: 1937 flags_dict = dict() 1938 flags_dict['session present'] = flags & 0x01 1939 self.on_connect(self, self._userdata, flags_dict, result) 1940 self._in_callback = False 1941 self._callback_mutex.release() 1942 if result == 0: 1943 rc = 0 1944 self._out_message_mutex.acquire() 1945 for m in self._out_messages: 1946 m.timestamp = time.time() 1947 if m.state == mqtt_ms_queued: 1948 self.loop_write() # Process outgoing messages that have just been queued up 1949 self._out_message_mutex.release() 1950 return MQTT_ERR_SUCCESS 1951 1952 if m.qos == 0: 1953 self._in_callback = True # Don't call loop_write after _send_publish() 1954 rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) 1955 self._in_callback = False 1956 if rc != 0: 1957 self._out_message_mutex.release() 1958 return rc 1959 elif m.qos == 1: 1960 if m.state == mqtt_ms_publish: 1961 self._inflight_messages = self._inflight_messages + 1 1962 m.state = mqtt_ms_wait_for_puback 1963 self._in_callback = True # Don't call loop_write after _send_publish() 1964 rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) 1965 self._in_callback = False 1966 if rc != 0: 1967 self._out_message_mutex.release() 1968 return rc 1969 elif m.qos == 2: 1970 if m.state == mqtt_ms_publish: 1971 self._inflight_messages = self._inflight_messages + 1 1972 m.state = mqtt_ms_wait_for_pubrec 1973 self._in_callback = True # Don't call loop_write after _send_publish() 1974 rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) 1975 self._in_callback = False 1976 if rc != 0: 1977 self._out_message_mutex.release() 1978 return rc 1979 elif m.state == mqtt_ms_resend_pubrel: 1980 self._inflight_messages = self._inflight_messages + 1 1981 m.state = mqtt_ms_wait_for_pubcomp 1982 self._in_callback = True # Don't call loop_write after _send_pubrel() 1983 rc = self._send_pubrel(m.mid, m.dup) 1984 self._in_callback = False 1985 if rc != 0: 1986 self._out_message_mutex.release() 1987 return rc 1988 self.loop_write() # Process outgoing messages that have just been queued up 1989 self._out_message_mutex.release() 1990 return rc 1991 elif result > 0 and result < 6: 1992 return MQTT_ERR_CONN_REFUSED 1993 else: 1994 return MQTT_ERR_PROTOCOL 1995 1996 def _handle_suback(self): 1997 self._easy_log(MQTT_LOG_DEBUG, "Received SUBACK") 1998 pack_format = "!H" + str(len(self._in_packet['packet'])-2) + 's' 1999 (mid, packet) = struct.unpack(pack_format, self._in_packet['packet']) 2000 pack_format = "!" + "B"*len(packet) 2001 granted_qos = struct.unpack(pack_format, packet) 2002 2003 self._callback_mutex.acquire() 2004 if self.on_subscribe: 2005 self._in_callback = True 2006 self.on_subscribe(self, self._userdata, mid, granted_qos) 2007 self._in_callback = False 2008 self._callback_mutex.release() 2009 2010 return MQTT_ERR_SUCCESS 2011 2012 def _handle_publish(self): 2013 rc = 0 2014 2015 header = self._in_packet['command'] 2016 message = MQTTMessage() 2017 message.dup = (header & 0x08)>>3 2018 message.qos = (header & 0x06)>>1 2019 message.retain = (header & 0x01) 2020 2021 pack_format = "!H" + str(len(self._in_packet['packet'])-2) + 's' 2022 (slen, packet) = struct.unpack(pack_format, self._in_packet['packet']) 2023 pack_format = '!' + str(slen) + 's' + str(len(packet)-slen) + 's' 2024 (message.topic, packet) = struct.unpack(pack_format, packet) 2025 2026 if len(message.topic) == 0: 2027 return MQTT_ERR_PROTOCOL 2028 2029 if sys.version_info[0] >= 3: 2030 message.topic = message.topic.decode('utf-8') 2031 2032 if message.qos > 0: 2033 pack_format = "!H" + str(len(packet)-2) + 's' 2034 (message.mid, packet) = struct.unpack(pack_format, packet) 2035 2036 message.payload = packet 2037 2038 self._easy_log( 2039 MQTT_LOG_DEBUG, 2040 "Received PUBLISH (d"+str(message.dup)+ 2041 ", q"+str(message.qos)+", r"+str(message.retain)+ 2042 ", m"+str(message.mid)+", '"+message.topic+ 2043 "', ... ("+str(len(message.payload))+" bytes)") 2044 2045 message.timestamp = time.time() 2046 if message.qos == 0: 2047 self._handle_on_message(message) 2048 return MQTT_ERR_SUCCESS 2049 elif message.qos == 1: 2050 rc = self._send_puback(message.mid) 2051 self._handle_on_message(message) 2052 return rc 2053 elif message.qos == 2: 2054 rc = self._send_pubrec(message.mid) 2055 message.state = mqtt_ms_wait_for_pubrel 2056 self._in_message_mutex.acquire() 2057 self._in_messages.append(message) 2058 self._in_message_mutex.release() 2059 return rc 2060 else: 2061 return MQTT_ERR_PROTOCOL 2062 2063 def _handle_pubrel(self): 2064 if self._strict_protocol: 2065 if self._in_packet['remaining_length'] != 2: 2066 return MQTT_ERR_PROTOCOL 2067 2068 if len(self._in_packet['packet']) != 2: 2069 return MQTT_ERR_PROTOCOL 2070 2071 mid = struct.unpack("!H", self._in_packet['packet']) 2072 mid = mid[0] 2073 self._easy_log(MQTT_LOG_DEBUG, "Received PUBREL (Mid: "+str(mid)+")") 2074 2075 self._in_message_mutex.acquire() 2076 for i in range(len(self._in_messages)): 2077 if self._in_messages[i].mid == mid: 2078 2079 # Only pass the message on if we have removed it from the queue - this 2080 # prevents multiple callbacks for the same message. 2081 self._handle_on_message(self._in_messages[i]) 2082 self._in_messages.pop(i) 2083 self._inflight_messages = self._inflight_messages - 1 2084 if self._max_inflight_messages > 0: 2085 self._out_message_mutex.acquire() 2086 rc = self._update_inflight() 2087 self._out_message_mutex.release() 2088 if rc != MQTT_ERR_SUCCESS: 2089 self._in_message_mutex.release() 2090 return rc 2091 2092 self._in_message_mutex.release() 2093 return self._send_pubcomp(mid) 2094 2095 self._in_message_mutex.release() 2096 return MQTT_ERR_SUCCESS 2097 2098 def _update_inflight(self): 2099 # Dont lock message_mutex here 2100 for m in self._out_messages: 2101 if self._inflight_messages < self._max_inflight_messages: 2102 if m.qos > 0 and m.state == mqtt_ms_queued: 2103 self._inflight_messages = self._inflight_messages + 1 2104 if m.qos == 1: 2105 m.state = mqtt_ms_wait_for_puback 2106 elif m.qos == 2: 2107 m.state = mqtt_ms_wait_for_pubrec 2108 rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) 2109 if rc != 0: 2110 return rc 2111 else: 2112 return MQTT_ERR_SUCCESS 2113 return MQTT_ERR_SUCCESS 2114 2115 def _handle_pubrec(self): 2116 if self._strict_protocol: 2117 if self._in_packet['remaining_length'] != 2: 2118 return MQTT_ERR_PROTOCOL 2119 2120 mid = struct.unpack("!H", self._in_packet['packet']) 2121 mid = mid[0] 2122 self._easy_log(MQTT_LOG_DEBUG, "Received PUBREC (Mid: "+str(mid)+")") 2123 2124 self._out_message_mutex.acquire() 2125 for m in self._out_messages: 2126 if m.mid == mid: 2127 m.state = mqtt_ms_wait_for_pubcomp 2128 m.timestamp = time.time() 2129 self._out_message_mutex.release() 2130 return self._send_pubrel(mid, False) 2131 2132 self._out_message_mutex.release() 2133 return MQTT_ERR_SUCCESS 2134 2135 def _handle_unsuback(self): 2136 if self._strict_protocol: 2137 if self._in_packet['remaining_length'] != 2: 2138 return MQTT_ERR_PROTOCOL 2139 2140 mid = struct.unpack("!H", self._in_packet['packet']) 2141 mid = mid[0] 2142 self._easy_log(MQTT_LOG_DEBUG, "Received UNSUBACK (Mid: "+str(mid)+")") 2143 self._callback_mutex.acquire() 2144 if self.on_unsubscribe: 2145 self._in_callback = True 2146 self.on_unsubscribe(self, self._userdata, mid) 2147 self._in_callback = False 2148 self._callback_mutex.release() 2149 return MQTT_ERR_SUCCESS 2150 2151 def _handle_pubackcomp(self, cmd): 2152 if self._strict_protocol: 2153 if self._in_packet['remaining_length'] != 2: 2154 return MQTT_ERR_PROTOCOL 2155 2156 mid = struct.unpack("!H", self._in_packet['packet']) 2157 mid = mid[0] 2158 self._easy_log(MQTT_LOG_DEBUG, "Received "+cmd+" (Mid: "+str(mid)+")") 2159 2160 self._out_message_mutex.acquire() 2161 for i in range(len(self._out_messages)): 2162 try: 2163 if self._out_messages[i].mid == mid: 2164 # Only inform the client the message has been sent once. 2165 self._callback_mutex.acquire() 2166 if self.on_publish: 2167 self._out_message_mutex.release() 2168 self._in_callback = True 2169 self.on_publish(self, self._userdata, mid) 2170 self._in_callback = False 2171 self._out_message_mutex.acquire() 2172 2173 self._callback_mutex.release() 2174 self._out_messages.pop(i) 2175 self._inflight_messages = self._inflight_messages - 1 2176 if self._max_inflight_messages > 0: 2177 rc = self._update_inflight() 2178 if rc != MQTT_ERR_SUCCESS: 2179 self._out_message_mutex.release() 2180 return rc 2181 self._out_message_mutex.release() 2182 return MQTT_ERR_SUCCESS 2183 except IndexError: 2184 # Have removed item so i>count. 2185 # Not really an error. 2186 pass 2187 2188 self._out_message_mutex.release() 2189 return MQTT_ERR_SUCCESS 2190 2191 def _handle_on_message(self, message): 2192 self._callback_mutex.acquire() 2193 matched = False 2194 for t in self.on_message_filtered: 2195 if topic_matches_sub(t[0], message.topic): 2196 self._in_callback = True 2197 t[1](self, self._userdata, message) 2198 self._in_callback = False 2199 matched = True 2200 2201 if matched == False and self.on_message: 2202 self._in_callback = True 2203 self.on_message(self, self._userdata, message) 2204 self._in_callback = False 2205 2206 self._callback_mutex.release() 2207 2208 def _thread_main(self): 2209 self._state_mutex.acquire() 2210 if self._state == mqtt_cs_connect_async: 2211 self._state_mutex.release() 2212 self.reconnect() 2213 else: 2214 self._state_mutex.release() 2215 2216 self.loop_forever() 2217 2218 def _host_matches_cert(self, host, cert_host): 2219 if cert_host[0:2] == "*.": 2220 if cert_host.count("*") != 1: 2221 return False 2222 2223 host_match = host.split(".", 1)[1] 2224 cert_match = cert_host.split(".", 1)[1] 2225 if host_match == cert_match: 2226 return True 2227 else: 2228 return False 2229 else: 2230 if host == cert_host: 2231 return True 2232 else: 2233 return False 2234 2235 def _tls_match_hostname(self): 2236 cert = self._ssl.getpeercert() 2237 san = cert.get('subjectAltName') 2238 if san: 2239 have_san_dns = False 2240 for (key, value) in san: 2241 if key == 'DNS': 2242 have_san_dns = True 2243 if self._host_matches_cert(self._host.lower(), value.lower()) == True: 2244 return 2245 if key == 'IP Address': 2246 have_san_dns = True 2247 if value.lower() == self._host.lower(): 2248 return 2249 2250 if have_san_dns: 2251 # Only check subject if subjectAltName dns not found. 2252 raise ssl.SSLError('Certificate subject does not match remote hostname.') 2253 subject = cert.get('subject') 2254 if subject: 2255 for ((key, value),) in subject: 2256 if key == 'commonName': 2257 if self._host_matches_cert(self._host.lower(), value.lower()) == True: 2258 return 2259 2260 raise ssl.SSLError('Certificate subject does not match remote hostname.') 2261 2262 2263 # Compatibility class for easy porting from mosquitto.py. 2264 class Mosquitto(Client): 2265 def __init__(self, client_id="", clean_session=True, userdata=None): 2266 super(Mosquitto, self).__init__(client_id, clean_session, userdata)