invthread.py
1 """ 2 Thread to send inv annoucements 3 """ 4 import queue 5 import random 6 from time import time 7 8 import addresses 9 import protocol 10 import state 11 from . import connectionpool 12 from network import dandelion_ins, invQueue 13 from .threads import StoppableThread 14 15 16 def handleExpiredDandelion(expired): 17 """For expired dandelion objects, mark all remotes as not having 18 the object""" 19 if not expired: 20 return 21 for i in connectionpool.pool.connections(): 22 if not i.fullyEstablished: 23 continue 24 for x in expired: 25 streamNumber, hashid, _ = x 26 try: 27 del i.objectsNewToMe[hashid] 28 except KeyError: 29 if streamNumber in i.streams: 30 with i.objectsNewToThemLock: 31 i.objectsNewToThem[hashid] = time() 32 33 34 class InvThread(StoppableThread): 35 """Main thread that sends inv annoucements""" 36 37 name = "InvBroadcaster" 38 39 @staticmethod 40 def handleLocallyGenerated(stream, hashId): 41 """Locally generated inventory items require special handling""" 42 dandelion_ins.addHash(hashId, stream=stream) 43 for connection in connectionpool.pool.connections(): 44 if dandelion_ins.enabled and connection != \ 45 dandelion_ins.objectChildStem(hashId): 46 continue 47 connection.objectsNewToThem[hashId] = time() 48 49 def run(self): # pylint: disable=too-many-branches 50 while not state.shutdown: # pylint: disable=too-many-nested-blocks 51 chunk = [] 52 while True: 53 # Dandelion fluff trigger by expiration 54 handleExpiredDandelion(dandelion_ins.expire(invQueue)) 55 try: 56 data = invQueue.get(False) 57 chunk.append((data[0], data[1])) 58 # locally generated 59 if len(data) == 2 or data[2] is None: 60 self.handleLocallyGenerated(data[0], data[1]) 61 except queue.Empty: 62 break 63 64 if chunk: 65 for connection in connectionpool.pool.connections(): 66 fluffs = [] 67 stems = [] 68 for inv in chunk: 69 if inv[0] not in connection.streams: 70 continue 71 try: 72 with connection.objectsNewToThemLock: 73 del connection.objectsNewToThem[inv[1]] 74 except KeyError: 75 continue 76 try: 77 if connection == dandelion_ins.objectChildStem(inv[1]): 78 # Fluff trigger by RNG 79 # auto-ignore if config set to 0, i.e. dandelion is off 80 if random.randint(1, 100) >= dandelion_ins.enabled: # nosec B311 81 fluffs.append(inv[1]) 82 # send a dinv only if the stem node supports dandelion 83 elif connection.services & protocol.NODE_DANDELION > 0: 84 stems.append(inv[1]) 85 else: 86 fluffs.append(inv[1]) 87 except KeyError: 88 fluffs.append(inv[1]) 89 90 if fluffs: 91 random.shuffle(fluffs) 92 connection.append_write_buf(protocol.CreatePacket( 93 'inv', 94 addresses.encodeVarint( 95 len(fluffs)) + ''.join(fluffs))) 96 if stems: 97 random.shuffle(stems) 98 connection.append_write_buf(protocol.CreatePacket( 99 'dinv', 100 addresses.encodeVarint( 101 len(stems)) + ''.join(stems))) 102 103 invQueue.iterate() 104 for _ in range(len(chunk)): 105 invQueue.task_done() 106 107 dandelion_ins.reRandomiseStems() 108 109 self.stop.wait(1)