class_objectProcessor.py
1 """ 2 The objectProcessor thread, of which there is only one, 3 processes the network objects 4 """ 5 # pylint: disable=too-many-locals,too-many-return-statements 6 # pylint: disable=too-many-branches,too-many-statements 7 import hashlib 8 import logging 9 import os 10 import random 11 import subprocess # nosec B404 12 import threading 13 import time 14 from binascii import hexlify 15 16 from . import helper_bitcoin 17 from . import helper_inbox 18 from . import helper_msgcoding 19 from . import helper_sent 20 from . import highlevelcrypto 21 from . import l10n 22 from . import protocol 23 from . import queues 24 from . import shared 25 from . import state 26 from .addresses import ( 27 decodeAddress, decodeVarint, 28 encodeAddress, encodeVarint, varintDecodeError 29 ) 30 from .bmconfigparser import config 31 from .helper_sql import ( 32 sql_ready, sql_timeout, SqlBulkExecute, sqlExecute, sqlQuery) 33 from .network import knownnodes, invQueue 34 from .network.node import Peer 35 from .tr import _translate 36 37 logger = logging.getLogger('default') 38 39 40 class objectProcessor(threading.Thread): 41 """ 42 The objectProcessor thread, of which there is only one, receives network 43 objects (msg, broadcast, pubkey, getpubkey) from the receiveDataThreads. 44 """ 45 def __init__(self): 46 threading.Thread.__init__(self, name="objectProcessor") 47 random.seed() 48 if sql_ready.wait(sql_timeout) is False: 49 logger.fatal('SQL thread is not started in %s sec', sql_timeout) 50 os._exit(1) # pylint: disable=protected-access 51 shared.reloadMyAddressHashes() 52 shared.reloadBroadcastSendersForWhichImWatching() 53 # It may be the case that the last time Bitmessage was running, 54 # the user closed it before it finished processing everything in the 55 # objectProcessorQueue. Assuming that Bitmessage wasn't closed 56 # forcefully, it should have saved the data in the queue into the 57 # objectprocessorqueue table. Let's pull it out. 58 queryreturn = sqlQuery( 59 'SELECT objecttype, data FROM objectprocessorqueue') 60 for objectType, data in queryreturn: 61 queues.objectProcessorQueue.put((objectType, data)) 62 sqlExecute('DELETE FROM objectprocessorqueue') 63 logger.debug( 64 'Loaded %s objects from disk into the objectProcessorQueue.', 65 len(queryreturn)) 66 self.successfullyDecryptMessageTimings = [] 67 68 def run(self): 69 """Process the objects from `.queues.objectProcessorQueue`""" 70 while True: 71 objectType, data = queues.objectProcessorQueue.get() 72 73 self.checkackdata(data) 74 75 try: 76 if objectType == protocol.OBJECT_GETPUBKEY: 77 self.processgetpubkey(data) 78 elif objectType == protocol.OBJECT_PUBKEY: 79 self.processpubkey(data) 80 elif objectType == protocol.OBJECT_MSG: 81 self.processmsg(data) 82 elif objectType == protocol.OBJECT_BROADCAST: 83 self.processbroadcast(data) 84 elif objectType == protocol.OBJECT_ONIONPEER: 85 self.processonion(data) 86 # is more of a command, not an object type. Is used to get 87 # this thread past the queue.get() so that it will check 88 # the shutdown variable. 89 elif objectType == 'checkShutdownVariable': 90 pass 91 else: 92 if isinstance(objectType, int): 93 logger.info( 94 'Don\'t know how to handle object type 0x%08X', 95 objectType) 96 else: 97 logger.info( 98 'Don\'t know how to handle object type %s', 99 objectType) 100 except helper_msgcoding.DecompressionSizeException as e: 101 logger.error( 102 'The object is too big after decompression (stopped' 103 ' decompressing at %ib, your configured limit %ib).' 104 ' Ignoring', 105 e.size, config.safeGetInt('zlib', 'maxsize')) 106 except varintDecodeError as e: 107 logger.debug( 108 'There was a problem with a varint while processing an' 109 ' object. Some details: %s', e) 110 except Exception: 111 logger.critical( 112 'Critical error within objectProcessorThread: \n', 113 exc_info=True) 114 115 if state.shutdown: 116 # Wait just a moment for most of the connections to close 117 time.sleep(.5) 118 numberOfObjectsThatWereInTheObjectProcessorQueue = 0 119 with SqlBulkExecute() as sql: 120 while queues.objectProcessorQueue.curSize > 0: 121 objectType, data = queues.objectProcessorQueue.get() 122 sql.execute( 123 'INSERT INTO objectprocessorqueue VALUES (?,?)', 124 objectType, data) 125 numberOfObjectsThatWereInTheObjectProcessorQueue += 1 126 logger.debug( 127 'Saved %s objects from the objectProcessorQueue to' 128 ' disk. objectProcessorThread exiting.', 129 numberOfObjectsThatWereInTheObjectProcessorQueue) 130 state.shutdown = 2 131 break 132 133 @staticmethod 134 def checkackdata(data): 135 """Checking Acknowledgement of message received or not?""" 136 # Let's check whether this is a message acknowledgement bound for us. 137 if len(data) < 32: 138 return 139 140 # bypass nonce and time, retain object type/version/stream + body 141 readPosition = 16 142 143 if data[readPosition:] in state.ackdataForWhichImWatching: 144 logger.info('This object is an acknowledgement bound for me.') 145 del state.ackdataForWhichImWatching[data[readPosition:]] 146 sqlExecute( 147 "UPDATE sent SET status='ackreceived', lastactiontime=?" 148 " WHERE ackdata=?", int(time.time()), data[readPosition:]) 149 queues.UISignalQueue.put(( 150 'updateSentItemStatusByAckdata', ( 151 data[readPosition:], 152 _translate( 153 "MainWindow", 154 "Acknowledgement of the message received %1" 155 ).arg(l10n.formatTimestamp())) 156 )) 157 else: 158 logger.debug('This object is not an acknowledgement bound for me.') 159 160 @staticmethod 161 def processonion(data): 162 """Process onionpeer object""" 163 readPosition = 20 # bypass the nonce, time, and object type 164 length = decodeVarint(data[readPosition:readPosition + 10])[1] 165 readPosition += length 166 stream, length = decodeVarint(data[readPosition:readPosition + 10]) 167 readPosition += length 168 # it seems that stream is checked in network.bmproto 169 port, length = decodeVarint(data[readPosition:readPosition + 10]) 170 host = protocol.checkIPAddress(data[readPosition + length:]) 171 172 if not host: 173 return 174 peer = Peer(host, port) 175 with knownnodes.knownNodesLock: 176 # FIXME: adjust expirestime 177 knownnodes.addKnownNode( 178 stream, peer, is_self=state.ownAddresses.get(peer)) 179 180 @staticmethod 181 def processgetpubkey(data): 182 """Process getpubkey object""" 183 if len(data) > 200: 184 return logger.info( 185 'getpubkey is abnormally long. Sanity check failed.' 186 ' Ignoring object.') 187 readPosition = 20 # bypass the nonce, time, and object type 188 requestedAddressVersionNumber, addressVersionLength = decodeVarint( 189 data[readPosition:readPosition + 10]) 190 readPosition += addressVersionLength 191 streamNumber, streamNumberLength = decodeVarint( 192 data[readPosition:readPosition + 10]) 193 readPosition += streamNumberLength 194 195 if requestedAddressVersionNumber == 0: 196 return logger.debug( 197 'The requestedAddressVersionNumber of the pubkey request' 198 ' is zero. That doesn\'t make any sense. Ignoring it.') 199 if requestedAddressVersionNumber == 1: 200 return logger.debug( 201 'The requestedAddressVersionNumber of the pubkey request' 202 ' is 1 which isn\'t supported anymore. Ignoring it.') 203 if requestedAddressVersionNumber > 4: 204 return logger.debug( 205 'The requestedAddressVersionNumber of the pubkey request' 206 ' is too high. Can\'t understand. Ignoring it.') 207 208 myAddress = '' 209 if requestedAddressVersionNumber <= 3: 210 requestedHash = data[readPosition:readPosition + 20] 211 if len(requestedHash) != 20: 212 return logger.debug( 213 'The length of the requested hash is not 20 bytes.' 214 ' Something is wrong. Ignoring.') 215 logger.info( 216 'the hash requested in this getpubkey request is: %s', 217 hexlify(requestedHash)) 218 # if this address hash is one of mine 219 if requestedHash in shared.myAddressesByHash: 220 myAddress = shared.myAddressesByHash[requestedHash] 221 elif requestedAddressVersionNumber >= 4: 222 requestedTag = data[readPosition:readPosition + 32] 223 if len(requestedTag) != 32: 224 return logger.debug( 225 'The length of the requested tag is not 32 bytes.' 226 ' Something is wrong. Ignoring.') 227 logger.debug( 228 'the tag requested in this getpubkey request is: %s', 229 hexlify(requestedTag)) 230 if requestedTag in shared.myAddressesByTag: 231 myAddress = shared.myAddressesByTag[requestedTag] 232 233 if myAddress == '': 234 logger.info('This getpubkey request is not for any of my keys.') 235 return 236 237 if decodeAddress(myAddress)[1] != requestedAddressVersionNumber: 238 return logger.warning( 239 '(Within the processgetpubkey function) Someone requested' 240 ' one of my pubkeys but the requestedAddressVersionNumber' 241 ' doesn\'t match my actual address version number.' 242 ' Ignoring.') 243 if decodeAddress(myAddress)[2] != streamNumber: 244 return logger.warning( 245 '(Within the processgetpubkey function) Someone requested' 246 ' one of my pubkeys but the stream number on which we' 247 ' heard this getpubkey object doesn\'t match this' 248 ' address\' stream number. Ignoring.') 249 if config.safeGetBoolean(myAddress, 'chan'): 250 return logger.info( 251 'Ignoring getpubkey request because it is for one of my' 252 ' chan addresses. The other party should already have' 253 ' the pubkey.') 254 lastPubkeySendTime = config.safeGetInt( 255 myAddress, 'lastpubkeysendtime') 256 # If the last time we sent our pubkey was more recent than 257 # 28 days ago... 258 if lastPubkeySendTime > time.time() - 2419200: 259 return logger.info( 260 'Found getpubkey-requested-item in my list of EC hashes' 261 ' BUT we already sent it recently. Ignoring request.' 262 ' The lastPubkeySendTime is: %s', lastPubkeySendTime) 263 logger.info( 264 'Found getpubkey-requested-hash in my list of EC hashes.' 265 ' Telling Worker thread to do the POW for a pubkey message' 266 ' and send it out.') 267 if requestedAddressVersionNumber == 2: 268 queues.workerQueue.put(('doPOWForMyV2Pubkey', requestedHash)) 269 elif requestedAddressVersionNumber == 3: 270 queues.workerQueue.put(('sendOutOrStoreMyV3Pubkey', requestedHash)) 271 elif requestedAddressVersionNumber == 4: 272 queues.workerQueue.put(('sendOutOrStoreMyV4Pubkey', myAddress)) 273 274 def processpubkey(self, data): 275 """Process a pubkey object""" 276 pubkeyProcessingStartTime = time.time() 277 state.numberOfPubkeysProcessed += 1 278 queues.UISignalQueue.put(( 279 'updateNumberOfPubkeysProcessed', 'no data')) 280 readPosition = 20 # bypass the nonce, time, and object type 281 addressVersion, varintLength = decodeVarint( 282 data[readPosition:readPosition + 10]) 283 readPosition += varintLength 284 streamNumber, varintLength = decodeVarint( 285 data[readPosition:readPosition + 10]) 286 readPosition += varintLength 287 if addressVersion == 0: 288 return logger.debug( 289 '(Within processpubkey) addressVersion of 0 doesn\'t' 290 ' make sense.') 291 if addressVersion > 4 or addressVersion == 1: 292 return logger.info( 293 'This version of Bitmessage cannot handle version %s' 294 ' addresses.', addressVersion) 295 if addressVersion == 2: 296 # sanity check. This is the minimum possible length. 297 if len(data) < 146: 298 return logger.debug( 299 '(within processpubkey) payloadLength less than 146.' 300 ' Sanity check failed.') 301 readPosition += 4 302 pubSigningKey = '\x04' + data[readPosition:readPosition + 64] 303 # Is it possible for a public key to be invalid such that trying to 304 # encrypt or sign with it will cause an error? If it is, it would 305 # be easiest to test them here. 306 readPosition += 64 307 pubEncryptionKey = '\x04' + data[readPosition:readPosition + 64] 308 if len(pubEncryptionKey) < 65: 309 return logger.debug( 310 'publicEncryptionKey length less than 64. Sanity check' 311 ' failed.') 312 readPosition += 64 313 # The data we'll store in the pubkeys table. 314 dataToStore = data[20:readPosition] 315 ripe = highlevelcrypto.to_ripe(pubSigningKey, pubEncryptionKey) 316 317 if logger.isEnabledFor(logging.DEBUG): 318 logger.debug( 319 'within recpubkey, addressVersion: %s, streamNumber: %s' 320 '\nripe %s\npublicSigningKey in hex: %s' 321 '\npublicEncryptionKey in hex: %s', 322 addressVersion, streamNumber, hexlify(ripe), 323 hexlify(pubSigningKey), hexlify(pubEncryptionKey) 324 ) 325 326 address = encodeAddress(addressVersion, streamNumber, ripe) 327 328 queryreturn = sqlQuery( 329 "SELECT usedpersonally FROM pubkeys WHERE address=?" 330 " AND usedpersonally='yes'", address) 331 # if this pubkey is already in our database and if we have 332 # used it personally: 333 if queryreturn != []: 334 logger.info( 335 'We HAVE used this pubkey personally. Updating time.') 336 t = (address, addressVersion, dataToStore, 337 int(time.time()), 'yes') 338 else: 339 logger.info( 340 'We have NOT used this pubkey personally. Inserting' 341 ' in database.') 342 t = (address, addressVersion, dataToStore, 343 int(time.time()), 'no') 344 sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t) 345 self.possibleNewPubkey(address) 346 if addressVersion == 3: 347 if len(data) < 170: # sanity check. 348 logger.warning( 349 '(within processpubkey) payloadLength less than 170.' 350 ' Sanity check failed.') 351 return 352 readPosition += 4 353 pubSigningKey = '\x04' + data[readPosition:readPosition + 64] 354 readPosition += 64 355 pubEncryptionKey = '\x04' + data[readPosition:readPosition + 64] 356 readPosition += 64 357 specifiedNonceTrialsPerByteLength = decodeVarint( 358 data[readPosition:readPosition + 10])[1] 359 readPosition += specifiedNonceTrialsPerByteLength 360 specifiedPayloadLengthExtraBytesLength = decodeVarint( 361 data[readPosition:readPosition + 10])[1] 362 readPosition += specifiedPayloadLengthExtraBytesLength 363 endOfSignedDataPosition = readPosition 364 # The data we'll store in the pubkeys table. 365 dataToStore = data[20:readPosition] 366 signatureLength, signatureLengthLength = decodeVarint( 367 data[readPosition:readPosition + 10]) 368 readPosition += signatureLengthLength 369 signature = data[readPosition:readPosition + signatureLength] 370 if highlevelcrypto.verify( 371 data[8:endOfSignedDataPosition], 372 signature, hexlify(pubSigningKey)): 373 logger.debug('ECDSA verify passed (within processpubkey)') 374 else: 375 logger.warning('ECDSA verify failed (within processpubkey)') 376 return 377 378 ripe = highlevelcrypto.to_ripe(pubSigningKey, pubEncryptionKey) 379 380 if logger.isEnabledFor(logging.DEBUG): 381 logger.debug( 382 'within recpubkey, addressVersion: %s, streamNumber: %s' 383 '\nripe %s\npublicSigningKey in hex: %s' 384 '\npublicEncryptionKey in hex: %s', 385 addressVersion, streamNumber, hexlify(ripe), 386 hexlify(pubSigningKey), hexlify(pubEncryptionKey) 387 ) 388 389 address = encodeAddress(addressVersion, streamNumber, ripe) 390 queryreturn = sqlQuery( 391 "SELECT usedpersonally FROM pubkeys WHERE address=?" 392 " AND usedpersonally='yes'", address) 393 # if this pubkey is already in our database and if we have 394 # used it personally: 395 if queryreturn != []: 396 logger.info( 397 'We HAVE used this pubkey personally. Updating time.') 398 t = (address, addressVersion, dataToStore, 399 int(time.time()), 'yes') 400 else: 401 logger.info( 402 'We have NOT used this pubkey personally. Inserting' 403 ' in database.') 404 t = (address, addressVersion, dataToStore, 405 int(time.time()), 'no') 406 sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t) 407 self.possibleNewPubkey(address) 408 409 if addressVersion == 4: 410 if len(data) < 350: # sanity check. 411 return logger.debug( 412 '(within processpubkey) payloadLength less than 350.' 413 ' Sanity check failed.') 414 415 tag = data[readPosition:readPosition + 32] 416 if tag not in state.neededPubkeys: 417 return logger.info( 418 'We don\'t need this v4 pubkey. We didn\'t ask for it.') 419 420 # Let us try to decrypt the pubkey 421 toAddress = state.neededPubkeys[tag][0] 422 if protocol.decryptAndCheckPubkeyPayload(data, toAddress) == \ 423 'successful': 424 # At this point we know that we have been waiting on this 425 # pubkey. This function will command the workerThread 426 # to start work on the messages that require it. 427 self.possibleNewPubkey(toAddress) 428 429 # Display timing data 430 logger.debug( 431 'Time required to process this pubkey: %s', 432 time.time() - pubkeyProcessingStartTime) 433 434 def processmsg(self, data): 435 """Process a message object""" 436 messageProcessingStartTime = time.time() 437 state.numberOfMessagesProcessed += 1 438 queues.UISignalQueue.put(( 439 'updateNumberOfMessagesProcessed', 'no data')) 440 readPosition = 20 # bypass the nonce, time, and object type 441 msgVersion, msgVersionLength = decodeVarint( 442 data[readPosition:readPosition + 9]) 443 if msgVersion != 1: 444 return logger.info( 445 'Cannot understand message versions other than one.' 446 ' Ignoring message.') 447 readPosition += msgVersionLength 448 449 streamNumberAsClaimedByMsg, streamNumberAsClaimedByMsgLength = \ 450 decodeVarint(data[readPosition:readPosition + 9]) 451 readPosition += streamNumberAsClaimedByMsgLength 452 inventoryHash = highlevelcrypto.calculateInventoryHash(data) 453 initialDecryptionSuccessful = False 454 455 # This is not an acknowledgement bound for me. See if it is a message 456 # bound for me by trying to decrypt it with my private keys. 457 458 for key, cryptorObject in sorted( 459 list(shared.myECCryptorObjects.items()), 460 key=lambda x: random.random()): # nosec B311 461 try: 462 # continue decryption attempts to avoid timing attacks 463 if initialDecryptionSuccessful: 464 cryptorObject.decrypt(data[readPosition:]) 465 else: 466 decryptedData = cryptorObject.decrypt(data[readPosition:]) 467 # This is the RIPE hash of my pubkeys. We need this 468 # below to compare to the destination_ripe included 469 # in the encrypted data. 470 toRipe = key 471 initialDecryptionSuccessful = True 472 logger.info( 473 'EC decryption successful using key associated' 474 ' with ripe hash: %s.', hexlify(key)) 475 except Exception: # nosec B110 476 pass 477 if not initialDecryptionSuccessful: 478 # This is not a message bound for me. 479 return logger.info( 480 'Length of time program spent failing to decrypt this' 481 ' message: %s seconds.', 482 time.time() - messageProcessingStartTime) 483 484 # This is a message bound for me. 485 # Look up my address based on the RIPE hash. 486 toAddress = shared.myAddressesByHash[toRipe] 487 readPosition = 0 488 sendersAddressVersionNumber, sendersAddressVersionNumberLength = \ 489 decodeVarint(decryptedData[readPosition:readPosition + 10]) 490 readPosition += sendersAddressVersionNumberLength 491 if sendersAddressVersionNumber == 0: 492 return logger.info( 493 'Cannot understand sendersAddressVersionNumber = 0.' 494 ' Ignoring message.') 495 if sendersAddressVersionNumber > 4: 496 return logger.info( 497 'Sender\'s address version number %s not yet supported.' 498 ' Ignoring message.', sendersAddressVersionNumber) 499 if len(decryptedData) < 170: 500 return logger.info( 501 'Length of the unencrypted data is unreasonably short.' 502 ' Sanity check failed. Ignoring message.') 503 sendersStreamNumber, sendersStreamNumberLength = decodeVarint( 504 decryptedData[readPosition:readPosition + 10]) 505 if sendersStreamNumber == 0: 506 logger.info('sender\'s stream number is 0. Ignoring message.') 507 return 508 readPosition += sendersStreamNumberLength 509 readPosition += 4 510 pubSigningKey = '\x04' + decryptedData[readPosition:readPosition + 64] 511 readPosition += 64 512 pubEncryptionKey = '\x04' + decryptedData[readPosition:readPosition + 64] 513 readPosition += 64 514 if sendersAddressVersionNumber >= 3: 515 requiredAverageProofOfWorkNonceTrialsPerByte, varintLength = \ 516 decodeVarint(decryptedData[readPosition:readPosition + 10]) 517 readPosition += varintLength 518 logger.info( 519 'sender\'s requiredAverageProofOfWorkNonceTrialsPerByte is %s', 520 requiredAverageProofOfWorkNonceTrialsPerByte) 521 requiredPayloadLengthExtraBytes, varintLength = decodeVarint( 522 decryptedData[readPosition:readPosition + 10]) 523 readPosition += varintLength 524 logger.info( 525 'sender\'s requiredPayloadLengthExtraBytes is %s', 526 requiredPayloadLengthExtraBytes) 527 # needed for when we store the pubkey in our database of pubkeys 528 # for later use. 529 endOfThePublicKeyPosition = readPosition 530 if toRipe != decryptedData[readPosition:readPosition + 20]: 531 return logger.info( 532 'The original sender of this message did not send it to' 533 ' you. Someone is attempting a Surreptitious Forwarding' 534 ' Attack.\nSee: ' 535 'http://world.std.com/~dtd/sign_encrypt/sign_encrypt7.html' 536 '\nyour toRipe: %s\nembedded destination toRipe: %s', 537 hexlify(toRipe), 538 hexlify(decryptedData[readPosition:readPosition + 20]) 539 ) 540 readPosition += 20 541 messageEncodingType, messageEncodingTypeLength = decodeVarint( 542 decryptedData[readPosition:readPosition + 10]) 543 readPosition += messageEncodingTypeLength 544 messageLength, messageLengthLength = decodeVarint( 545 decryptedData[readPosition:readPosition + 10]) 546 readPosition += messageLengthLength 547 message = decryptedData[readPosition:readPosition + messageLength] 548 readPosition += messageLength 549 ackLength, ackLengthLength = decodeVarint( 550 decryptedData[readPosition:readPosition + 10]) 551 readPosition += ackLengthLength 552 ackData = decryptedData[readPosition:readPosition + ackLength] 553 readPosition += ackLength 554 # needed to mark the end of what is covered by the signature 555 positionOfBottomOfAckData = readPosition 556 signatureLength, signatureLengthLength = decodeVarint( 557 decryptedData[readPosition:readPosition + 10]) 558 readPosition += signatureLengthLength 559 signature = decryptedData[ 560 readPosition:readPosition + signatureLength] 561 signedData = data[8:20] + encodeVarint(1) + encodeVarint( 562 streamNumberAsClaimedByMsg 563 ) + decryptedData[:positionOfBottomOfAckData] 564 565 if not highlevelcrypto.verify( 566 signedData, signature, hexlify(pubSigningKey)): 567 return logger.debug('ECDSA verify failed') 568 logger.debug('ECDSA verify passed') 569 if logger.isEnabledFor(logging.DEBUG): 570 logger.debug( 571 'As a matter of intellectual curiosity, here is the Bitcoin' 572 ' address associated with the keys owned by the other person:' 573 ' %s ..and here is the testnet address: %s. The other person' 574 ' must take their private signing key from Bitmessage and' 575 ' import it into Bitcoin (or a service like Blockchain.info)' 576 ' for it to be of any use. Do not use this unless you know' 577 ' what you are doing.', 578 helper_bitcoin.calculateBitcoinAddressFromPubkey(pubSigningKey), 579 helper_bitcoin.calculateTestnetAddressFromPubkey(pubSigningKey) 580 ) 581 # Used to detect and ignore duplicate messages in our inbox 582 sigHash = highlevelcrypto.double_sha512(signature)[32:] 583 584 # calculate the fromRipe. 585 ripe = highlevelcrypto.to_ripe(pubSigningKey, pubEncryptionKey) 586 fromAddress = encodeAddress( 587 sendersAddressVersionNumber, sendersStreamNumber, ripe) 588 589 # Let's store the public key in case we want to reply to this 590 # person. 591 sqlExecute( 592 '''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', 593 fromAddress, 594 sendersAddressVersionNumber, 595 decryptedData[:endOfThePublicKeyPosition], 596 int(time.time()), 597 'yes') 598 599 # Check to see whether we happen to be awaiting this 600 # pubkey in order to send a message. If we are, it will do the POW 601 # and send it. 602 self.possibleNewPubkey(fromAddress) 603 604 # If this message is bound for one of my version 3 addresses (or 605 # higher), then we must check to make sure it meets our demanded 606 # proof of work requirement. If this is bound for one of my chan 607 # addresses then we skip this check; the minimum network POW is 608 # fine. 609 # If the toAddress version number is 3 or higher and not one of 610 # my chan addresses: 611 if decodeAddress(toAddress)[1] >= 3 \ 612 and not config.safeGetBoolean(toAddress, 'chan'): 613 # If I'm not friendly with this person: 614 if not shared.isAddressInMyAddressBookSubscriptionsListOrWhitelist( 615 fromAddress): 616 requiredNonceTrialsPerByte = config.getint( 617 toAddress, 'noncetrialsperbyte') 618 requiredPayloadLengthExtraBytes = config.getint( 619 toAddress, 'payloadlengthextrabytes') 620 if not protocol.isProofOfWorkSufficient( 621 data, requiredNonceTrialsPerByte, 622 requiredPayloadLengthExtraBytes): 623 return logger.info( 624 'Proof of work in msg is insufficient only because' 625 ' it does not meet our higher requirement.') 626 # Gets set to True if the user shouldn't see the message according 627 # to black or white lists. 628 blockMessage = False 629 # If we are using a blacklist 630 if config.get( 631 'bitmessagesettings', 'blackwhitelist') == 'black': 632 queryreturn = sqlQuery( 633 "SELECT label FROM blacklist where address=? and enabled='1'", 634 fromAddress) 635 if queryreturn != []: 636 logger.info('Message ignored because address is in blacklist.') 637 638 blockMessage = True 639 else: # We're using a whitelist 640 queryreturn = sqlQuery( 641 "SELECT label FROM whitelist where address=? and enabled='1'", 642 fromAddress) 643 if queryreturn == []: 644 logger.info( 645 'Message ignored because address not in whitelist.') 646 blockMessage = True 647 648 # toLabel = config.safeGet(toAddress, 'label', toAddress) 649 try: 650 decodedMessage = helper_msgcoding.MsgDecode( 651 messageEncodingType, message) 652 except helper_msgcoding.MsgDecodeException: 653 return 654 subject = decodedMessage.subject 655 body = decodedMessage.body 656 657 # Let us make sure that we haven't already received this message 658 if helper_inbox.isMessageAlreadyInInbox(sigHash): 659 logger.info('This msg is already in our inbox. Ignoring it.') 660 blockMessage = True 661 if not blockMessage: 662 if messageEncodingType != 0: 663 t = (inventoryHash, toAddress, fromAddress, subject, 664 int(time.time()), body, 'inbox', messageEncodingType, 665 0, sigHash) 666 helper_inbox.insert(t) 667 668 queues.UISignalQueue.put(('displayNewInboxMessage', ( 669 inventoryHash, toAddress, fromAddress, subject, body))) 670 671 # If we are behaving as an API then we might need to run an 672 # outside command to let some program know that a new message 673 # has arrived. 674 if config.safeGetBoolean( 675 'bitmessagesettings', 'apienabled'): 676 apiNotifyPath = config.safeGet( 677 'bitmessagesettings', 'apinotifypath') 678 if apiNotifyPath: 679 subprocess.call([apiNotifyPath, "newMessage"]) # nosec B603 680 681 # Let us now check and see whether our receiving address is 682 # behaving as a mailing list 683 if config.safeGetBoolean(toAddress, 'mailinglist') \ 684 and messageEncodingType != 0: 685 mailingListName = config.safeGet( 686 toAddress, 'mailinglistname', '') 687 # Let us send out this message as a broadcast 688 subject = self.addMailingListNameToSubject( 689 subject, mailingListName) 690 # Let us now send this message out as a broadcast 691 message = time.strftime( 692 "%a, %Y-%m-%d %H:%M:%S UTC", time.gmtime() 693 ) + ' Message ostensibly from ' + fromAddress \ 694 + ':\n\n' + body 695 # The fromAddress for the broadcast that we are about to 696 # send is the toAddress (my address) for the msg message 697 # we are currently processing. 698 fromAddress = toAddress 699 # We don't actually need the ackdata for acknowledgement 700 # since this is a broadcast message but we can use it to 701 # update the user interface when the POW is done generating. 702 toAddress = '[Broadcast subscribers]' 703 704 ackdata = helper_sent.insert( 705 fromAddress=fromAddress, 706 status='broadcastqueued', 707 subject=subject, 708 message=message, 709 encoding=messageEncodingType) 710 711 queues.UISignalQueue.put(( 712 'displayNewSentMessage', ( 713 toAddress, '[Broadcast subscribers]', fromAddress, 714 subject, message, ackdata) 715 )) 716 queues.workerQueue.put(('sendbroadcast', '')) 717 718 # Don't send ACK if invalid, blacklisted senders, invisible 719 # messages, disabled or chan 720 if ( 721 self.ackDataHasAValidHeader(ackData) and not blockMessage 722 and messageEncodingType != 0 723 and not config.safeGetBoolean(toAddress, 'dontsendack') 724 and not config.safeGetBoolean(toAddress, 'chan') 725 ): 726 ackPayload = ackData[24:] 727 objectType, toStreamNumber, expiresTime = \ 728 protocol.decodeObjectParameters(ackPayload) 729 inventoryHash = highlevelcrypto.calculateInventoryHash(ackPayload) 730 state.Inventory[inventoryHash] = ( 731 objectType, toStreamNumber, ackPayload, expiresTime, b'') 732 invQueue.put((toStreamNumber, inventoryHash)) 733 734 # Display timing data 735 timeRequiredToAttemptToDecryptMessage = time.time( 736 ) - messageProcessingStartTime 737 self.successfullyDecryptMessageTimings.append( 738 timeRequiredToAttemptToDecryptMessage) 739 timing_sum = 0 740 for item in self.successfullyDecryptMessageTimings: 741 timing_sum += item 742 logger.debug( 743 'Time to decrypt this message successfully: %s' 744 '\nAverage time for all message decryption successes since' 745 ' startup: %s.', 746 timeRequiredToAttemptToDecryptMessage, 747 timing_sum / len(self.successfullyDecryptMessageTimings) 748 ) 749 750 def processbroadcast(self, data): 751 """Process a broadcast object""" 752 messageProcessingStartTime = time.time() 753 state.numberOfBroadcastsProcessed += 1 754 queues.UISignalQueue.put(( 755 'updateNumberOfBroadcastsProcessed', 'no data')) 756 inventoryHash = highlevelcrypto.calculateInventoryHash(data) 757 readPosition = 20 # bypass the nonce, time, and object type 758 broadcastVersion, broadcastVersionLength = decodeVarint( 759 data[readPosition:readPosition + 9]) 760 readPosition += broadcastVersionLength 761 if broadcastVersion < 4 or broadcastVersion > 5: 762 return logger.info( 763 'Cannot decode incoming broadcast versions less than 4' 764 ' or higher than 5. Assuming the sender isn\'t being silly,' 765 ' you should upgrade Bitmessage because this message shall' 766 ' be ignored.' 767 ) 768 cleartextStreamNumber, cleartextStreamNumberLength = decodeVarint( 769 data[readPosition:readPosition + 10]) 770 readPosition += cleartextStreamNumberLength 771 if broadcastVersion == 4: 772 # v4 broadcasts are encrypted the same way the msgs are 773 # encrypted. To see if we are interested in a v4 broadcast, 774 # we try to decrypt it. This was replaced with v5 broadcasts 775 # which include a tag which we check instead, just like we do 776 # with v4 pubkeys. 777 signedData = data[8:readPosition] 778 initialDecryptionSuccessful = False 779 for key, cryptorObject in sorted( 780 list(shared.MyECSubscriptionCryptorObjects.items()), 781 key=lambda x: random.random()): # nosec B311 782 try: 783 # continue decryption attempts to avoid timing attacks 784 if initialDecryptionSuccessful: 785 cryptorObject.decrypt(data[readPosition:]) 786 else: 787 decryptedData = cryptorObject.decrypt( 788 data[readPosition:]) 789 # This is the RIPE hash of the sender's pubkey. 790 # We need this below to compare to the RIPE hash 791 # of the sender's address to verify that it was 792 # encrypted by with their key rather than some 793 # other key. 794 toRipe = key 795 initialDecryptionSuccessful = True 796 logger.info( 797 'EC decryption successful using key associated' 798 ' with ripe hash: %s', hexlify(key)) 799 except Exception: 800 logger.debug( 801 'cryptorObject.decrypt Exception:', exc_info=True) 802 if not initialDecryptionSuccessful: 803 # This is not a broadcast I am interested in. 804 return logger.debug( 805 'Length of time program spent failing to decrypt this' 806 ' v4 broadcast: %s seconds.', 807 time.time() - messageProcessingStartTime) 808 elif broadcastVersion == 5: 809 embeddedTag = data[readPosition:readPosition + 32] 810 readPosition += 32 811 if embeddedTag not in shared.MyECSubscriptionCryptorObjects: 812 logger.debug('We\'re not interested in this broadcast.') 813 return 814 # We are interested in this broadcast because of its tag. 815 # We're going to add some more data which is signed further down. 816 signedData = data[8:readPosition] 817 cryptorObject = shared.MyECSubscriptionCryptorObjects[embeddedTag] 818 try: 819 decryptedData = cryptorObject.decrypt(data[readPosition:]) 820 logger.debug('EC decryption successful') 821 except Exception: 822 return logger.debug( 823 'Broadcast version %s decryption Unsuccessful.', 824 broadcastVersion) 825 # At this point this is a broadcast I have decrypted and am 826 # interested in. 827 readPosition = 0 828 sendersAddressVersion, sendersAddressVersionLength = decodeVarint( 829 decryptedData[readPosition:readPosition + 9]) 830 if broadcastVersion == 4: 831 if sendersAddressVersion < 2 or sendersAddressVersion > 3: 832 return logger.warning( 833 'Cannot decode senderAddressVersion other than 2 or 3.' 834 ' Assuming the sender isn\'t being silly, you should' 835 ' upgrade Bitmessage because this message shall be' 836 ' ignored.' 837 ) 838 elif broadcastVersion == 5: 839 if sendersAddressVersion < 4: 840 return logger.info( 841 'Cannot decode senderAddressVersion less than 4 for' 842 ' broadcast version number 5. Assuming the sender' 843 ' isn\'t being silly, you should upgrade Bitmessage' 844 ' because this message shall be ignored.' 845 ) 846 readPosition += sendersAddressVersionLength 847 sendersStream, sendersStreamLength = decodeVarint( 848 decryptedData[readPosition:readPosition + 9]) 849 if sendersStream != cleartextStreamNumber: 850 return logger.info( 851 'The stream number outside of the encryption on which the' 852 ' POW was completed doesn\'t match the stream number' 853 ' inside the encryption. Ignoring broadcast.' 854 ) 855 readPosition += sendersStreamLength 856 readPosition += 4 857 sendersPubSigningKey = '\x04' + \ 858 decryptedData[readPosition:readPosition + 64] 859 readPosition += 64 860 sendersPubEncryptionKey = '\x04' + \ 861 decryptedData[readPosition:readPosition + 64] 862 readPosition += 64 863 if sendersAddressVersion >= 3: 864 requiredAverageProofOfWorkNonceTrialsPerByte, varintLength = \ 865 decodeVarint(decryptedData[readPosition:readPosition + 10]) 866 readPosition += varintLength 867 logger.debug( 868 'sender\'s requiredAverageProofOfWorkNonceTrialsPerByte' 869 ' is %s', requiredAverageProofOfWorkNonceTrialsPerByte) 870 requiredPayloadLengthExtraBytes, varintLength = decodeVarint( 871 decryptedData[readPosition:readPosition + 10]) 872 readPosition += varintLength 873 logger.debug( 874 'sender\'s requiredPayloadLengthExtraBytes is %s', 875 requiredPayloadLengthExtraBytes) 876 endOfPubkeyPosition = readPosition 877 878 calculatedRipe = highlevelcrypto.to_ripe( 879 sendersPubSigningKey, sendersPubEncryptionKey) 880 881 if broadcastVersion == 4: 882 if toRipe != calculatedRipe: 883 return logger.info( 884 'The encryption key used to encrypt this message' 885 ' doesn\'t match the keys inbedded in the message' 886 ' itself. Ignoring message.' 887 ) 888 elif broadcastVersion == 5: 889 calculatedTag = highlevelcrypto.double_sha512( 890 encodeVarint(sendersAddressVersion) 891 + encodeVarint(sendersStream) + calculatedRipe 892 )[32:] 893 if calculatedTag != embeddedTag: 894 return logger.debug( 895 'The tag and encryption key used to encrypt this' 896 ' message doesn\'t match the keys inbedded in the' 897 ' message itself. Ignoring message.' 898 ) 899 messageEncodingType, messageEncodingTypeLength = decodeVarint( 900 decryptedData[readPosition:readPosition + 9]) 901 if messageEncodingType == 0: 902 return 903 readPosition += messageEncodingTypeLength 904 messageLength, messageLengthLength = decodeVarint( 905 decryptedData[readPosition:readPosition + 9]) 906 readPosition += messageLengthLength 907 message = decryptedData[readPosition:readPosition + messageLength] 908 readPosition += messageLength 909 readPositionAtBottomOfMessage = readPosition 910 signatureLength, signatureLengthLength = decodeVarint( 911 decryptedData[readPosition:readPosition + 9]) 912 readPosition += signatureLengthLength 913 signature = decryptedData[ 914 readPosition:readPosition + signatureLength] 915 signedData += decryptedData[:readPositionAtBottomOfMessage] 916 if not highlevelcrypto.verify( 917 signedData, signature, hexlify(sendersPubSigningKey)): 918 logger.debug('ECDSA verify failed') 919 return 920 logger.debug('ECDSA verify passed') 921 # Used to detect and ignore duplicate messages in our inbox 922 sigHash = highlevelcrypto.double_sha512(signature)[32:] 923 924 fromAddress = encodeAddress( 925 sendersAddressVersion, sendersStream, calculatedRipe) 926 logger.info('fromAddress: %s', fromAddress) 927 928 # Let's store the public key in case we want to reply to this person. 929 sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', 930 fromAddress, 931 sendersAddressVersion, 932 decryptedData[:endOfPubkeyPosition], 933 int(time.time()), 934 'yes') 935 936 # Check to see whether we happen to be awaiting this 937 # pubkey in order to send a message. If we are, it will do the POW 938 # and send it. 939 self.possibleNewPubkey(fromAddress) 940 941 try: 942 decodedMessage = helper_msgcoding.MsgDecode( 943 messageEncodingType, message) 944 except helper_msgcoding.MsgDecodeException: 945 return 946 subject = decodedMessage.subject 947 body = decodedMessage.body 948 949 toAddress = '[Broadcast subscribers]' 950 if helper_inbox.isMessageAlreadyInInbox(sigHash): 951 logger.info('This broadcast is already in our inbox. Ignoring it.') 952 return 953 t = (inventoryHash, toAddress, fromAddress, subject, int( 954 time.time()), body, 'inbox', messageEncodingType, 0, sigHash) 955 helper_inbox.insert(t) 956 957 queues.UISignalQueue.put(('displayNewInboxMessage', ( 958 inventoryHash, toAddress, fromAddress, subject, body))) 959 960 # If we are behaving as an API then we might need to run an 961 # outside command to let some program know that a new message 962 # has arrived. 963 if config.safeGetBoolean('bitmessagesettings', 'apienabled'): 964 apiNotifyPath = config.safeGet( 965 'bitmessagesettings', 'apinotifypath') 966 if apiNotifyPath: 967 subprocess.call([apiNotifyPath, "newBroadcast"]) # nosec B603 968 969 # Display timing data 970 logger.info( 971 'Time spent processing this interesting broadcast: %s', 972 time.time() - messageProcessingStartTime) 973 974 def possibleNewPubkey(self, address): 975 """ 976 We have inserted a pubkey into our pubkey table which we received 977 from a pubkey, msg, or broadcast message. It might be one that we 978 have been waiting for. Let's check. 979 """ 980 981 # For address versions <= 3, we wait on a key with the correct 982 # address version, stream number and RIPE hash. 983 addressVersion, streamNumber, ripe = decodeAddress(address)[1:] 984 if addressVersion <= 3: 985 if address in state.neededPubkeys: 986 del state.neededPubkeys[address] 987 self.sendMessages(address) 988 else: 989 logger.debug( 990 'We don\'t need this pub key. We didn\'t ask for it.' 991 ' For address: %s', address) 992 # For address versions >= 4, we wait on a pubkey with the correct tag. 993 # Let us create the tag from the address and see if we were waiting 994 # for it. 995 elif addressVersion >= 4: 996 tag = highlevelcrypto.double_sha512( 997 encodeVarint(addressVersion) + encodeVarint(streamNumber) 998 + ripe 999 )[32:] 1000 if tag in state.neededPubkeys: 1001 del state.neededPubkeys[tag] 1002 self.sendMessages(address) 1003 1004 @staticmethod 1005 def sendMessages(address): 1006 """ 1007 This method is called by the `possibleNewPubkey` when it sees 1008 that we now have the necessary pubkey to send one or more messages. 1009 """ 1010 logger.info('We have been awaiting the arrival of this pubkey.') 1011 sqlExecute( 1012 "UPDATE sent SET status='doingmsgpow', retrynumber=0" 1013 " WHERE toaddress=?" 1014 " AND (status='awaitingpubkey' OR status='doingpubkeypow')" 1015 " AND folder='sent'", address) 1016 queues.workerQueue.put(('sendmessage', '')) 1017 1018 @staticmethod 1019 def ackDataHasAValidHeader(ackData): 1020 """Checking ackData with valid Header, not sending ackData when false""" 1021 if len(ackData) < protocol.Header.size: 1022 logger.info( 1023 'The length of ackData is unreasonably short. Not sending' 1024 ' ackData.') 1025 return False 1026 1027 magic, command, payloadLength, checksum = protocol.Header.unpack( 1028 ackData[:protocol.Header.size]) 1029 if magic != protocol.magic: 1030 logger.info('Ackdata magic bytes were wrong. Not sending ackData.') 1031 return False 1032 payload = ackData[protocol.Header.size:] 1033 if len(payload) != payloadLength: 1034 logger.info( 1035 'ackData payload length doesn\'t match the payload length' 1036 ' specified in the header. Not sending ackdata.') 1037 return False 1038 # ~1.6 MB which is the maximum possible size of an inv message. 1039 if payloadLength > 1600100: 1040 # The largest message should be either an inv or a getdata 1041 # message at 1.6 MB in size. 1042 # That doesn't mean that the object may be that big. The 1043 # shared.checkAndShareObjectWithPeers function will verify 1044 # that it is no larger than 2^18 bytes. 1045 return False 1046 # test the checksum in the message. 1047 if checksum != hashlib.sha512(payload).digest()[0:4]: 1048 logger.info('ackdata checksum wrong. Not sending ackdata.') 1049 return False 1050 command = command.rstrip('\x00') 1051 if command != 'object': 1052 return False 1053 return True 1054 1055 @staticmethod 1056 def addMailingListNameToSubject(subject, mailingListName): 1057 """Adding mailingListName to subject""" 1058 subject = subject.strip() 1059 if subject[:3] == 'Re:' or subject[:3] == 'RE:': 1060 subject = subject[3:].strip() 1061 if '[' + mailingListName + ']' in subject: 1062 return subject 1063 return '[' + mailingListName + '] ' + subject