objectracker.py.bak
1 """ 2 Module for tracking objects 3 """ 4 import time 5 from threading import RLock 6 7 import connectionpool 8 from network import dandelion_ins 9 from randomtrackingdict import RandomTrackingDict 10 11 haveBloom = False 12 13 try: 14 # pybloomfiltermmap 15 from pybloomfilter import BloomFilter 16 haveBloom = True 17 except ImportError: 18 try: 19 # pybloom 20 from pybloom import BloomFilter 21 haveBloom = True 22 except ImportError: 23 pass 24 25 # it isn't actually implemented yet so no point in turning it on 26 haveBloom = False 27 28 # tracking pending downloads globally, for stats 29 missingObjects = {} 30 31 32 class ObjectTracker(object): 33 """Object tracker mixin""" 34 invCleanPeriod = 300 35 invInitialCapacity = 50000 36 invErrorRate = 0.03 37 trackingExpires = 3600 38 initialTimeOffset = 60 39 40 def __init__(self): 41 self.objectsNewToMe = RandomTrackingDict() 42 self.objectsNewToThem = {} 43 self.objectsNewToThemLock = RLock() 44 self.initInvBloom() 45 self.initAddrBloom() 46 self.lastCleaned = time.time() 47 48 def initInvBloom(self): 49 """Init bloom filter for tracking. WIP.""" 50 if haveBloom: 51 # lock? 52 self.invBloom = BloomFilter( 53 capacity=ObjectTracker.invInitialCapacity, 54 error_rate=ObjectTracker.invErrorRate) 55 56 def initAddrBloom(self): 57 """Init bloom filter for tracking addrs, WIP. 58 This either needs to be moved to addrthread.py or removed.""" 59 if haveBloom: 60 # lock? 61 self.addrBloom = BloomFilter( 62 capacity=ObjectTracker.invInitialCapacity, 63 error_rate=ObjectTracker.invErrorRate) 64 65 def clean(self): 66 """Clean up tracking to prevent memory bloat""" 67 if self.lastCleaned < time.time() - ObjectTracker.invCleanPeriod: 68 if haveBloom: 69 if missingObjects == 0: 70 self.initInvBloom() 71 self.initAddrBloom() 72 else: 73 # release memory 74 deadline = time.time() - ObjectTracker.trackingExpires 75 with self.objectsNewToThemLock: 76 self.objectsNewToThem = { 77 k: v 78 for k, v in self.objectsNewToThem.iteritems() 79 if v >= deadline} 80 self.lastCleaned = time.time() 81 82 def hasObj(self, hashid): 83 """Do we already have object?""" 84 if haveBloom: 85 return hashid in self.invBloom 86 return hashid in self.objectsNewToMe 87 88 def handleReceivedInventory(self, hashId): 89 """Handling received inventory""" 90 if haveBloom: 91 self.invBloom.add(hashId) 92 try: 93 with self.objectsNewToThemLock: 94 del self.objectsNewToThem[hashId] 95 except KeyError: 96 pass 97 if hashId not in missingObjects: 98 missingObjects[hashId] = time.time() 99 self.objectsNewToMe[hashId] = True 100 101 def handleReceivedObject(self, streamNumber, hashid): 102 """Handling received object""" 103 for i in connectionpool.pool.connections(): 104 if not i.fullyEstablished: 105 continue 106 try: 107 del i.objectsNewToMe[hashid] 108 except KeyError: 109 if streamNumber in i.streams and ( 110 not dandelion_ins.hasHash(hashid) 111 or dandelion_ins.objectChildStem(hashid) == i): 112 with i.objectsNewToThemLock: 113 i.objectsNewToThem[hashid] = time.time() 114 # update stream number, 115 # which we didn't have when we just received the dinv 116 # also resets expiration of the stem mode 117 dandelion_ins.setHashStream(hashid, streamNumber) 118 119 if i == self: 120 try: 121 with i.objectsNewToThemLock: 122 del i.objectsNewToThem[hashid] 123 except KeyError: 124 pass 125 self.objectsNewToMe.setLastObject() 126 127 def hasAddr(self, addr): 128 """WIP, should be moved to addrthread.py or removed""" 129 if haveBloom: 130 return addr in self.invBloom 131 return None 132 133 def addAddr(self, hashid): 134 """WIP, should be moved to addrthread.py or removed""" 135 if haveBloom: 136 self.addrBloom.add(hashid)