/ src / network / invthread.py
invthread.py
  1  """
  2  Thread to send inv annoucements
  3  """
  4  import queue
  5  import random
  6  from time import time
  7  
  8  import addresses
  9  import protocol
 10  import state
 11  from . import connectionpool
 12  from network import dandelion_ins, invQueue
 13  from .threads import StoppableThread
 14  
 15  
 16  def handleExpiredDandelion(expired):
 17      """For expired dandelion objects, mark all remotes as not having
 18         the object"""
 19      if not expired:
 20          return
 21      for i in connectionpool.pool.connections():
 22          if not i.fullyEstablished:
 23              continue
 24          for x in expired:
 25              streamNumber, hashid, _ = x
 26              try:
 27                  del i.objectsNewToMe[hashid]
 28              except KeyError:
 29                  if streamNumber in i.streams:
 30                      with i.objectsNewToThemLock:
 31                          i.objectsNewToThem[hashid] = time()
 32  
 33  
 34  class InvThread(StoppableThread):
 35      """Main thread that sends inv annoucements"""
 36  
 37      name = "InvBroadcaster"
 38  
 39      @staticmethod
 40      def handleLocallyGenerated(stream, hashId):
 41          """Locally generated inventory items require special handling"""
 42          dandelion_ins.addHash(hashId, stream=stream)
 43          for connection in connectionpool.pool.connections():
 44              if dandelion_ins.enabled and connection != \
 45                      dandelion_ins.objectChildStem(hashId):
 46                  continue
 47              connection.objectsNewToThem[hashId] = time()
 48  
 49      def run(self):  # pylint: disable=too-many-branches
 50          while not state.shutdown:  # pylint: disable=too-many-nested-blocks
 51              chunk = []
 52              while True:
 53                  # Dandelion fluff trigger by expiration
 54                  handleExpiredDandelion(dandelion_ins.expire(invQueue))
 55                  try:
 56                      data = invQueue.get(False)
 57                      chunk.append((data[0], data[1]))
 58                      # locally generated
 59                      if len(data) == 2 or data[2] is None:
 60                          self.handleLocallyGenerated(data[0], data[1])
 61                  except queue.Empty:
 62                      break
 63  
 64              if chunk:
 65                  for connection in connectionpool.pool.connections():
 66                      fluffs = []
 67                      stems = []
 68                      for inv in chunk:
 69                          if inv[0] not in connection.streams:
 70                              continue
 71                          try:
 72                              with connection.objectsNewToThemLock:
 73                                  del connection.objectsNewToThem[inv[1]]
 74                          except KeyError:
 75                              continue
 76                          try:
 77                              if connection == dandelion_ins.objectChildStem(inv[1]):
 78                                  # Fluff trigger by RNG
 79                                  # auto-ignore if config set to 0, i.e. dandelion is off
 80                                  if random.randint(1, 100) >= dandelion_ins.enabled:  # nosec B311
 81                                      fluffs.append(inv[1])
 82                                  # send a dinv only if the stem node supports dandelion
 83                                  elif connection.services & protocol.NODE_DANDELION > 0:
 84                                      stems.append(inv[1])
 85                                  else:
 86                                      fluffs.append(inv[1])
 87                          except KeyError:
 88                              fluffs.append(inv[1])
 89  
 90                      if fluffs:
 91                          random.shuffle(fluffs)
 92                          connection.append_write_buf(protocol.CreatePacket(
 93                              'inv',
 94                              addresses.encodeVarint(
 95                                  len(fluffs)) + ''.join(fluffs)))
 96                      if stems:
 97                          random.shuffle(stems)
 98                          connection.append_write_buf(protocol.CreatePacket(
 99                              'dinv',
100                              addresses.encodeVarint(
101                                  len(stems)) + ''.join(stems)))
102  
103              invQueue.iterate()
104              for _ in range(len(chunk)):
105                  invQueue.task_done()
106  
107              dandelion_ins.reRandomiseStems()
108  
109              self.stop.wait(1)