# downloadthread.py
"""
`DownloadThread` class definition
"""
import time
import random
import state
import addresses
import protocol
from . import connectionpool
from network import dandelion_ins
from .objectracker import missingObjects
from .threads import StoppableThread


class DownloadThread(StoppableThread):
    """Thread-based class for downloading from connections

    Each pass of `run()` walks the established connections in random
    order, asks every eligible one for a batch of objects via a
    ``getdata`` packet and records the request time in `missingObjects`.
    Stale `missingObjects` entries are expired periodically by
    `cleanPending()`.
    """
    # NOTE(review): minPending and requestTimeout are not read anywhere in
    # this class; presumably other code relies on them -- confirm before
    # removing.
    minPending = 200
    # Cap applied to len(missingObjects) before that number is divided
    # between the established connections to get the per-connection batch.
    maxRequestChunk = 1000
    requestTimeout = 60
    # Minimum number of seconds between two cleanPending() calls from run().
    cleanInterval = 60
    # Age (seconds) after which a missingObjects entry is dropped.
    requestExpires = 3600

    def __init__(self):
        super(DownloadThread, self).__init__(name="Downloader")
        self.lastCleaned = time.time()

    def cleanPending(self):
        """Expire pending downloads eventually

        Removes every `missingObjects` entry whose stored timestamp is
        older than `requestExpires` seconds and then updates
        `lastCleaned`. If taking the snapshot of expired keys raises
        `RuntimeError`, nothing is removed and `lastCleaned` is left
        untouched, so `run()` tries again on its next pass.
        """
        deadline = time.time() - self.requestExpires
        try:
            toDelete = [
                k for k, v in missingObjects.items()
                if v < deadline]
        except RuntimeError:
            # The mapping changed while it was being iterated; skip this
            # round rather than work from a partial snapshot.
            pass
        else:
            for key in toDelete:
                try:
                    del missingObjects[key]
                except KeyError:
                    # The entry vanished after the snapshot was taken.
                    # The RuntimeError handler above shows the mapping can
                    # change underneath us (presumably from another
                    # thread), so a missing key must not kill this thread.
                    pass
            self.lastCleaned = time.time()

    def run(self):
        """Request missing objects from peers until the thread is stopped

        When a pass requests nothing, the loop waits on `self.stop` for up
        to one second before trying again.
        """
        while not self._stopped:
            requested = 0
            # Choose downloading peers randomly
            connections = connectionpool.pool.establishedConnections()
            random.shuffle(connections)
            # Split the (capped) number of pending objects evenly between
            # the connections, always asking for at least one.
            if connections:
                requestChunk = max(int(
                    min(self.maxRequestChunk, len(missingObjects))
                    / len(connections)), 1)
            else:
                requestChunk = 1

            for connection in connections:
                now = time.time()
                # avoid unnecessary delay
                if connection.skipUntil >= now:
                    continue
                try:
                    request = connection.objectsNewToMe.randomKeys(
                        requestChunk)
                except KeyError:
                    continue
                payload = bytearray()
                chunkCount = 0
                for chunk in request:
                    # Already known to the inventory and not tracked by
                    # dandelion_ins: forget it instead of requesting it.
                    if (chunk in state.Inventory
                            and not dandelion_ins.hasHash(chunk)):
                        try:
                            del connection.objectsNewToMe[chunk]
                        except KeyError:
                            pass
                        continue
                    payload.extend(chunk)
                    chunkCount += 1
                    missingObjects[chunk] = now
                if not chunkCount:
                    continue
                # Prepend the varint-encoded number of requested hashes.
                payload[0:0] = addresses.encodeVarint(chunkCount)
                connection.append_write_buf(
                    protocol.CreatePacket('getdata', payload))
                self.logger.debug(
                    '%s:%i Requesting %i objects',
                    connection.destination.host,
                    connection.destination.port, chunkCount)
                requested += chunkCount
            if time.time() >= self.lastCleaned + self.cleanInterval:
                self.cleanPending()
            if not requested:
                self.stop.wait(1)