/ src / class_singleCleaner.py.bak
class_singleCleaner.py.bak
  1  """
  2  The `singleCleaner` class is a timer-driven thread that cleans data structures
  3  to free memory, resends messages when a remote node doesn't respond, and
  4  sends pong messages to keep connections alive if the network isn't busy.
  5  
  6  It cleans these data structures in memory:
  7    - inventory (moves data to the on-disk sql database)
  8    - inventorySets (clears then reloads data out of sql database)
  9  
 10  It cleans these tables on the disk:
 11    - inventory (clears expired objects)
  - pubkeys (clears pubkeys more than 4 weeks old which we have not used
    personally)
 14    - knownNodes (clears addresses which have not been online for over 3 days)
 15  
 16  It resends messages when there has been no response:
 17    - resends getpubkey messages in 5 days (then 10 days, then 20 days, etc...)
 18    - resends msg messages in 5 days (then 10 days, then 20 days, etc...)
 19  
 20  """
 21  
 22  import gc
 23  import os
 24  import time
 25  
 26  import queues
 27  import state
 28  from bmconfigparser import config
 29  from helper_sql import sqlExecute, sqlQuery
 30  from network import connectionpool, knownnodes, StoppableThread
 31  from tr import _translate
 32  
 33  
 34  #: Equals 4 weeks. You could make this longer if you want
 35  #: but making it shorter would not be advisable because
 36  #: there is a very small possibility that it could keep you
 37  #: from obtaining a needed pubkey for a period of time.
 38  lengthOfTimeToHoldOnToAllPubkeys = 2419200
 39  
 40  
 41  class singleCleaner(StoppableThread):
 42      """The singleCleaner thread class"""
 43      name = "singleCleaner"
 44      cycleLength = 300
 45      expireDiscoveredPeers = 300
 46  
 47      def run(self):  # pylint: disable=too-many-branches
 48          gc.disable()
 49          timeWeLastClearedInventoryAndPubkeysTables = 0
 50          try:
 51              state.maximumLengthOfTimeToBotherResendingMessages = (
 52                  config.getfloat(
 53                      'bitmessagesettings', 'stopresendingafterxdays')
 54                  * 24 * 60 * 60
 55              ) + (
 56                  config.getfloat(
 57                      'bitmessagesettings', 'stopresendingafterxmonths')
 58                  * (60 * 60 * 24 * 365) / 12)
 59          except:  # noqa:E722
 60              # Either the user hasn't set stopresendingafterxdays and
 61              # stopresendingafterxmonths yet or the options are missing
 62              # from the config file.
 63              state.maximumLengthOfTimeToBotherResendingMessages = float('inf')
 64  
 65          while state.shutdown == 0:
 66              self.stop.wait(self.cycleLength)
 67              queues.UISignalQueue.put((
 68                  'updateStatusBar',
 69                  'Doing housekeeping (Flushing inventory in memory to disk...)'
 70              ))
 71              state.Inventory.flush()
 72              queues.UISignalQueue.put(('updateStatusBar', ''))
 73  
 74              # If we are running as a daemon then we are going to fill up the UI
 75              # queue which will never be handled by a UI. We should clear it to
 76              # save memory.
 77              # FIXME redundant?
 78              if state.thisapp.daemon or not state.enableGUI:
 79                  queues.UISignalQueue.queue.clear()
 80  
 81              tick = int(time.time())
 82              if timeWeLastClearedInventoryAndPubkeysTables < tick - 7380:
 83                  timeWeLastClearedInventoryAndPubkeysTables = tick
 84                  state.Inventory.clean()
 85                  queues.workerQueue.put(('sendOnionPeerObj', ''))
 86                  # pubkeys
 87                  sqlExecute(
 88                      "DELETE FROM pubkeys WHERE time<? AND usedpersonally='no'",
 89                      tick - lengthOfTimeToHoldOnToAllPubkeys)
 90  
 91                  # Let us resend getpubkey objects if we have not yet heard
 92                  # a pubkey, and also msg objects if we have not yet heard
 93                  # an acknowledgement
 94                  queryreturn = sqlQuery(
 95                      "SELECT toaddress, ackdata, status FROM sent"
 96                      " WHERE ((status='awaitingpubkey' OR status='msgsent')"
 97                      " AND folder='sent' AND sleeptill<? AND senttime>?)",
 98                      tick,
 99                      tick - state.maximumLengthOfTimeToBotherResendingMessages
100                  )
101                  for toAddress, ackData, status in queryreturn:
102                      if status == 'awaitingpubkey':
103                          self.resendPubkeyRequest(toAddress)
104                      elif status == 'msgsent':
105                          self.resendMsg(ackData)
106  
107              try:
108                  # Cleanup knownnodes and handle possible severe exception
109                  # while writing it to disk
110                  if state.enableNetwork:
111                      knownnodes.cleanupKnownNodes(connectionpool.pool)
112              except Exception as err:
113                  if "Errno 28" in str(err):
114                      self.logger.fatal(
115                          '(while writing knownnodes to disk)'
116                          ' Alert: Your disk or data storage volume is full.'
117                      )
118                      queues.UISignalQueue.put((
119                          'alert',
120                          (_translate("MainWindow", "Disk full"),
121                           _translate(
122                               "MainWindow",
123                               'Alert: Your disk or data storage volume'
124                               ' is full. Bitmessage will now exit.'),
125                           True)
126                      ))
127                      # FIXME redundant?
128                      if state.thisapp.daemon or not state.enableGUI:
129                          os._exit(1)  # pylint: disable=protected-access
130  
131              # inv/object tracking
132              for connection in connectionpool.pool.connections():
133                  connection.clean()
134  
135              # discovery tracking
136              exp = time.time() - singleCleaner.expireDiscoveredPeers
137              reaper = (k for k, v in state.discoveredPeers.items() if v < exp)
138              for k in reaper:
139                  try:
140                      del state.discoveredPeers[k]
141                  except KeyError:
142                      pass
143              # ..todo:: cleanup pending upload / download
144  
145              gc.collect()
146  
147      def resendPubkeyRequest(self, address):
148          """Resend pubkey request for address"""
149          self.logger.debug(
150              'It has been a long time and we haven\'t heard a response to our'
151              ' getpubkey request. Sending again.'
152          )
153          try:
154              # We need to take this entry out of the neededPubkeys structure
155              # because the queues.workerQueue checks to see whether the entry
156              # is already present and will not do the POW and send the message
157              # because it assumes that it has already done it recently.
158              del state.neededPubkeys[address]
159          except KeyError:
160              pass
161          except RuntimeError:
162              self.logger.warning(
163                  "Can't remove %s from neededPubkeys, requesting pubkey will be delayed", address, exc_info=True)
164  
165          queues.UISignalQueue.put((
166              'updateStatusBar',
167              'Doing work necessary to again attempt to request a public key...'
168          ))
169          sqlExecute(
170              "UPDATE sent SET status = 'msgqueued'"
171              " WHERE toaddress = ? AND folder = 'sent'", address)
172          queues.workerQueue.put(('sendmessage', ''))
173  
174      def resendMsg(self, ackdata):
175          """Resend message by ackdata"""
176          self.logger.debug(
177              'It has been a long time and we haven\'t heard an acknowledgement'
178              ' to our msg. Sending again.'
179          )
180          sqlExecute(
181              "UPDATE sent SET status = 'msgqueued'"
182              " WHERE ackdata = ? AND folder = 'sent'", ackdata)
183          queues.workerQueue.put(('sendmessage', ''))
184          queues.UISignalQueue.put((
185              'updateStatusBar',
186              'Doing work necessary to again attempt to deliver a message...'
187          ))