class_singleCleaner.py.bak
1 """ 2 The `singleCleaner` class is a timer-driven thread that cleans data structures 3 to free memory, resends messages when a remote node doesn't respond, and 4 sends pong messages to keep connections alive if the network isn't busy. 5 6 It cleans these data structures in memory: 7 - inventory (moves data to the on-disk sql database) 8 - inventorySets (clears then reloads data out of sql database) 9 10 It cleans these tables on the disk: 11 - inventory (clears expired objects) 12 - pubkeys (clears pubkeys older than 4 weeks old which we have not used 13 personally) 14 - knownNodes (clears addresses which have not been online for over 3 days) 15 16 It resends messages when there has been no response: 17 - resends getpubkey messages in 5 days (then 10 days, then 20 days, etc...) 18 - resends msg messages in 5 days (then 10 days, then 20 days, etc...) 19 20 """ 21 22 import gc 23 import os 24 import time 25 26 import queues 27 import state 28 from bmconfigparser import config 29 from helper_sql import sqlExecute, sqlQuery 30 from network import connectionpool, knownnodes, StoppableThread 31 from tr import _translate 32 33 34 #: Equals 4 weeks. You could make this longer if you want 35 #: but making it shorter would not be advisable because 36 #: there is a very small possibility that it could keep you 37 #: from obtaining a needed pubkey for a period of time. 38 lengthOfTimeToHoldOnToAllPubkeys = 2419200 39 40 41 class singleCleaner(StoppableThread): 42 """The singleCleaner thread class""" 43 name = "singleCleaner" 44 cycleLength = 300 45 expireDiscoveredPeers = 300 46 47 def run(self): # pylint: disable=too-many-branches 48 gc.disable() 49 timeWeLastClearedInventoryAndPubkeysTables = 0 50 try: 51 state.maximumLengthOfTimeToBotherResendingMessages = ( 52 config.getfloat( 53 'bitmessagesettings', 'stopresendingafterxdays') 54 * 24 * 60 * 60 55 ) + ( 56 config.getfloat( 57 'bitmessagesettings', 'stopresendingafterxmonths') 58 * (60 * 60 * 24 * 365) / 12) 59 except: # noqa:E722 60 # Either the user hasn't set stopresendingafterxdays and 61 # stopresendingafterxmonths yet or the options are missing 62 # from the config file. 63 state.maximumLengthOfTimeToBotherResendingMessages = float('inf') 64 65 while state.shutdown == 0: 66 self.stop.wait(self.cycleLength) 67 queues.UISignalQueue.put(( 68 'updateStatusBar', 69 'Doing housekeeping (Flushing inventory in memory to disk...)' 70 )) 71 state.Inventory.flush() 72 queues.UISignalQueue.put(('updateStatusBar', '')) 73 74 # If we are running as a daemon then we are going to fill up the UI 75 # queue which will never be handled by a UI. We should clear it to 76 # save memory. 77 # FIXME redundant? 78 if state.thisapp.daemon or not state.enableGUI: 79 queues.UISignalQueue.queue.clear() 80 81 tick = int(time.time()) 82 if timeWeLastClearedInventoryAndPubkeysTables < tick - 7380: 83 timeWeLastClearedInventoryAndPubkeysTables = tick 84 state.Inventory.clean() 85 queues.workerQueue.put(('sendOnionPeerObj', '')) 86 # pubkeys 87 sqlExecute( 88 "DELETE FROM pubkeys WHERE time<? AND usedpersonally='no'", 89 tick - lengthOfTimeToHoldOnToAllPubkeys) 90 91 # Let us resend getpubkey objects if we have not yet heard 92 # a pubkey, and also msg objects if we have not yet heard 93 # an acknowledgement 94 queryreturn = sqlQuery( 95 "SELECT toaddress, ackdata, status FROM sent" 96 " WHERE ((status='awaitingpubkey' OR status='msgsent')" 97 " AND folder='sent' AND sleeptill<? AND senttime>?)", 98 tick, 99 tick - state.maximumLengthOfTimeToBotherResendingMessages 100 ) 101 for toAddress, ackData, status in queryreturn: 102 if status == 'awaitingpubkey': 103 self.resendPubkeyRequest(toAddress) 104 elif status == 'msgsent': 105 self.resendMsg(ackData) 106 107 try: 108 # Cleanup knownnodes and handle possible severe exception 109 # while writing it to disk 110 if state.enableNetwork: 111 knownnodes.cleanupKnownNodes(connectionpool.pool) 112 except Exception as err: 113 if "Errno 28" in str(err): 114 self.logger.fatal( 115 '(while writing knownnodes to disk)' 116 ' Alert: Your disk or data storage volume is full.' 117 ) 118 queues.UISignalQueue.put(( 119 'alert', 120 (_translate("MainWindow", "Disk full"), 121 _translate( 122 "MainWindow", 123 'Alert: Your disk or data storage volume' 124 ' is full. Bitmessage will now exit.'), 125 True) 126 )) 127 # FIXME redundant? 128 if state.thisapp.daemon or not state.enableGUI: 129 os._exit(1) # pylint: disable=protected-access 130 131 # inv/object tracking 132 for connection in connectionpool.pool.connections(): 133 connection.clean() 134 135 # discovery tracking 136 exp = time.time() - singleCleaner.expireDiscoveredPeers 137 reaper = (k for k, v in state.discoveredPeers.items() if v < exp) 138 for k in reaper: 139 try: 140 del state.discoveredPeers[k] 141 except KeyError: 142 pass 143 # ..todo:: cleanup pending upload / download 144 145 gc.collect() 146 147 def resendPubkeyRequest(self, address): 148 """Resend pubkey request for address""" 149 self.logger.debug( 150 'It has been a long time and we haven\'t heard a response to our' 151 ' getpubkey request. Sending again.' 152 ) 153 try: 154 # We need to take this entry out of the neededPubkeys structure 155 # because the queues.workerQueue checks to see whether the entry 156 # is already present and will not do the POW and send the message 157 # because it assumes that it has already done it recently. 158 del state.neededPubkeys[address] 159 except KeyError: 160 pass 161 except RuntimeError: 162 self.logger.warning( 163 "Can't remove %s from neededPubkeys, requesting pubkey will be delayed", address, exc_info=True) 164 165 queues.UISignalQueue.put(( 166 'updateStatusBar', 167 'Doing work necessary to again attempt to request a public key...' 168 )) 169 sqlExecute( 170 "UPDATE sent SET status = 'msgqueued'" 171 " WHERE toaddress = ? AND folder = 'sent'", address) 172 queues.workerQueue.put(('sendmessage', '')) 173 174 def resendMsg(self, ackdata): 175 """Resend message by ackdata""" 176 self.logger.debug( 177 'It has been a long time and we haven\'t heard an acknowledgement' 178 ' to our msg. Sending again.' 179 ) 180 sqlExecute( 181 "UPDATE sent SET status = 'msgqueued'" 182 " WHERE ackdata = ? AND folder = 'sent'", ackdata) 183 queues.workerQueue.put(('sendmessage', '')) 184 queues.UISignalQueue.put(( 185 'updateStatusBar', 186 'Doing work necessary to again attempt to deliver a message...' 187 ))