/ src / network / objectracker.py
objectracker.py
  1  """
  2  Module for tracking objects
  3  """
  4  import time
  5  from threading import RLock
  6  
  7  from network import dandelion_ins
  8  from randomtrackingdict import RandomTrackingDict
  9  
 10  haveBloom = False
 11  
 12  try:
 13      # pybloomfiltermmap
 14      from pybloomfilter import BloomFilter
 15      haveBloom = True
 16  except ImportError:
 17      try:
 18          # pybloom
 19          from pybloom import BloomFilter
 20          haveBloom = True
 21      except ImportError:
 22          pass
 23  
 24  # it isn't actually implemented yet so no point in turning it on
 25  haveBloom = False
 26  
 27  # tracking pending downloads globally, for stats
 28  missingObjects = {}
 29  
 30  
 31  class ObjectTracker(object):
 32      """Object tracker mixin"""
 33      invCleanPeriod = 300
 34      invInitialCapacity = 50000
 35      invErrorRate = 0.03
 36      trackingExpires = 3600
 37      initialTimeOffset = 60
 38  
 39      def __init__(self):
 40          self.objectsNewToMe = RandomTrackingDict()
 41          self.objectsNewToThem = {}
 42          self.objectsNewToThemLock = RLock()
 43          self.initInvBloom()
 44          self.initAddrBloom()
 45          self.lastCleaned = time.time()
 46  
 47      def initInvBloom(self):
 48          """Init bloom filter for tracking. WIP."""
 49          if haveBloom:
 50              # lock?
 51              self.invBloom = BloomFilter(
 52                  capacity=ObjectTracker.invInitialCapacity,
 53                  error_rate=ObjectTracker.invErrorRate)
 54  
 55      def initAddrBloom(self):
 56          """Init bloom filter for tracking addrs, WIP.
 57          This either needs to be moved to addrthread.py or removed."""
 58          if haveBloom:
 59              # lock?
 60              self.addrBloom = BloomFilter(
 61                  capacity=ObjectTracker.invInitialCapacity,
 62                  error_rate=ObjectTracker.invErrorRate)
 63  
 64      def clean(self):
 65          """Clean up tracking to prevent memory bloat"""
 66          if self.lastCleaned < time.time() - ObjectTracker.invCleanPeriod:
 67              if haveBloom:
 68                  if missingObjects == 0:
 69                      self.initInvBloom()
 70                  self.initAddrBloom()
 71              else:
 72                  # release memory
 73                  deadline = time.time() - ObjectTracker.trackingExpires
 74                  with self.objectsNewToThemLock:
 75                      self.objectsNewToThem = {
 76                          k: v
 77                          for k, v in self.objectsNewToThem.items()
 78                          if v >= deadline}
 79              self.lastCleaned = time.time()
 80  
 81      def hasObj(self, hashid):
 82          """Do we already have object?"""
 83          if haveBloom:
 84              return hashid in self.invBloom
 85          return hashid in self.objectsNewToMe
 86  
 87      def handleReceivedInventory(self, hashId):
 88          """Handling received inventory"""
 89          if haveBloom:
 90              self.invBloom.add(hashId)
 91          try:
 92              with self.objectsNewToThemLock:
 93                  del self.objectsNewToThem[hashId]
 94          except KeyError:
 95              pass
 96          if hashId not in missingObjects:
 97              missingObjects[hashId] = time.time()
 98          self.objectsNewToMe[hashId] = True
 99  
100      def handleReceivedObject(self, streamNumber, hashid):
101          import connectionpool
102          """Handling received object"""
103          for i in connectionpool.pool.connections():
104              if not i.fullyEstablished:
105                  continue
106              try:
107                  del i.objectsNewToMe[hashid]
108              except KeyError:
109                  if streamNumber in i.streams and (
110                          not dandelion_ins.hasHash(hashid)
111                          or dandelion_ins.objectChildStem(hashid) == i):
112                      with i.objectsNewToThemLock:
113                          i.objectsNewToThem[hashid] = time.time()
114                      # update stream number,
115                      # which we didn't have when we just received the dinv
116                      # also resets expiration of the stem mode
117                      dandelion_ins.setHashStream(hashid, streamNumber)
118  
119              if i == self:
120                  try:
121                      with i.objectsNewToThemLock:
122                          del i.objectsNewToThem[hashid]
123                  except KeyError:
124                      pass
125          self.objectsNewToMe.setLastObject()
126  
127      def hasAddr(self, addr):
128          """WIP, should be moved to addrthread.py or removed"""
129          if haveBloom:
130              return addr in self.invBloom
131          return None
132  
133      def addAddr(self, hashid):
134          """WIP, should be moved to addrthread.py or removed"""
135          if haveBloom:
136              self.addrBloom.add(hashid)