sqlite.py
1 """ 2 Sqlite Inventory 3 """ 4 import sqlite3 5 import time 6 from threading import RLock 7 8 from helper_sql import SqlBulkExecute, sqlExecute, sqlQuery 9 from .storage import InventoryItem, InventoryStorage 10 11 12 class SqliteInventory(InventoryStorage): 13 """Inventory using SQLite""" 14 def __init__(self): 15 super(SqliteInventory, self).__init__() 16 # of objects (like msg payloads and pubkey payloads) 17 # Does not include protocol headers (the first 24 bytes of each packet). 18 self._inventory = {} 19 # cache for existing objects, used for quick lookups if we have an object. 20 # This is used for example whenever we receive an inv message from a peer 21 # to check to see what items are new to us. 22 # We don't delete things out of it; instead, 23 # the singleCleaner thread clears and refills it. 24 self._objects = {} 25 # Guarantees that two receiveDataThreads don't receive 26 # and process the same message concurrently 27 # (probably sent by a malicious individual) 28 self.lock = RLock() 29 30 def __contains__(self, hash_): 31 with self.lock: 32 if hash_ in self._objects: 33 return True 34 rows = sqlQuery( 35 'SELECT streamnumber FROM inventory WHERE hash=?', 36 sqlite3.Binary(hash_)) 37 if not rows: 38 return False 39 self._objects[hash_] = rows[0][0] 40 return True 41 42 def __getitem__(self, hash_): 43 with self.lock: 44 if hash_ in self._inventory: 45 return self._inventory[hash_] 46 rows = sqlQuery( 47 'SELECT objecttype, streamnumber, payload, expirestime, tag' 48 ' FROM inventory WHERE hash=?', sqlite3.Binary(hash_)) 49 if not rows: 50 raise KeyError(hash_) 51 return InventoryItem(*rows[0]) 52 53 def __setitem__(self, hash_, value): 54 with self.lock: 55 value = InventoryItem(*value) 56 self._inventory[hash_] = value 57 self._objects[hash_] = value.stream 58 59 def __delitem__(self, hash_): 60 raise NotImplementedError 61 62 def __iter__(self): 63 with self.lock: 64 hashes = list(self._inventory.keys())[:] 65 hashes += (x for x, in sqlQuery('SELECT hash FROM inventory')) 66 return hashes.__iter__() 67 68 def __len__(self): 69 with self.lock: 70 return len(self._inventory) + sqlQuery( 71 'SELECT count(*) FROM inventory')[0][0] 72 73 def by_type_and_tag(self, objectType, tag=None): 74 """ 75 Get all inventory items of certain *objectType* 76 with *tag* if given. 77 """ 78 query = [ 79 'SELECT objecttype, streamnumber, payload, expirestime, tag' 80 ' FROM inventory WHERE objecttype=?', objectType] 81 if tag: 82 query[0] += ' AND tag=?' 83 query.append(sqlite3.Binary(tag)) 84 with self.lock: 85 values = [ 86 value for value in list(self._inventory.values()) 87 if value.type == objectType 88 and tag is None or value.tag == tag 89 ] + [InventoryItem(*value) for value in sqlQuery(*query)] 90 return values 91 92 def unexpired_hashes_by_stream(self, stream): 93 """Return unexpired inventory vectors filtered by stream""" 94 with self.lock: 95 t = int(time.time()) 96 hashes = [x for x, value in list(self._inventory.items()) 97 if value.stream == stream and value.expires > t] 98 hashes += (str(payload) for payload, in sqlQuery( 99 'SELECT hash FROM inventory WHERE streamnumber=?' 100 ' AND expirestime>?', stream, t)) 101 return hashes 102 103 def flush(self): 104 """Flush cache""" 105 with self.lock: 106 # If you use both the inventoryLock and the sqlLock, 107 # always use the inventoryLock OUTSIDE of the sqlLock. 108 with SqlBulkExecute() as sql: 109 for objectHash, value in list(self._inventory.items()): 110 sql.execute( 111 'INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', 112 sqlite3.Binary(objectHash), *value) 113 self._inventory.clear() 114 115 def clean(self): 116 """Free memory / perform garbage collection""" 117 with self.lock: 118 sqlExecute( 119 'DELETE FROM inventory WHERE expirestime<?', 120 int(time.time()) - (60 * 60 * 3)) 121 self._objects.clear() 122 for objectHash, value in list(self._inventory.items()): 123 self._objects[objectHash] = value.stream