/ src / class_singleCleaner.py
class_singleCleaner.py
  1  """
  2  The `singleCleaner` class is a timer-driven thread that cleans data structures
  3  to free memory, resends messages when a remote node doesn't respond, and
  4  sends pong messages to keep connections alive if the network isn't busy.
  5  
  6  It cleans these data structures in memory:
  7    - inventory (moves data to the on-disk sql database)
  8    - inventorySets (clears then reloads data out of sql database)
  9  
 10  It cleans these tables on the disk:
 11    - inventory (clears expired objects)
  - pubkeys (clears pubkeys older than 4 weeks which we have not used
    personally)
 14    - knownNodes (clears addresses which have not been online for over 3 days)
 15  
 16  It resends messages when there has been no response:
 17    - resends getpubkey messages in 5 days (then 10 days, then 20 days, etc...)
 18    - resends msg messages in 5 days (then 10 days, then 20 days, etc...)
 19  
 20  """
 21  
 22  import gc
 23  import os
 24  import time
 25  
 26  from . import queues
 27  from . import state
 28  from .bmconfigparser import config
 29  from .helper_sql import sqlExecute, sqlQuery
 30  from .network import connectionpool, knownnodes, StoppableThread
 31  from .tr import _translate
 32  
 33  
 34  #: Equals 4 weeks. You could make this longer if you want
 35  #: but making it shorter would not be advisable because
 36  #: there is a very small possibility that it could keep you
 37  #: from obtaining a needed pubkey for a period of time.
 38  lengthOfTimeToHoldOnToAllPubkeys = 2419200
 39  
 40  
 41  class singleCleaner(StoppableThread):
 42      """The singleCleaner thread class"""
 43      name = "singleCleaner"
 44      cycleLength = 300
 45      expireDiscoveredPeers = 300
 46  
 47      def run(self):  # pylint: disable=too-many-branches
 48          gc.disable()
 49          timeWeLastClearedInventoryAndPubkeysTables = 0
 50          try:
 51              state.maximumLengthOfTimeToBotherResendingMessages = (
 52                  config.getfloat(
 53                      'bitmessagesettings', 'stopresendingafterxdays')
 54                  * 24 * 60 * 60
 55              ) + (
 56                  config.getfloat(
 57                      'bitmessagesettings', 'stopresendingafterxmonths')
 58                  * (60 * 60 * 24 * 365) / 12)
 59          except:  # noqa:E722
 60              # Either the user hasn't set stopresendingafterxdays and
 61              # stopresendingafterxmonths yet or the options are missing
 62              # from the config file.
 63              state.maximumLengthOfTimeToBotherResendingMessages = float('inf')
 64  
 65          while state.shutdown == 0:
 66              self.stop.wait(self.cycleLength)
 67              queues.UISignalQueue.put((
 68                  'updateStatusBar',
 69                  'Doing housekeeping (Flushing inventory in memory to disk...)'
 70              ))
 71              state.Inventory.flush()
 72              queues.UISignalQueue.put(('updateStatusBar', ''))
 73  
 74              # If we are running as a daemon then we are going to fill up the UI
 75              # queue which will never be handled by a UI. We should clear it to
 76              # save memory.
 77              # FIXME redundant?
 78              if state.thisapp.daemon or not state.enableGUI:
 79                  queues.UISignalQueue.queue.clear()
 80  
 81              tick = int(time.time())
 82              if timeWeLastClearedInventoryAndPubkeysTables < tick - 7380:
 83                  timeWeLastClearedInventoryAndPubkeysTables = tick
 84                  state.Inventory.clean()
 85                  queues.workerQueue.put(('sendOnionPeerObj', ''))
 86                  # pubkeys
 87                  sqlExecute(
 88                      "DELETE FROM pubkeys WHERE time<? AND usedpersonally='no'",
 89                      tick - lengthOfTimeToHoldOnToAllPubkeys)
 90  
 91                  # Let us resend getpubkey objects if we have not yet heard
 92                  # a pubkey, and also msg objects if we have not yet heard
 93                  # an acknowledgement
 94                  queryreturn = sqlQuery(
 95                      "SELECT toaddress, ackdata, status FROM sent"
 96                      " WHERE ((status='awaitingpubkey' OR status='msgsent')"
 97                      " AND folder='sent' AND sleeptill<? AND senttime>?)",
 98                      tick,
 99                      tick - state.maximumLengthOfTimeToBotherResendingMessages
100                  )
101                  for toAddress, ackData, status in queryreturn:
102                      if status == 'awaitingpubkey':
103                          self.resendPubkeyRequest(toAddress)
104                      elif status == 'msgsent':
105                          self.resendMsg(ackData)
106  
107              try:
108                  # Cleanup knownnodes and handle possible severe exception
109                  # while writing it to disk
110                  if state.enableNetwork:
111                      knownnodes.cleanupKnownNodes(connectionpool.pool)
112              except Exception as err:
113                  if "Errno 28" in str(err):
114                      self.logger.fatal(
115                          '(while writing knownnodes to disk)'
116                          ' Alert: Your disk or data storage volume is full.'
117                      )
118                      queues.UISignalQueue.put((
119                          'alert',
120                          (_translate("MainWindow", "Disk full"),
121                           _translate(
122                               "MainWindow",
123                               'Alert: Your disk or data storage volume'
124                               ' is full. Bitmessage will now exit.'),
125                           True)
126                      ))
127                      # FIXME redundant?
128                      if state.thisapp.daemon or not state.enableGUI:
129                          os._exit(1)  # pylint: disable=protected-access
130  
131              # inv/object tracking
132              for connection in connectionpool.pool.connections():
133                  connection.clean()
134  
135              # discovery tracking
136              exp = time.time() - singleCleaner.expireDiscoveredPeers
137              reaper = (k for k, v in list(state.discoveredPeers.items()) if v < exp)
138              for k in reaper:
139                  try:
140                      del state.discoveredPeers[k]
141                  except KeyError:
142                      pass
143              # ..todo:: cleanup pending upload / download
144  
145              gc.collect()
146  
147      def resendPubkeyRequest(self, address):
148          """Resend pubkey request for address"""
149          self.logger.debug(
150              'It has been a long time and we haven\'t heard a response to our'
151              ' getpubkey request. Sending again.'
152          )
153          try:
154              # We need to take this entry out of the neededPubkeys structure
155              # because the queues.workerQueue checks to see whether the entry
156              # is already present and will not do the POW and send the message
157              # because it assumes that it has already done it recently.
158              del state.neededPubkeys[address]
159          except KeyError:
160              pass
161          except RuntimeError:
162              self.logger.warning(
163                  "Can't remove %s from neededPubkeys, requesting pubkey will be delayed", address, exc_info=True)
164  
165          queues.UISignalQueue.put((
166              'updateStatusBar',
167              'Doing work necessary to again attempt to request a public key...'
168          ))
169          sqlExecute(
170              "UPDATE sent SET status = 'msgqueued'"
171              " WHERE toaddress = ? AND folder = 'sent'", address)
172          queues.workerQueue.put(('sendmessage', ''))
173  
174      def resendMsg(self, ackdata):
175          """Resend message by ackdata"""
176          self.logger.debug(
177              'It has been a long time and we haven\'t heard an acknowledgement'
178              ' to our msg. Sending again.'
179          )
180          sqlExecute(
181              "UPDATE sent SET status = 'msgqueued'"
182              " WHERE ackdata = ? AND folder = 'sent'", ackdata)
183          queues.workerQueue.put(('sendmessage', ''))
184          queues.UISignalQueue.put((
185              'updateStatusBar',
186              'Doing work necessary to again attempt to deliver a message...'
187          ))