objectracker.py
1 """ 2 Module for tracking objects 3 """ 4 import time 5 from threading import RLock 6 7 from network import dandelion_ins 8 from randomtrackingdict import RandomTrackingDict 9 10 haveBloom = False 11 12 try: 13 # pybloomfiltermmap 14 from pybloomfilter import BloomFilter 15 haveBloom = True 16 except ImportError: 17 try: 18 # pybloom 19 from pybloom import BloomFilter 20 haveBloom = True 21 except ImportError: 22 pass 23 24 # it isn't actually implemented yet so no point in turning it on 25 haveBloom = False 26 27 # tracking pending downloads globally, for stats 28 missingObjects = {} 29 30 31 class ObjectTracker(object): 32 """Object tracker mixin""" 33 invCleanPeriod = 300 34 invInitialCapacity = 50000 35 invErrorRate = 0.03 36 trackingExpires = 3600 37 initialTimeOffset = 60 38 39 def __init__(self): 40 self.objectsNewToMe = RandomTrackingDict() 41 self.objectsNewToThem = {} 42 self.objectsNewToThemLock = RLock() 43 self.initInvBloom() 44 self.initAddrBloom() 45 self.lastCleaned = time.time() 46 47 def initInvBloom(self): 48 """Init bloom filter for tracking. WIP.""" 49 if haveBloom: 50 # lock? 51 self.invBloom = BloomFilter( 52 capacity=ObjectTracker.invInitialCapacity, 53 error_rate=ObjectTracker.invErrorRate) 54 55 def initAddrBloom(self): 56 """Init bloom filter for tracking addrs, WIP. 57 This either needs to be moved to addrthread.py or removed.""" 58 if haveBloom: 59 # lock? 60 self.addrBloom = BloomFilter( 61 capacity=ObjectTracker.invInitialCapacity, 62 error_rate=ObjectTracker.invErrorRate) 63 64 def clean(self): 65 """Clean up tracking to prevent memory bloat""" 66 if self.lastCleaned < time.time() - ObjectTracker.invCleanPeriod: 67 if haveBloom: 68 if missingObjects == 0: 69 self.initInvBloom() 70 self.initAddrBloom() 71 else: 72 # release memory 73 deadline = time.time() - ObjectTracker.trackingExpires 74 with self.objectsNewToThemLock: 75 self.objectsNewToThem = { 76 k: v 77 for k, v in self.objectsNewToThem.items() 78 if v >= deadline} 79 self.lastCleaned = time.time() 80 81 def hasObj(self, hashid): 82 """Do we already have object?""" 83 if haveBloom: 84 return hashid in self.invBloom 85 return hashid in self.objectsNewToMe 86 87 def handleReceivedInventory(self, hashId): 88 """Handling received inventory""" 89 if haveBloom: 90 self.invBloom.add(hashId) 91 try: 92 with self.objectsNewToThemLock: 93 del self.objectsNewToThem[hashId] 94 except KeyError: 95 pass 96 if hashId not in missingObjects: 97 missingObjects[hashId] = time.time() 98 self.objectsNewToMe[hashId] = True 99 100 def handleReceivedObject(self, streamNumber, hashid): 101 import connectionpool 102 """Handling received object""" 103 for i in connectionpool.pool.connections(): 104 if not i.fullyEstablished: 105 continue 106 try: 107 del i.objectsNewToMe[hashid] 108 except KeyError: 109 if streamNumber in i.streams and ( 110 not dandelion_ins.hasHash(hashid) 111 or dandelion_ins.objectChildStem(hashid) == i): 112 with i.objectsNewToThemLock: 113 i.objectsNewToThem[hashid] = time.time() 114 # update stream number, 115 # which we didn't have when we just received the dinv 116 # also resets expiration of the stem mode 117 dandelion_ins.setHashStream(hashid, streamNumber) 118 119 if i == self: 120 try: 121 with i.objectsNewToThemLock: 122 del i.objectsNewToThem[hashid] 123 except KeyError: 124 pass 125 self.objectsNewToMe.setLastObject() 126 127 def hasAddr(self, addr): 128 """WIP, should be moved to addrthread.py or removed""" 129 if haveBloom: 130 return addr in self.invBloom 131 return None 132 133 def addAddr(self, hashid): 134 """WIP, should be moved to addrthread.py or removed""" 135 if haveBloom: 136 self.addrBloom.add(hashid)