class_singleCleaner.py
1 """ 2 The `singleCleaner` class is a timer-driven thread that cleans data structures 3 to free memory, resends messages when a remote node doesn't respond, and 4 sends pong messages to keep connections alive if the network isn't busy. 5 6 It cleans these data structures in memory: 7 - inventory (moves data to the on-disk sql database) 8 - inventorySets (clears then reloads data out of sql database) 9 10 It cleans these tables on the disk: 11 - inventory (clears expired objects) 12 - pubkeys (clears pubkeys older than 4 weeks old which we have not used 13 personally) 14 - knownNodes (clears addresses which have not been online for over 3 days) 15 16 It resends messages when there has been no response: 17 - resends getpubkey messages in 5 days (then 10 days, then 20 days, etc...) 18 - resends msg messages in 5 days (then 10 days, then 20 days, etc...) 19 20 """ 21 22 import gc 23 import os 24 import time 25 26 from . import queues 27 from . import state 28 from .bmconfigparser import config 29 from .helper_sql import sqlExecute, sqlQuery 30 from .network import connectionpool, knownnodes, StoppableThread 31 from .tr import _translate 32 33 34 #: Equals 4 weeks. You could make this longer if you want 35 #: but making it shorter would not be advisable because 36 #: there is a very small possibility that it could keep you 37 #: from obtaining a needed pubkey for a period of time. 38 lengthOfTimeToHoldOnToAllPubkeys = 2419200 39 40 41 class singleCleaner(StoppableThread): 42 """The singleCleaner thread class""" 43 name = "singleCleaner" 44 cycleLength = 300 45 expireDiscoveredPeers = 300 46 47 def run(self): # pylint: disable=too-many-branches 48 gc.disable() 49 timeWeLastClearedInventoryAndPubkeysTables = 0 50 try: 51 state.maximumLengthOfTimeToBotherResendingMessages = ( 52 config.getfloat( 53 'bitmessagesettings', 'stopresendingafterxdays') 54 * 24 * 60 * 60 55 ) + ( 56 config.getfloat( 57 'bitmessagesettings', 'stopresendingafterxmonths') 58 * (60 * 60 * 24 * 365) / 12) 59 except: # noqa:E722 60 # Either the user hasn't set stopresendingafterxdays and 61 # stopresendingafterxmonths yet or the options are missing 62 # from the config file. 63 state.maximumLengthOfTimeToBotherResendingMessages = float('inf') 64 65 while state.shutdown == 0: 66 self.stop.wait(self.cycleLength) 67 queues.UISignalQueue.put(( 68 'updateStatusBar', 69 'Doing housekeeping (Flushing inventory in memory to disk...)' 70 )) 71 state.Inventory.flush() 72 queues.UISignalQueue.put(('updateStatusBar', '')) 73 74 # If we are running as a daemon then we are going to fill up the UI 75 # queue which will never be handled by a UI. We should clear it to 76 # save memory. 77 # FIXME redundant? 78 if state.thisapp.daemon or not state.enableGUI: 79 queues.UISignalQueue.queue.clear() 80 81 tick = int(time.time()) 82 if timeWeLastClearedInventoryAndPubkeysTables < tick - 7380: 83 timeWeLastClearedInventoryAndPubkeysTables = tick 84 state.Inventory.clean() 85 queues.workerQueue.put(('sendOnionPeerObj', '')) 86 # pubkeys 87 sqlExecute( 88 "DELETE FROM pubkeys WHERE time<? AND usedpersonally='no'", 89 tick - lengthOfTimeToHoldOnToAllPubkeys) 90 91 # Let us resend getpubkey objects if we have not yet heard 92 # a pubkey, and also msg objects if we have not yet heard 93 # an acknowledgement 94 queryreturn = sqlQuery( 95 "SELECT toaddress, ackdata, status FROM sent" 96 " WHERE ((status='awaitingpubkey' OR status='msgsent')" 97 " AND folder='sent' AND sleeptill<? AND senttime>?)", 98 tick, 99 tick - state.maximumLengthOfTimeToBotherResendingMessages 100 ) 101 for toAddress, ackData, status in queryreturn: 102 if status == 'awaitingpubkey': 103 self.resendPubkeyRequest(toAddress) 104 elif status == 'msgsent': 105 self.resendMsg(ackData) 106 107 try: 108 # Cleanup knownnodes and handle possible severe exception 109 # while writing it to disk 110 if state.enableNetwork: 111 knownnodes.cleanupKnownNodes(connectionpool.pool) 112 except Exception as err: 113 if "Errno 28" in str(err): 114 self.logger.fatal( 115 '(while writing knownnodes to disk)' 116 ' Alert: Your disk or data storage volume is full.' 117 ) 118 queues.UISignalQueue.put(( 119 'alert', 120 (_translate("MainWindow", "Disk full"), 121 _translate( 122 "MainWindow", 123 'Alert: Your disk or data storage volume' 124 ' is full. Bitmessage will now exit.'), 125 True) 126 )) 127 # FIXME redundant? 128 if state.thisapp.daemon or not state.enableGUI: 129 os._exit(1) # pylint: disable=protected-access 130 131 # inv/object tracking 132 for connection in connectionpool.pool.connections(): 133 connection.clean() 134 135 # discovery tracking 136 exp = time.time() - singleCleaner.expireDiscoveredPeers 137 reaper = (k for k, v in list(state.discoveredPeers.items()) if v < exp) 138 for k in reaper: 139 try: 140 del state.discoveredPeers[k] 141 except KeyError: 142 pass 143 # ..todo:: cleanup pending upload / download 144 145 gc.collect() 146 147 def resendPubkeyRequest(self, address): 148 """Resend pubkey request for address""" 149 self.logger.debug( 150 'It has been a long time and we haven\'t heard a response to our' 151 ' getpubkey request. Sending again.' 152 ) 153 try: 154 # We need to take this entry out of the neededPubkeys structure 155 # because the queues.workerQueue checks to see whether the entry 156 # is already present and will not do the POW and send the message 157 # because it assumes that it has already done it recently. 158 del state.neededPubkeys[address] 159 except KeyError: 160 pass 161 except RuntimeError: 162 self.logger.warning( 163 "Can't remove %s from neededPubkeys, requesting pubkey will be delayed", address, exc_info=True) 164 165 queues.UISignalQueue.put(( 166 'updateStatusBar', 167 'Doing work necessary to again attempt to request a public key...' 168 )) 169 sqlExecute( 170 "UPDATE sent SET status = 'msgqueued'" 171 " WHERE toaddress = ? AND folder = 'sent'", address) 172 queues.workerQueue.put(('sendmessage', '')) 173 174 def resendMsg(self, ackdata): 175 """Resend message by ackdata""" 176 self.logger.debug( 177 'It has been a long time and we haven\'t heard an acknowledgement' 178 ' to our msg. Sending again.' 179 ) 180 sqlExecute( 181 "UPDATE sent SET status = 'msgqueued'" 182 " WHERE ackdata = ? AND folder = 'sent'", ackdata) 183 queues.workerQueue.put(('sendmessage', '')) 184 queues.UISignalQueue.put(( 185 'updateStatusBar', 186 'Doing work necessary to again attempt to deliver a message...' 187 ))