/ src / network / downloadthread.py
downloadthread.py
 1  """
 2  `DownloadThread` class definition
 3  """
 4  import time
 5  import random
 6  import state
 7  import addresses
 8  import protocol
 9  from . import connectionpool
10  from network import dandelion_ins
11  from .objectracker import missingObjects
12  from .threads import StoppableThread
13  
14  
15  class DownloadThread(StoppableThread):
16      """Thread-based class for downloading from connections"""
17      minPending = 200
18      maxRequestChunk = 1000
19      requestTimeout = 60
20      cleanInterval = 60
21      requestExpires = 3600
22  
23      def __init__(self):
24          super(DownloadThread, self).__init__(name="Downloader")
25          self.lastCleaned = time.time()
26  
27      def cleanPending(self):
28          """Expire pending downloads eventually"""
29          deadline = time.time() - self.requestExpires
30          try:
31              toDelete = [
32                  k for k, v in missingObjects.items()
33                  if v < deadline]
34          except RuntimeError:
35              pass
36          else:
37              for i in toDelete:
38                  del missingObjects[i]
39              self.lastCleaned = time.time()
40  
41      def run(self):
42          while not self._stopped:
43              requested = 0
44              # Choose downloading peers randomly
45              connections = connectionpool.pool.establishedConnections()
46              random.shuffle(connections)
47              requestChunk = max(int(
48                  min(self.maxRequestChunk, len(missingObjects))
49                  / len(connections)), 1) if connections else 1
50  
51              for i in connections:
52                  now = time.time()
53                  # avoid unnecessary delay
54                  if i.skipUntil >= now:
55                      continue
56                  try:
57                      request = i.objectsNewToMe.randomKeys(requestChunk)
58                  except KeyError:
59                      continue
60                  payload = bytearray()
61                  chunkCount = 0
62                  for chunk in request:
63                      if chunk in state.Inventory and not dandelion_ins.hasHash(chunk):
64                          try:
65                              del i.objectsNewToMe[chunk]
66                          except KeyError:
67                              pass
68                          continue
69                      payload.extend(chunk)
70                      chunkCount += 1
71                      missingObjects[chunk] = now
72                  if not chunkCount:
73                      continue
74                  payload[0:0] = addresses.encodeVarint(chunkCount)
75                  i.append_write_buf(protocol.CreatePacket('getdata', payload))
76                  self.logger.debug(
77                      '%s:%i Requesting %i objects',
78                      i.destination.host, i.destination.port, chunkCount)
79                  requested += chunkCount
80              if time.time() >= self.lastCleaned + self.cleanInterval:
81                  self.cleanPending()
82              if not requested:
83                  self.stop.wait(1)