/ src / storage / sqlite.py.bak
sqlite.py.bak
  1  """
  2  Sqlite Inventory
  3  """
  4  import sqlite3
  5  import time
  6  from threading import RLock
  7  
  8  from helper_sql import SqlBulkExecute, sqlExecute, sqlQuery
  9  from .storage import InventoryItem, InventoryStorage
 10  
 11  
 12  class SqliteInventory(InventoryStorage):
 13      """Inventory using SQLite"""
 14      def __init__(self):
 15          super(SqliteInventory, self).__init__()
 16          # of objects (like msg payloads and pubkey payloads)
 17          # Does not include protocol headers (the first 24 bytes of each packet).
 18          self._inventory = {}
 19          # cache for existing objects, used for quick lookups if we have an object.
 20          # This is used for example whenever we receive an inv message from a peer
 21          # to check to see what items are new to us.
 22          # We don't delete things out of it; instead,
 23          # the singleCleaner thread clears and refills it.
 24          self._objects = {}
 25          # Guarantees that two receiveDataThreads don't receive
 26          # and process the same message concurrently
 27          # (probably sent by a malicious individual)
 28          self.lock = RLock()
 29  
 30      def __contains__(self, hash_):
 31          with self.lock:
 32              if hash_ in self._objects:
 33                  return True
 34              rows = sqlQuery(
 35                  'SELECT streamnumber FROM inventory WHERE hash=?',
 36                  sqlite3.Binary(hash_))
 37              if not rows:
 38                  return False
 39              self._objects[hash_] = rows[0][0]
 40              return True
 41  
 42      def __getitem__(self, hash_):
 43          with self.lock:
 44              if hash_ in self._inventory:
 45                  return self._inventory[hash_]
 46              rows = sqlQuery(
 47                  'SELECT objecttype, streamnumber, payload, expirestime, tag'
 48                  ' FROM inventory WHERE hash=?', sqlite3.Binary(hash_))
 49              if not rows:
 50                  raise KeyError(hash_)
 51              return InventoryItem(*rows[0])
 52  
 53      def __setitem__(self, hash_, value):
 54          with self.lock:
 55              value = InventoryItem(*value)
 56              self._inventory[hash_] = value
 57              self._objects[hash_] = value.stream
 58  
 59      def __delitem__(self, hash_):
 60          raise NotImplementedError
 61  
 62      def __iter__(self):
 63          with self.lock:
 64              hashes = self._inventory.keys()[:]
 65              hashes += (x for x, in sqlQuery('SELECT hash FROM inventory'))
 66              return hashes.__iter__()
 67  
 68      def __len__(self):
 69          with self.lock:
 70              return len(self._inventory) + sqlQuery(
 71                  'SELECT count(*) FROM inventory')[0][0]
 72  
 73      def by_type_and_tag(self, objectType, tag=None):
 74          """
 75          Get all inventory items of certain *objectType*
 76          with *tag* if given.
 77          """
 78          query = [
 79              'SELECT objecttype, streamnumber, payload, expirestime, tag'
 80              ' FROM inventory WHERE objecttype=?', objectType]
 81          if tag:
 82              query[0] += ' AND tag=?'
 83              query.append(sqlite3.Binary(tag))
 84          with self.lock:
 85              values = [
 86                  value for value in self._inventory.values()
 87                  if value.type == objectType
 88                  and tag is None or value.tag == tag
 89              ] + [InventoryItem(*value) for value in sqlQuery(*query)]
 90              return values
 91  
 92      def unexpired_hashes_by_stream(self, stream):
 93          """Return unexpired inventory vectors filtered by stream"""
 94          with self.lock:
 95              t = int(time.time())
 96              hashes = [x for x, value in self._inventory.items()
 97                        if value.stream == stream and value.expires > t]
 98              hashes += (str(payload) for payload, in sqlQuery(
 99                  'SELECT hash FROM inventory WHERE streamnumber=?'
100                  ' AND expirestime>?', stream, t))
101              return hashes
102  
103      def flush(self):
104          """Flush cache"""
105          with self.lock:
106              # If you use both the inventoryLock and the sqlLock,
107              # always use the inventoryLock OUTSIDE of the sqlLock.
108              with SqlBulkExecute() as sql:
109                  for objectHash, value in self._inventory.items():
110                      sql.execute(
111                          'INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)',
112                          sqlite3.Binary(objectHash), *value)
113                  self._inventory.clear()
114  
115      def clean(self):
116          """Free memory / perform garbage collection"""
117          with self.lock:
118              sqlExecute(
119                  'DELETE FROM inventory WHERE expirestime<?',
120                  int(time.time()) - (60 * 60 * 3))
121              self._objects.clear()
122              for objectHash, value in self._inventory.items():
123                  self._objects[objectHash] = value.stream