/ src / class_objectProcessor.py.bak
class_objectProcessor.py.bak
   1  """
   2  The objectProcessor thread, of which there is only one,
   3  processes the network objects
   4  """
   5  # pylint: disable=too-many-locals,too-many-return-statements
   6  # pylint: disable=too-many-branches,too-many-statements
   7  import hashlib
   8  import logging
   9  import os
  10  import random
  11  import subprocess  # nosec B404
  12  import threading
  13  import time
  14  from binascii import hexlify
  15  
  16  import helper_bitcoin
  17  import helper_inbox
  18  import helper_msgcoding
  19  import helper_sent
  20  import highlevelcrypto
  21  import l10n
  22  import protocol
  23  import queues
  24  import shared
  25  import state
  26  from addresses import (
  27      decodeAddress, decodeVarint,
  28      encodeAddress, encodeVarint, varintDecodeError
  29  )
  30  from bmconfigparser import config
  31  from helper_sql import (
  32      sql_ready, sql_timeout, SqlBulkExecute, sqlExecute, sqlQuery)
  33  from network import knownnodes, invQueue
  34  from network.node import Peer
  35  from tr import _translate
  36  
  37  logger = logging.getLogger('default')
  38  
  39  
  40  class objectProcessor(threading.Thread):
  41      """
  42      The objectProcessor thread, of which there is only one, receives network
  43      objects (msg, broadcast, pubkey, getpubkey) from the receiveDataThreads.
  44      """
  45      def __init__(self):
  46          threading.Thread.__init__(self, name="objectProcessor")
  47          random.seed()
  48          if sql_ready.wait(sql_timeout) is False:
  49              logger.fatal('SQL thread is not started in %s sec', sql_timeout)
  50              os._exit(1)  # pylint: disable=protected-access
  51          shared.reloadMyAddressHashes()
  52          shared.reloadBroadcastSendersForWhichImWatching()
  53          # It may be the case that the last time Bitmessage was running,
  54          # the user closed it before it finished processing everything in the
  55          # objectProcessorQueue. Assuming that Bitmessage wasn't closed
  56          # forcefully, it should have saved the data in the queue into the
  57          # objectprocessorqueue table. Let's pull it out.
  58          queryreturn = sqlQuery(
  59              'SELECT objecttype, data FROM objectprocessorqueue')
  60          for objectType, data in queryreturn:
  61              queues.objectProcessorQueue.put((objectType, data))
  62          sqlExecute('DELETE FROM objectprocessorqueue')
  63          logger.debug(
  64              'Loaded %s objects from disk into the objectProcessorQueue.',
  65              len(queryreturn))
  66          self.successfullyDecryptMessageTimings = []
  67  
  68      def run(self):
  69          """Process the objects from `.queues.objectProcessorQueue`"""
  70          while True:
  71              objectType, data = queues.objectProcessorQueue.get()
  72  
  73              self.checkackdata(data)
  74  
  75              try:
  76                  if objectType == protocol.OBJECT_GETPUBKEY:
  77                      self.processgetpubkey(data)
  78                  elif objectType == protocol.OBJECT_PUBKEY:
  79                      self.processpubkey(data)
  80                  elif objectType == protocol.OBJECT_MSG:
  81                      self.processmsg(data)
  82                  elif objectType == protocol.OBJECT_BROADCAST:
  83                      self.processbroadcast(data)
  84                  elif objectType == protocol.OBJECT_ONIONPEER:
  85                      self.processonion(data)
  86                  # is more of a command, not an object type. Is used to get
  87                  # this thread past the queue.get() so that it will check
  88                  # the shutdown variable.
  89                  elif objectType == 'checkShutdownVariable':
  90                      pass
  91                  else:
  92                      if isinstance(objectType, int):
  93                          logger.info(
  94                              'Don\'t know how to handle object type 0x%08X',
  95                              objectType)
  96                      else:
  97                          logger.info(
  98                              'Don\'t know how to handle object type %s',
  99                              objectType)
 100              except helper_msgcoding.DecompressionSizeException as e:
 101                  logger.error(
 102                      'The object is too big after decompression (stopped'
 103                      ' decompressing at %ib, your configured limit %ib).'
 104                      ' Ignoring',
 105                      e.size, config.safeGetInt('zlib', 'maxsize'))
 106              except varintDecodeError as e:
 107                  logger.debug(
 108                      'There was a problem with a varint while processing an'
 109                      ' object. Some details: %s', e)
 110              except Exception:
 111                  logger.critical(
 112                      'Critical error within objectProcessorThread: \n',
 113                      exc_info=True)
 114  
 115              if state.shutdown:
 116                  # Wait just a moment for most of the connections to close
 117                  time.sleep(.5)
 118                  numberOfObjectsThatWereInTheObjectProcessorQueue = 0
 119                  with SqlBulkExecute() as sql:
 120                      while queues.objectProcessorQueue.curSize > 0:
 121                          objectType, data = queues.objectProcessorQueue.get()
 122                          sql.execute(
 123                              'INSERT INTO objectprocessorqueue VALUES (?,?)',
 124                              objectType, data)
 125                          numberOfObjectsThatWereInTheObjectProcessorQueue += 1
 126                  logger.debug(
 127                      'Saved %s objects from the objectProcessorQueue to'
 128                      ' disk. objectProcessorThread exiting.',
 129                      numberOfObjectsThatWereInTheObjectProcessorQueue)
 130                  state.shutdown = 2
 131                  break
 132  
 133      @staticmethod
 134      def checkackdata(data):
 135          """Checking Acknowledgement of message received or not?"""
 136          # Let's check whether this is a message acknowledgement bound for us.
 137          if len(data) < 32:
 138              return
 139  
 140          # bypass nonce and time, retain object type/version/stream + body
 141          readPosition = 16
 142  
 143          if data[readPosition:] in state.ackdataForWhichImWatching:
 144              logger.info('This object is an acknowledgement bound for me.')
 145              del state.ackdataForWhichImWatching[data[readPosition:]]
 146              sqlExecute(
 147                  "UPDATE sent SET status='ackreceived', lastactiontime=?"
 148                  " WHERE ackdata=?", int(time.time()), data[readPosition:])
 149              queues.UISignalQueue.put((
 150                  'updateSentItemStatusByAckdata', (
 151                      data[readPosition:],
 152                      _translate(
 153                          "MainWindow",
 154                          "Acknowledgement of the message received %1"
 155                      ).arg(l10n.formatTimestamp()))
 156              ))
 157          else:
 158              logger.debug('This object is not an acknowledgement bound for me.')
 159  
 160      @staticmethod
 161      def processonion(data):
 162          """Process onionpeer object"""
 163          readPosition = 20  # bypass the nonce, time, and object type
 164          length = decodeVarint(data[readPosition:readPosition + 10])[1]
 165          readPosition += length
 166          stream, length = decodeVarint(data[readPosition:readPosition + 10])
 167          readPosition += length
 168          # it seems that stream is checked in network.bmproto
 169          port, length = decodeVarint(data[readPosition:readPosition + 10])
 170          host = protocol.checkIPAddress(data[readPosition + length:])
 171  
 172          if not host:
 173              return
 174          peer = Peer(host, port)
 175          with knownnodes.knownNodesLock:
 176              # FIXME: adjust expirestime
 177              knownnodes.addKnownNode(
 178                  stream, peer, is_self=state.ownAddresses.get(peer))
 179  
 180      @staticmethod
 181      def processgetpubkey(data):
 182          """Process getpubkey object"""
 183          if len(data) > 200:
 184              return logger.info(
 185                  'getpubkey is abnormally long. Sanity check failed.'
 186                  ' Ignoring object.')
 187          readPosition = 20  # bypass the nonce, time, and object type
 188          requestedAddressVersionNumber, addressVersionLength = decodeVarint(
 189              data[readPosition:readPosition + 10])
 190          readPosition += addressVersionLength
 191          streamNumber, streamNumberLength = decodeVarint(
 192              data[readPosition:readPosition + 10])
 193          readPosition += streamNumberLength
 194  
 195          if requestedAddressVersionNumber == 0:
 196              return logger.debug(
 197                  'The requestedAddressVersionNumber of the pubkey request'
 198                  ' is zero. That doesn\'t make any sense. Ignoring it.')
 199          if requestedAddressVersionNumber == 1:
 200              return logger.debug(
 201                  'The requestedAddressVersionNumber of the pubkey request'
 202                  ' is 1 which isn\'t supported anymore. Ignoring it.')
 203          if requestedAddressVersionNumber > 4:
 204              return logger.debug(
 205                  'The requestedAddressVersionNumber of the pubkey request'
 206                  ' is too high. Can\'t understand. Ignoring it.')
 207  
 208          myAddress = ''
 209          if requestedAddressVersionNumber <= 3:
 210              requestedHash = data[readPosition:readPosition + 20]
 211              if len(requestedHash) != 20:
 212                  return logger.debug(
 213                      'The length of the requested hash is not 20 bytes.'
 214                      ' Something is wrong. Ignoring.')
 215              logger.info(
 216                  'the hash requested in this getpubkey request is: %s',
 217                  hexlify(requestedHash))
 218              # if this address hash is one of mine
 219              if requestedHash in shared.myAddressesByHash:
 220                  myAddress = shared.myAddressesByHash[requestedHash]
 221          elif requestedAddressVersionNumber >= 4:
 222              requestedTag = data[readPosition:readPosition + 32]
 223              if len(requestedTag) != 32:
 224                  return logger.debug(
 225                      'The length of the requested tag is not 32 bytes.'
 226                      ' Something is wrong. Ignoring.')
 227              logger.debug(
 228                  'the tag requested in this getpubkey request is: %s',
 229                  hexlify(requestedTag))
 230              if requestedTag in shared.myAddressesByTag:
 231                  myAddress = shared.myAddressesByTag[requestedTag]
 232  
 233          if myAddress == '':
 234              logger.info('This getpubkey request is not for any of my keys.')
 235              return
 236  
 237          if decodeAddress(myAddress)[1] != requestedAddressVersionNumber:
 238              return logger.warning(
 239                  '(Within the processgetpubkey function) Someone requested'
 240                  ' one of my pubkeys but the requestedAddressVersionNumber'
 241                  ' doesn\'t match my actual address version number.'
 242                  ' Ignoring.')
 243          if decodeAddress(myAddress)[2] != streamNumber:
 244              return logger.warning(
 245                  '(Within the processgetpubkey function) Someone requested'
 246                  ' one of my pubkeys but the stream number on which we'
 247                  ' heard this getpubkey object doesn\'t match this'
 248                  ' address\' stream number. Ignoring.')
 249          if config.safeGetBoolean(myAddress, 'chan'):
 250              return logger.info(
 251                  'Ignoring getpubkey request because it is for one of my'
 252                  ' chan addresses. The other party should already have'
 253                  ' the pubkey.')
 254          lastPubkeySendTime = config.safeGetInt(
 255              myAddress, 'lastpubkeysendtime')
 256          # If the last time we sent our pubkey was more recent than
 257          # 28 days ago...
 258          if lastPubkeySendTime > time.time() - 2419200:
 259              return logger.info(
 260                  'Found getpubkey-requested-item in my list of EC hashes'
 261                  ' BUT we already sent it recently. Ignoring request.'
 262                  ' The lastPubkeySendTime is: %s', lastPubkeySendTime)
 263          logger.info(
 264              'Found getpubkey-requested-hash in my list of EC hashes.'
 265              ' Telling Worker thread to do the POW for a pubkey message'
 266              ' and send it out.')
 267          if requestedAddressVersionNumber == 2:
 268              queues.workerQueue.put(('doPOWForMyV2Pubkey', requestedHash))
 269          elif requestedAddressVersionNumber == 3:
 270              queues.workerQueue.put(('sendOutOrStoreMyV3Pubkey', requestedHash))
 271          elif requestedAddressVersionNumber == 4:
 272              queues.workerQueue.put(('sendOutOrStoreMyV4Pubkey', myAddress))
 273  
 274      def processpubkey(self, data):
 275          """Process a pubkey object"""
 276          pubkeyProcessingStartTime = time.time()
 277          state.numberOfPubkeysProcessed += 1
 278          queues.UISignalQueue.put((
 279              'updateNumberOfPubkeysProcessed', 'no data'))
 280          readPosition = 20  # bypass the nonce, time, and object type
 281          addressVersion, varintLength = decodeVarint(
 282              data[readPosition:readPosition + 10])
 283          readPosition += varintLength
 284          streamNumber, varintLength = decodeVarint(
 285              data[readPosition:readPosition + 10])
 286          readPosition += varintLength
 287          if addressVersion == 0:
 288              return logger.debug(
 289                  '(Within processpubkey) addressVersion of 0 doesn\'t'
 290                  ' make sense.')
 291          if addressVersion > 4 or addressVersion == 1:
 292              return logger.info(
 293                  'This version of Bitmessage cannot handle version %s'
 294                  ' addresses.', addressVersion)
 295          if addressVersion == 2:
 296              # sanity check. This is the minimum possible length.
 297              if len(data) < 146:
 298                  return logger.debug(
 299                      '(within processpubkey) payloadLength less than 146.'
 300                      ' Sanity check failed.')
 301              readPosition += 4
 302              pubSigningKey = '\x04' + data[readPosition:readPosition + 64]
 303              # Is it possible for a public key to be invalid such that trying to
 304              # encrypt or sign with it will cause an error? If it is, it would
 305              # be easiest to test them here.
 306              readPosition += 64
 307              pubEncryptionKey = '\x04' + data[readPosition:readPosition + 64]
 308              if len(pubEncryptionKey) < 65:
 309                  return logger.debug(
 310                      'publicEncryptionKey length less than 64. Sanity check'
 311                      ' failed.')
 312              readPosition += 64
 313              # The data we'll store in the pubkeys table.
 314              dataToStore = data[20:readPosition]
 315              ripe = highlevelcrypto.to_ripe(pubSigningKey, pubEncryptionKey)
 316  
 317              if logger.isEnabledFor(logging.DEBUG):
 318                  logger.debug(
 319                      'within recpubkey, addressVersion: %s, streamNumber: %s'
 320                      '\nripe %s\npublicSigningKey in hex: %s'
 321                      '\npublicEncryptionKey in hex: %s',
 322                      addressVersion, streamNumber, hexlify(ripe),
 323                      hexlify(pubSigningKey), hexlify(pubEncryptionKey)
 324                  )
 325  
 326              address = encodeAddress(addressVersion, streamNumber, ripe)
 327  
 328              queryreturn = sqlQuery(
 329                  "SELECT usedpersonally FROM pubkeys WHERE address=?"
 330                  " AND usedpersonally='yes'", address)
 331              # if this pubkey is already in our database and if we have
 332              # used it personally:
 333              if queryreturn != []:
 334                  logger.info(
 335                      'We HAVE used this pubkey personally. Updating time.')
 336                  t = (address, addressVersion, dataToStore,
 337                       int(time.time()), 'yes')
 338              else:
 339                  logger.info(
 340                      'We have NOT used this pubkey personally. Inserting'
 341                      ' in database.')
 342                  t = (address, addressVersion, dataToStore,
 343                       int(time.time()), 'no')
 344              sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t)
 345              self.possibleNewPubkey(address)
 346          if addressVersion == 3:
 347              if len(data) < 170:  # sanity check.
 348                  logger.warning(
 349                      '(within processpubkey) payloadLength less than 170.'
 350                      ' Sanity check failed.')
 351                  return
 352              readPosition += 4
 353              pubSigningKey = '\x04' + data[readPosition:readPosition + 64]
 354              readPosition += 64
 355              pubEncryptionKey = '\x04' + data[readPosition:readPosition + 64]
 356              readPosition += 64
 357              specifiedNonceTrialsPerByteLength = decodeVarint(
 358                  data[readPosition:readPosition + 10])[1]
 359              readPosition += specifiedNonceTrialsPerByteLength
 360              specifiedPayloadLengthExtraBytesLength = decodeVarint(
 361                  data[readPosition:readPosition + 10])[1]
 362              readPosition += specifiedPayloadLengthExtraBytesLength
 363              endOfSignedDataPosition = readPosition
 364              # The data we'll store in the pubkeys table.
 365              dataToStore = data[20:readPosition]
 366              signatureLength, signatureLengthLength = decodeVarint(
 367                  data[readPosition:readPosition + 10])
 368              readPosition += signatureLengthLength
 369              signature = data[readPosition:readPosition + signatureLength]
 370              if highlevelcrypto.verify(
 371                      data[8:endOfSignedDataPosition],
 372                      signature, hexlify(pubSigningKey)):
 373                  logger.debug('ECDSA verify passed (within processpubkey)')
 374              else:
 375                  logger.warning('ECDSA verify failed (within processpubkey)')
 376                  return
 377  
 378              ripe = highlevelcrypto.to_ripe(pubSigningKey, pubEncryptionKey)
 379  
 380              if logger.isEnabledFor(logging.DEBUG):
 381                  logger.debug(
 382                      'within recpubkey, addressVersion: %s, streamNumber: %s'
 383                      '\nripe %s\npublicSigningKey in hex: %s'
 384                      '\npublicEncryptionKey in hex: %s',
 385                      addressVersion, streamNumber, hexlify(ripe),
 386                      hexlify(pubSigningKey), hexlify(pubEncryptionKey)
 387                  )
 388  
 389              address = encodeAddress(addressVersion, streamNumber, ripe)
 390              queryreturn = sqlQuery(
 391                  "SELECT usedpersonally FROM pubkeys WHERE address=?"
 392                  " AND usedpersonally='yes'", address)
 393              # if this pubkey is already in our database and if we have
 394              # used it personally:
 395              if queryreturn != []:
 396                  logger.info(
 397                      'We HAVE used this pubkey personally. Updating time.')
 398                  t = (address, addressVersion, dataToStore,
 399                       int(time.time()), 'yes')
 400              else:
 401                  logger.info(
 402                      'We have NOT used this pubkey personally. Inserting'
 403                      ' in database.')
 404                  t = (address, addressVersion, dataToStore,
 405                       int(time.time()), 'no')
 406              sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t)
 407              self.possibleNewPubkey(address)
 408  
 409          if addressVersion == 4:
 410              if len(data) < 350:  # sanity check.
 411                  return logger.debug(
 412                      '(within processpubkey) payloadLength less than 350.'
 413                      ' Sanity check failed.')
 414  
 415              tag = data[readPosition:readPosition + 32]
 416              if tag not in state.neededPubkeys:
 417                  return logger.info(
 418                      'We don\'t need this v4 pubkey. We didn\'t ask for it.')
 419  
 420              # Let us try to decrypt the pubkey
 421              toAddress = state.neededPubkeys[tag][0]
 422              if protocol.decryptAndCheckPubkeyPayload(data, toAddress) == \
 423                      'successful':
 424                  # At this point we know that we have been waiting on this
 425                  # pubkey. This function will command the workerThread
 426                  # to start work on the messages that require it.
 427                  self.possibleNewPubkey(toAddress)
 428  
 429          # Display timing data
 430          logger.debug(
 431              'Time required to process this pubkey: %s',
 432              time.time() - pubkeyProcessingStartTime)
 433  
 434      def processmsg(self, data):
 435          """Process a message object"""
 436          messageProcessingStartTime = time.time()
 437          state.numberOfMessagesProcessed += 1
 438          queues.UISignalQueue.put((
 439              'updateNumberOfMessagesProcessed', 'no data'))
 440          readPosition = 20  # bypass the nonce, time, and object type
 441          msgVersion, msgVersionLength = decodeVarint(
 442              data[readPosition:readPosition + 9])
 443          if msgVersion != 1:
 444              return logger.info(
 445                  'Cannot understand message versions other than one.'
 446                  ' Ignoring message.')
 447          readPosition += msgVersionLength
 448  
 449          streamNumberAsClaimedByMsg, streamNumberAsClaimedByMsgLength = \
 450              decodeVarint(data[readPosition:readPosition + 9])
 451          readPosition += streamNumberAsClaimedByMsgLength
 452          inventoryHash = highlevelcrypto.calculateInventoryHash(data)
 453          initialDecryptionSuccessful = False
 454  
 455          # This is not an acknowledgement bound for me. See if it is a message
 456          # bound for me by trying to decrypt it with my private keys.
 457  
 458          for key, cryptorObject in sorted(
 459                  shared.myECCryptorObjects.items(),
 460                  key=lambda x: random.random()):  # nosec B311
 461              try:
 462                  # continue decryption attempts to avoid timing attacks
 463                  if initialDecryptionSuccessful:
 464                      cryptorObject.decrypt(data[readPosition:])
 465                  else:
 466                      decryptedData = cryptorObject.decrypt(data[readPosition:])
 467                      # This is the RIPE hash of my pubkeys. We need this
 468                      # below to compare to the destination_ripe included
 469                      # in the encrypted data.
 470                      toRipe = key
 471                      initialDecryptionSuccessful = True
 472                      logger.info(
 473                          'EC decryption successful using key associated'
 474                          ' with ripe hash: %s.', hexlify(key))
 475              except Exception:  # nosec B110
 476                  pass
 477          if not initialDecryptionSuccessful:
 478              # This is not a message bound for me.
 479              return logger.info(
 480                  'Length of time program spent failing to decrypt this'
 481                  ' message: %s seconds.',
 482                  time.time() - messageProcessingStartTime)
 483  
 484          # This is a message bound for me.
 485          # Look up my address based on the RIPE hash.
 486          toAddress = shared.myAddressesByHash[toRipe]
 487          readPosition = 0
 488          sendersAddressVersionNumber, sendersAddressVersionNumberLength = \
 489              decodeVarint(decryptedData[readPosition:readPosition + 10])
 490          readPosition += sendersAddressVersionNumberLength
 491          if sendersAddressVersionNumber == 0:
 492              return logger.info(
 493                  'Cannot understand sendersAddressVersionNumber = 0.'
 494                  ' Ignoring message.')
 495          if sendersAddressVersionNumber > 4:
 496              return logger.info(
 497                  'Sender\'s address version number %s not yet supported.'
 498                  ' Ignoring message.', sendersAddressVersionNumber)
 499          if len(decryptedData) < 170:
 500              return logger.info(
 501                  'Length of the unencrypted data is unreasonably short.'
 502                  ' Sanity check failed. Ignoring message.')
 503          sendersStreamNumber, sendersStreamNumberLength = decodeVarint(
 504              decryptedData[readPosition:readPosition + 10])
 505          if sendersStreamNumber == 0:
 506              logger.info('sender\'s stream number is 0. Ignoring message.')
 507              return
 508          readPosition += sendersStreamNumberLength
 509          readPosition += 4
 510          pubSigningKey = '\x04' + decryptedData[readPosition:readPosition + 64]
 511          readPosition += 64
 512          pubEncryptionKey = '\x04' + decryptedData[readPosition:readPosition + 64]
 513          readPosition += 64
 514          if sendersAddressVersionNumber >= 3:
 515              requiredAverageProofOfWorkNonceTrialsPerByte, varintLength = \
 516                  decodeVarint(decryptedData[readPosition:readPosition + 10])
 517              readPosition += varintLength
 518              logger.info(
 519                  'sender\'s requiredAverageProofOfWorkNonceTrialsPerByte is %s',
 520                  requiredAverageProofOfWorkNonceTrialsPerByte)
 521              requiredPayloadLengthExtraBytes, varintLength = decodeVarint(
 522                  decryptedData[readPosition:readPosition + 10])
 523              readPosition += varintLength
 524              logger.info(
 525                  'sender\'s requiredPayloadLengthExtraBytes is %s',
 526                  requiredPayloadLengthExtraBytes)
 527          # needed for when we store the pubkey in our database of pubkeys
 528          # for later use.
 529          endOfThePublicKeyPosition = readPosition
 530          if toRipe != decryptedData[readPosition:readPosition + 20]:
 531              return logger.info(
 532                  'The original sender of this message did not send it to'
 533                  ' you. Someone is attempting a Surreptitious Forwarding'
 534                  ' Attack.\nSee: '
 535                  'http://world.std.com/~dtd/sign_encrypt/sign_encrypt7.html'
 536                  '\nyour toRipe: %s\nembedded destination toRipe: %s',
 537                  hexlify(toRipe),
 538                  hexlify(decryptedData[readPosition:readPosition + 20])
 539              )
 540          readPosition += 20
 541          messageEncodingType, messageEncodingTypeLength = decodeVarint(
 542              decryptedData[readPosition:readPosition + 10])
 543          readPosition += messageEncodingTypeLength
 544          messageLength, messageLengthLength = decodeVarint(
 545              decryptedData[readPosition:readPosition + 10])
 546          readPosition += messageLengthLength
 547          message = decryptedData[readPosition:readPosition + messageLength]
 548          readPosition += messageLength
 549          ackLength, ackLengthLength = decodeVarint(
 550              decryptedData[readPosition:readPosition + 10])
 551          readPosition += ackLengthLength
 552          ackData = decryptedData[readPosition:readPosition + ackLength]
 553          readPosition += ackLength
 554          # needed to mark the end of what is covered by the signature
 555          positionOfBottomOfAckData = readPosition
 556          signatureLength, signatureLengthLength = decodeVarint(
 557              decryptedData[readPosition:readPosition + 10])
 558          readPosition += signatureLengthLength
 559          signature = decryptedData[
 560              readPosition:readPosition + signatureLength]
 561          signedData = data[8:20] + encodeVarint(1) + encodeVarint(
 562              streamNumberAsClaimedByMsg
 563          ) + decryptedData[:positionOfBottomOfAckData]
 564  
 565          if not highlevelcrypto.verify(
 566                  signedData, signature, hexlify(pubSigningKey)):
 567              return logger.debug('ECDSA verify failed')
 568          logger.debug('ECDSA verify passed')
 569          if logger.isEnabledFor(logging.DEBUG):
 570              logger.debug(
 571                  'As a matter of intellectual curiosity, here is the Bitcoin'
 572                  ' address associated with the keys owned by the other person:'
 573                  ' %s  ..and here is the testnet address: %s. The other person'
 574                  ' must take their private signing key from Bitmessage and'
 575                  ' import it into Bitcoin (or a service like Blockchain.info)'
 576                  ' for it to be of any use. Do not use this unless you know'
 577                  ' what you are doing.',
 578                  helper_bitcoin.calculateBitcoinAddressFromPubkey(pubSigningKey),
 579                  helper_bitcoin.calculateTestnetAddressFromPubkey(pubSigningKey)
 580              )
 581          # Used to detect and ignore duplicate messages in our inbox
 582          sigHash = highlevelcrypto.double_sha512(signature)[32:]
 583  
 584          # calculate the fromRipe.
 585          ripe = highlevelcrypto.to_ripe(pubSigningKey, pubEncryptionKey)
 586          fromAddress = encodeAddress(
 587              sendersAddressVersionNumber, sendersStreamNumber, ripe)
 588  
 589          # Let's store the public key in case we want to reply to this
 590          # person.
 591          sqlExecute(
 592              '''INSERT INTO pubkeys VALUES (?,?,?,?,?)''',
 593              fromAddress,
 594              sendersAddressVersionNumber,
 595              decryptedData[:endOfThePublicKeyPosition],
 596              int(time.time()),
 597              'yes')
 598  
 599          # Check to see whether we happen to be awaiting this
 600          # pubkey in order to send a message. If we are, it will do the POW
 601          # and send it.
 602          self.possibleNewPubkey(fromAddress)
 603  
 604          # If this message is bound for one of my version 3 addresses (or
 605          # higher), then we must check to make sure it meets our demanded
 606          # proof of work requirement. If this is bound for one of my chan
 607          # addresses then we skip this check; the minimum network POW is
 608          # fine.
 609          # If the toAddress version number is 3 or higher and not one of
 610          # my chan addresses:
 611          if decodeAddress(toAddress)[1] >= 3 \
 612                  and not config.safeGetBoolean(toAddress, 'chan'):
 613              # If I'm not friendly with this person:
 614              if not shared.isAddressInMyAddressBookSubscriptionsListOrWhitelist(
 615                      fromAddress):
 616                  requiredNonceTrialsPerByte = config.getint(
 617                      toAddress, 'noncetrialsperbyte')
 618                  requiredPayloadLengthExtraBytes = config.getint(
 619                      toAddress, 'payloadlengthextrabytes')
 620                  if not protocol.isProofOfWorkSufficient(
 621                          data, requiredNonceTrialsPerByte,
 622                          requiredPayloadLengthExtraBytes):
 623                      return logger.info(
 624                          'Proof of work in msg is insufficient only because'
 625                          ' it does not meet our higher requirement.')
 626          # Gets set to True if the user shouldn't see the message according
 627          # to black or white lists.
 628          blockMessage = False
 629          # If we are using a blacklist
 630          if config.get(
 631                  'bitmessagesettings', 'blackwhitelist') == 'black':
 632              queryreturn = sqlQuery(
 633                  "SELECT label FROM blacklist where address=? and enabled='1'",
 634                  fromAddress)
 635              if queryreturn != []:
 636                  logger.info('Message ignored because address is in blacklist.')
 637  
 638                  blockMessage = True
 639          else:  # We're using a whitelist
 640              queryreturn = sqlQuery(
 641                  "SELECT label FROM whitelist where address=? and enabled='1'",
 642                  fromAddress)
 643              if queryreturn == []:
 644                  logger.info(
 645                      'Message ignored because address not in whitelist.')
 646                  blockMessage = True
 647  
 648          # toLabel = config.safeGet(toAddress, 'label', toAddress)
 649          try:
 650              decodedMessage = helper_msgcoding.MsgDecode(
 651                  messageEncodingType, message)
 652          except helper_msgcoding.MsgDecodeException:
 653              return
 654          subject = decodedMessage.subject
 655          body = decodedMessage.body
 656  
 657          # Let us make sure that we haven't already received this message
 658          if helper_inbox.isMessageAlreadyInInbox(sigHash):
 659              logger.info('This msg is already in our inbox. Ignoring it.')
 660              blockMessage = True
 661          if not blockMessage:
 662              if messageEncodingType != 0:
 663                  t = (inventoryHash, toAddress, fromAddress, subject,
 664                       int(time.time()), body, 'inbox', messageEncodingType,
 665                       0, sigHash)
 666                  helper_inbox.insert(t)
 667  
 668                  queues.UISignalQueue.put(('displayNewInboxMessage', (
 669                      inventoryHash, toAddress, fromAddress, subject, body)))
 670  
 671              # If we are behaving as an API then we might need to run an
 672              # outside command to let some program know that a new message
 673              # has arrived.
 674              if config.safeGetBoolean(
 675                      'bitmessagesettings', 'apienabled'):
 676                  apiNotifyPath = config.safeGet(
 677                      'bitmessagesettings', 'apinotifypath')
 678                  if apiNotifyPath:
 679                      subprocess.call([apiNotifyPath, "newMessage"])  # nosec B603
 680  
 681              # Let us now check and see whether our receiving address is
 682              # behaving as a mailing list
 683              if config.safeGetBoolean(toAddress, 'mailinglist') \
 684                      and messageEncodingType != 0:
 685                  mailingListName = config.safeGet(
 686                      toAddress, 'mailinglistname', '')
 687                  # Let us send out this message as a broadcast
 688                  subject = self.addMailingListNameToSubject(
 689                      subject, mailingListName)
 690                  # Let us now send this message out as a broadcast
 691                  message = time.strftime(
 692                      "%a, %Y-%m-%d %H:%M:%S UTC", time.gmtime()
 693                  ) + '   Message ostensibly from ' + fromAddress \
 694                      + ':\n\n' + body
 695                  # The fromAddress for the broadcast that we are about to
 696                  # send is the toAddress (my address) for the msg message
 697                  # we are currently processing.
 698                  fromAddress = toAddress
 699                  # We don't actually need the ackdata for acknowledgement
 700                  # since this is a broadcast message but we can use it to
 701                  # update the user interface when the POW is done generating.
 702                  toAddress = '[Broadcast subscribers]'
 703  
 704                  ackdata = helper_sent.insert(
 705                      fromAddress=fromAddress,
 706                      status='broadcastqueued',
 707                      subject=subject,
 708                      message=message,
 709                      encoding=messageEncodingType)
 710  
 711                  queues.UISignalQueue.put((
 712                      'displayNewSentMessage', (
 713                          toAddress, '[Broadcast subscribers]', fromAddress,
 714                          subject, message, ackdata)
 715                  ))
 716                  queues.workerQueue.put(('sendbroadcast', ''))
 717  
 718          # Don't send ACK if invalid, blacklisted senders, invisible
 719          # messages, disabled or chan
 720          if (
 721              self.ackDataHasAValidHeader(ackData) and not blockMessage
 722              and messageEncodingType != 0
 723              and not config.safeGetBoolean(toAddress, 'dontsendack')
 724              and not config.safeGetBoolean(toAddress, 'chan')
 725          ):
 726              ackPayload = ackData[24:]
 727              objectType, toStreamNumber, expiresTime = \
 728                  protocol.decodeObjectParameters(ackPayload)
 729              inventoryHash = highlevelcrypto.calculateInventoryHash(ackPayload)
 730              state.Inventory[inventoryHash] = (
 731                  objectType, toStreamNumber, ackPayload, expiresTime, b'')
 732              invQueue.put((toStreamNumber, inventoryHash))
 733  
 734          # Display timing data
 735          timeRequiredToAttemptToDecryptMessage = time.time(
 736          ) - messageProcessingStartTime
 737          self.successfullyDecryptMessageTimings.append(
 738              timeRequiredToAttemptToDecryptMessage)
 739          timing_sum = 0
 740          for item in self.successfullyDecryptMessageTimings:
 741              timing_sum += item
 742          logger.debug(
 743              'Time to decrypt this message successfully: %s'
 744              '\nAverage time for all message decryption successes since'
 745              ' startup: %s.',
 746              timeRequiredToAttemptToDecryptMessage,
 747              timing_sum / len(self.successfullyDecryptMessageTimings)
 748          )
 749  
 750      def processbroadcast(self, data):
 751          """Process a broadcast object"""
 752          messageProcessingStartTime = time.time()
 753          state.numberOfBroadcastsProcessed += 1
 754          queues.UISignalQueue.put((
 755              'updateNumberOfBroadcastsProcessed', 'no data'))
 756          inventoryHash = highlevelcrypto.calculateInventoryHash(data)
 757          readPosition = 20  # bypass the nonce, time, and object type
 758          broadcastVersion, broadcastVersionLength = decodeVarint(
 759              data[readPosition:readPosition + 9])
 760          readPosition += broadcastVersionLength
 761          if broadcastVersion < 4 or broadcastVersion > 5:
 762              return logger.info(
 763                  'Cannot decode incoming broadcast versions less than 4'
 764                  ' or higher than 5. Assuming the sender isn\'t being silly,'
 765                  ' you should upgrade Bitmessage because this message shall'
 766                  ' be ignored.'
 767              )
 768          cleartextStreamNumber, cleartextStreamNumberLength = decodeVarint(
 769              data[readPosition:readPosition + 10])
 770          readPosition += cleartextStreamNumberLength
 771          if broadcastVersion == 4:
 772              # v4 broadcasts are encrypted the same way the msgs are
 773              # encrypted. To see if we are interested in a v4 broadcast,
 774              # we try to decrypt it. This was replaced with v5 broadcasts
 775              # which include a tag which we check instead, just like we do
 776              # with v4 pubkeys.
 777              signedData = data[8:readPosition]
 778              initialDecryptionSuccessful = False
 779              for key, cryptorObject in sorted(
 780                      shared.MyECSubscriptionCryptorObjects.items(),
 781                      key=lambda x: random.random()):  # nosec B311
 782                  try:
 783                      # continue decryption attempts to avoid timing attacks
 784                      if initialDecryptionSuccessful:
 785                          cryptorObject.decrypt(data[readPosition:])
 786                      else:
 787                          decryptedData = cryptorObject.decrypt(
 788                              data[readPosition:])
 789                          # This is the RIPE hash of the sender's pubkey.
 790                          # We need this below to compare to the RIPE hash
 791                          # of the sender's address to verify that it was
 792                          # encrypted by with their key rather than some
 793                          # other key.
 794                          toRipe = key
 795                          initialDecryptionSuccessful = True
 796                          logger.info(
 797                              'EC decryption successful using key associated'
 798                              ' with ripe hash: %s', hexlify(key))
 799                  except Exception:
 800                      logger.debug(
 801                          'cryptorObject.decrypt Exception:', exc_info=True)
 802              if not initialDecryptionSuccessful:
 803                  # This is not a broadcast I am interested in.
 804                  return logger.debug(
 805                      'Length of time program spent failing to decrypt this'
 806                      ' v4 broadcast: %s seconds.',
 807                      time.time() - messageProcessingStartTime)
 808          elif broadcastVersion == 5:
 809              embeddedTag = data[readPosition:readPosition + 32]
 810              readPosition += 32
 811              if embeddedTag not in shared.MyECSubscriptionCryptorObjects:
 812                  logger.debug('We\'re not interested in this broadcast.')
 813                  return
 814              # We are interested in this broadcast because of its tag.
 815              # We're going to add some more data which is signed further down.
 816              signedData = data[8:readPosition]
 817              cryptorObject = shared.MyECSubscriptionCryptorObjects[embeddedTag]
 818              try:
 819                  decryptedData = cryptorObject.decrypt(data[readPosition:])
 820                  logger.debug('EC decryption successful')
 821              except Exception:
 822                  return logger.debug(
 823                      'Broadcast version %s decryption Unsuccessful.',
 824                      broadcastVersion)
 825          # At this point this is a broadcast I have decrypted and am
 826          # interested in.
 827          readPosition = 0
 828          sendersAddressVersion, sendersAddressVersionLength = decodeVarint(
 829              decryptedData[readPosition:readPosition + 9])
 830          if broadcastVersion == 4:
 831              if sendersAddressVersion < 2 or sendersAddressVersion > 3:
 832                  return logger.warning(
 833                      'Cannot decode senderAddressVersion other than 2 or 3.'
 834                      ' Assuming the sender isn\'t being silly, you should'
 835                      ' upgrade Bitmessage because this message shall be'
 836                      ' ignored.'
 837                  )
 838          elif broadcastVersion == 5:
 839              if sendersAddressVersion < 4:
 840                  return logger.info(
 841                      'Cannot decode senderAddressVersion less than 4 for'
 842                      ' broadcast version number 5. Assuming the sender'
 843                      ' isn\'t being silly, you should upgrade Bitmessage'
 844                      ' because this message shall be ignored.'
 845                  )
 846          readPosition += sendersAddressVersionLength
 847          sendersStream, sendersStreamLength = decodeVarint(
 848              decryptedData[readPosition:readPosition + 9])
 849          if sendersStream != cleartextStreamNumber:
 850              return logger.info(
 851                  'The stream number outside of the encryption on which the'
 852                  ' POW was completed doesn\'t match the stream number'
 853                  ' inside the encryption. Ignoring broadcast.'
 854              )
 855          readPosition += sendersStreamLength
 856          readPosition += 4
 857          sendersPubSigningKey = '\x04' + \
 858              decryptedData[readPosition:readPosition + 64]
 859          readPosition += 64
 860          sendersPubEncryptionKey = '\x04' + \
 861              decryptedData[readPosition:readPosition + 64]
 862          readPosition += 64
 863          if sendersAddressVersion >= 3:
 864              requiredAverageProofOfWorkNonceTrialsPerByte, varintLength = \
 865                  decodeVarint(decryptedData[readPosition:readPosition + 10])
 866              readPosition += varintLength
 867              logger.debug(
 868                  'sender\'s requiredAverageProofOfWorkNonceTrialsPerByte'
 869                  ' is %s', requiredAverageProofOfWorkNonceTrialsPerByte)
 870              requiredPayloadLengthExtraBytes, varintLength = decodeVarint(
 871                  decryptedData[readPosition:readPosition + 10])
 872              readPosition += varintLength
 873              logger.debug(
 874                  'sender\'s requiredPayloadLengthExtraBytes is %s',
 875                  requiredPayloadLengthExtraBytes)
 876          endOfPubkeyPosition = readPosition
 877  
 878          calculatedRipe = highlevelcrypto.to_ripe(
 879              sendersPubSigningKey, sendersPubEncryptionKey)
 880  
 881          if broadcastVersion == 4:
 882              if toRipe != calculatedRipe:
 883                  return logger.info(
 884                      'The encryption key used to encrypt this message'
 885                      ' doesn\'t match the keys inbedded in the message'
 886                      ' itself. Ignoring message.'
 887                  )
 888          elif broadcastVersion == 5:
 889              calculatedTag = highlevelcrypto.double_sha512(
 890                  encodeVarint(sendersAddressVersion)
 891                  + encodeVarint(sendersStream) + calculatedRipe
 892              )[32:]
 893              if calculatedTag != embeddedTag:
 894                  return logger.debug(
 895                      'The tag and encryption key used to encrypt this'
 896                      ' message doesn\'t match the keys inbedded in the'
 897                      ' message itself. Ignoring message.'
 898                  )
 899          messageEncodingType, messageEncodingTypeLength = decodeVarint(
 900              decryptedData[readPosition:readPosition + 9])
 901          if messageEncodingType == 0:
 902              return
 903          readPosition += messageEncodingTypeLength
 904          messageLength, messageLengthLength = decodeVarint(
 905              decryptedData[readPosition:readPosition + 9])
 906          readPosition += messageLengthLength
 907          message = decryptedData[readPosition:readPosition + messageLength]
 908          readPosition += messageLength
 909          readPositionAtBottomOfMessage = readPosition
 910          signatureLength, signatureLengthLength = decodeVarint(
 911              decryptedData[readPosition:readPosition + 9])
 912          readPosition += signatureLengthLength
 913          signature = decryptedData[
 914              readPosition:readPosition + signatureLength]
 915          signedData += decryptedData[:readPositionAtBottomOfMessage]
 916          if not highlevelcrypto.verify(
 917                  signedData, signature, hexlify(sendersPubSigningKey)):
 918              logger.debug('ECDSA verify failed')
 919              return
 920          logger.debug('ECDSA verify passed')
 921          # Used to detect and ignore duplicate messages in our inbox
 922          sigHash = highlevelcrypto.double_sha512(signature)[32:]
 923  
 924          fromAddress = encodeAddress(
 925              sendersAddressVersion, sendersStream, calculatedRipe)
 926          logger.info('fromAddress: %s', fromAddress)
 927  
 928          # Let's store the public key in case we want to reply to this person.
 929          sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''',
 930                     fromAddress,
 931                     sendersAddressVersion,
 932                     decryptedData[:endOfPubkeyPosition],
 933                     int(time.time()),
 934                     'yes')
 935  
 936          # Check to see whether we happen to be awaiting this
 937          # pubkey in order to send a message. If we are, it will do the POW
 938          # and send it.
 939          self.possibleNewPubkey(fromAddress)
 940  
 941          try:
 942              decodedMessage = helper_msgcoding.MsgDecode(
 943                  messageEncodingType, message)
 944          except helper_msgcoding.MsgDecodeException:
 945              return
 946          subject = decodedMessage.subject
 947          body = decodedMessage.body
 948  
 949          toAddress = '[Broadcast subscribers]'
 950          if helper_inbox.isMessageAlreadyInInbox(sigHash):
 951              logger.info('This broadcast is already in our inbox. Ignoring it.')
 952              return
 953          t = (inventoryHash, toAddress, fromAddress, subject, int(
 954              time.time()), body, 'inbox', messageEncodingType, 0, sigHash)
 955          helper_inbox.insert(t)
 956  
 957          queues.UISignalQueue.put(('displayNewInboxMessage', (
 958              inventoryHash, toAddress, fromAddress, subject, body)))
 959  
 960          # If we are behaving as an API then we might need to run an
 961          # outside command to let some program know that a new message
 962          # has arrived.
 963          if config.safeGetBoolean('bitmessagesettings', 'apienabled'):
 964              apiNotifyPath = config.safeGet(
 965                  'bitmessagesettings', 'apinotifypath')
 966              if apiNotifyPath:
 967                  subprocess.call([apiNotifyPath, "newBroadcast"])  # nosec B603
 968  
 969          # Display timing data
 970          logger.info(
 971              'Time spent processing this interesting broadcast: %s',
 972              time.time() - messageProcessingStartTime)
 973  
 974      def possibleNewPubkey(self, address):
 975          """
 976          We have inserted a pubkey into our pubkey table which we received
 977          from a pubkey, msg, or broadcast message. It might be one that we
 978          have been waiting for. Let's check.
 979          """
 980  
 981          # For address versions <= 3, we wait on a key with the correct
 982          # address version, stream number and RIPE hash.
 983          addressVersion, streamNumber, ripe = decodeAddress(address)[1:]
 984          if addressVersion <= 3:
 985              if address in state.neededPubkeys:
 986                  del state.neededPubkeys[address]
 987                  self.sendMessages(address)
 988              else:
 989                  logger.debug(
 990                      'We don\'t need this pub key. We didn\'t ask for it.'
 991                      ' For address: %s', address)
 992          # For address versions >= 4, we wait on a pubkey with the correct tag.
 993          # Let us create the tag from the address and see if we were waiting
 994          # for it.
 995          elif addressVersion >= 4:
 996              tag = highlevelcrypto.double_sha512(
 997                  encodeVarint(addressVersion) + encodeVarint(streamNumber)
 998                  + ripe
 999              )[32:]
1000              if tag in state.neededPubkeys:
1001                  del state.neededPubkeys[tag]
1002                  self.sendMessages(address)
1003  
1004      @staticmethod
1005      def sendMessages(address):
1006          """
1007          This method is called by the `possibleNewPubkey` when it sees
1008          that we now have the necessary pubkey to send one or more messages.
1009          """
1010          logger.info('We have been awaiting the arrival of this pubkey.')
1011          sqlExecute(
1012              "UPDATE sent SET status='doingmsgpow', retrynumber=0"
1013              " WHERE toaddress=?"
1014              " AND (status='awaitingpubkey' OR status='doingpubkeypow')"
1015              " AND folder='sent'", address)
1016          queues.workerQueue.put(('sendmessage', ''))
1017  
1018      @staticmethod
1019      def ackDataHasAValidHeader(ackData):
1020          """Checking ackData with valid Header, not sending ackData when false"""
1021          if len(ackData) < protocol.Header.size:
1022              logger.info(
1023                  'The length of ackData is unreasonably short. Not sending'
1024                  ' ackData.')
1025              return False
1026  
1027          magic, command, payloadLength, checksum = protocol.Header.unpack(
1028              ackData[:protocol.Header.size])
1029          if magic != protocol.magic:
1030              logger.info('Ackdata magic bytes were wrong. Not sending ackData.')
1031              return False
1032          payload = ackData[protocol.Header.size:]
1033          if len(payload) != payloadLength:
1034              logger.info(
1035                  'ackData payload length doesn\'t match the payload length'
1036                  ' specified in the header. Not sending ackdata.')
1037              return False
1038          # ~1.6 MB which is the maximum possible size of an inv message.
1039          if payloadLength > 1600100:
1040              # The largest message should be either an inv or a getdata
1041              # message at 1.6 MB in size.
1042              # That doesn't mean that the object may be that big. The
1043              # shared.checkAndShareObjectWithPeers function will verify
1044              # that it is no larger than 2^18 bytes.
1045              return False
1046          # test the checksum in the message.
1047          if checksum != hashlib.sha512(payload).digest()[0:4]:
1048              logger.info('ackdata checksum wrong. Not sending ackdata.')
1049              return False
1050          command = command.rstrip('\x00')
1051          if command != 'object':
1052              return False
1053          return True
1054  
1055      @staticmethod
1056      def addMailingListNameToSubject(subject, mailingListName):
1057          """Adding mailingListName to subject"""
1058          subject = subject.strip()
1059          if subject[:3] == 'Re:' or subject[:3] == 'RE:':
1060              subject = subject[3:].strip()
1061          if '[' + mailingListName + ']' in subject:
1062              return subject
1063          return '[' + mailingListName + '] ' + subject