/ src / network / objectracker.py.bak
objectracker.py.bak
  1  """
  2  Module for tracking objects
  3  """
  4  import time
  5  from threading import RLock
  6  
  7  import connectionpool
  8  from network import dandelion_ins
  9  from randomtrackingdict import RandomTrackingDict
 10  
 11  haveBloom = False
 12  
 13  try:
 14      # pybloomfiltermmap
 15      from pybloomfilter import BloomFilter
 16      haveBloom = True
 17  except ImportError:
 18      try:
 19          # pybloom
 20          from pybloom import BloomFilter
 21          haveBloom = True
 22      except ImportError:
 23          pass
 24  
 25  # it isn't actually implemented yet so no point in turning it on
 26  haveBloom = False
 27  
 28  # tracking pending downloads globally, for stats
 29  missingObjects = {}
 30  
 31  
 32  class ObjectTracker(object):
 33      """Object tracker mixin"""
 34      invCleanPeriod = 300
 35      invInitialCapacity = 50000
 36      invErrorRate = 0.03
 37      trackingExpires = 3600
 38      initialTimeOffset = 60
 39  
 40      def __init__(self):
 41          self.objectsNewToMe = RandomTrackingDict()
 42          self.objectsNewToThem = {}
 43          self.objectsNewToThemLock = RLock()
 44          self.initInvBloom()
 45          self.initAddrBloom()
 46          self.lastCleaned = time.time()
 47  
 48      def initInvBloom(self):
 49          """Init bloom filter for tracking. WIP."""
 50          if haveBloom:
 51              # lock?
 52              self.invBloom = BloomFilter(
 53                  capacity=ObjectTracker.invInitialCapacity,
 54                  error_rate=ObjectTracker.invErrorRate)
 55  
 56      def initAddrBloom(self):
 57          """Init bloom filter for tracking addrs, WIP.
 58          This either needs to be moved to addrthread.py or removed."""
 59          if haveBloom:
 60              # lock?
 61              self.addrBloom = BloomFilter(
 62                  capacity=ObjectTracker.invInitialCapacity,
 63                  error_rate=ObjectTracker.invErrorRate)
 64  
 65      def clean(self):
 66          """Clean up tracking to prevent memory bloat"""
 67          if self.lastCleaned < time.time() - ObjectTracker.invCleanPeriod:
 68              if haveBloom:
 69                  if missingObjects == 0:
 70                      self.initInvBloom()
 71                  self.initAddrBloom()
 72              else:
 73                  # release memory
 74                  deadline = time.time() - ObjectTracker.trackingExpires
 75                  with self.objectsNewToThemLock:
 76                      self.objectsNewToThem = {
 77                          k: v
 78                          for k, v in self.objectsNewToThem.iteritems()
 79                          if v >= deadline}
 80              self.lastCleaned = time.time()
 81  
 82      def hasObj(self, hashid):
 83          """Do we already have object?"""
 84          if haveBloom:
 85              return hashid in self.invBloom
 86          return hashid in self.objectsNewToMe
 87  
 88      def handleReceivedInventory(self, hashId):
 89          """Handling received inventory"""
 90          if haveBloom:
 91              self.invBloom.add(hashId)
 92          try:
 93              with self.objectsNewToThemLock:
 94                  del self.objectsNewToThem[hashId]
 95          except KeyError:
 96              pass
 97          if hashId not in missingObjects:
 98              missingObjects[hashId] = time.time()
 99          self.objectsNewToMe[hashId] = True
100  
101      def handleReceivedObject(self, streamNumber, hashid):
102          """Handling received object"""
103          for i in connectionpool.pool.connections():
104              if not i.fullyEstablished:
105                  continue
106              try:
107                  del i.objectsNewToMe[hashid]
108              except KeyError:
109                  if streamNumber in i.streams and (
110                          not dandelion_ins.hasHash(hashid)
111                          or dandelion_ins.objectChildStem(hashid) == i):
112                      with i.objectsNewToThemLock:
113                          i.objectsNewToThem[hashid] = time.time()
114                      # update stream number,
115                      # which we didn't have when we just received the dinv
116                      # also resets expiration of the stem mode
117                      dandelion_ins.setHashStream(hashid, streamNumber)
118  
119              if i == self:
120                  try:
121                      with i.objectsNewToThemLock:
122                          del i.objectsNewToThem[hashid]
123                  except KeyError:
124                      pass
125          self.objectsNewToMe.setLastObject()
126  
127      def hasAddr(self, addr):
128          """WIP, should be moved to addrthread.py or removed"""
129          if haveBloom:
130              return addr in self.invBloom
131          return None
132  
133      def addAddr(self, hashid):
134          """WIP, should be moved to addrthread.py or removed"""
135          if haveBloom:
136              self.addrBloom.add(hashid)