filesystem.py
"""
Module for using the filesystem (a directory of files) for inventory storage
"""
import logging
import os
import time
from binascii import hexlify, unhexlify
from threading import RLock

from paths import lookupAppdataFolder
from .storage import InventoryItem, InventoryStorage

logger = logging.getLogger('default')


class FilesystemInventory(InventoryStorage):
    """Filesystem for inventory storage

    Every object lives in its own directory
    ``<appdata>/inventory/objects/<hex of hash>/`` holding two files:
    ``metadata`` (text ``type,stream,expires,<hex tag>,``) and ``data``
    (the raw payload bytes).  An in-memory index
    ``{stream: {hash: InventoryItem}}`` is kept in ``self._inventory``;
    entries loaded from disk carry ``payload=None`` and the payload is read
    from the ``data`` file when the item is fetched.
    """
    topDir = "inventory"
    objectDir = "objects"
    metadataFilename = "metadata"
    dataFilename = "data"

    def __init__(self):
        super(FilesystemInventory, self).__init__()
        self.baseDir = os.path.join(
            lookupAppdataFolder(), FilesystemInventory.topDir)
        for createDir in [
                self.baseDir,
                os.path.join(self.baseDir, FilesystemInventory.objectDir)]:
            if os.path.exists(createDir):
                if not os.path.isdir(createDir):
                    raise IOError(
                        "%s exists but it's not a directory" % createDir)
            else:
                os.makedirs(createDir)
        # Guarantees that two receiveDataThreads
        # don't receive and process the same message
        # concurrently (probably sent by a malicious individual)
        self.lock = RLock()
        self._inventory = {}
        self._load()

    def _objectPath(self, hashval, *parts):
        """Return the directory path for object *hashval*, or the path of
        a file inside that directory when *parts* are given."""
        return os.path.join(
            self.baseDir,
            FilesystemInventory.objectDir,
            hexlify(hashval).decode(),
            *parts)

    def __contains__(self, hashval):
        for streamDict in list(self._inventory.values()):
            if hashval in streamDict:
                return True
        return False

    def __delitem__(self, hash_):
        raise NotImplementedError

    def __getitem__(self, hashval):
        """Return the InventoryItem for *hashval*, reading its payload from
        disk if it is not held in memory.

        Raises KeyError if the hash is unknown (and AttributeError, via
        getData, if the payload file cannot be read).
        """
        for streamDict in list(self._inventory.values()):
            try:
                retval = streamDict[hashval]
            except KeyError:
                continue
            if retval.payload is None:
                retval = InventoryItem(
                    retval.type,
                    retval.stream,
                    self.getData(hashval),
                    retval.expires,
                    retval.tag)
            return retval
        raise KeyError(hashval)

    def __setitem__(self, hashval, value):
        """Write object *value* (an InventoryItem or a 5-item sequence
        type, stream, payload, expires, tag) to disk and index it.

        Raises KeyError if the files cannot be written.
        """
        with self.lock:
            value = InventoryItem(*value)
            try:
                os.makedirs(self._objectPath(hashval))
            except OSError:
                # Usually "already exists"; if creation really failed the
                # open() calls below raise and are reported as KeyError.
                pass
            try:
                with open(
                    self._objectPath(
                        hashval, FilesystemInventory.metadataFilename),
                    "w",
                ) as f:
                    f.write("%s,%s,%s,%s," % (
                        value.type,
                        value.stream,
                        value.expires,
                        hexlify(value.tag).decode()))
                with open(
                    self._objectPath(
                        hashval, FilesystemInventory.dataFilename),
                    "wb",
                ) as f:
                    f.write(value.payload)
            except IOError:
                raise KeyError
            self._inventory.setdefault(value.stream, {})[hashval] = value

    def delHashId(self, hashval):
        """Remove object from inventory (memory index and disk files).

        Best effort: files or the directory that cannot be removed are
        silently left in place.
        """
        with self.lock:
            # Under the lock so a concurrent __setitem__ cannot add a new
            # stream key while we iterate the index.
            for streamDict in self._inventory.values():
                streamDict.pop(hashval, None)
            for filename in (
                    FilesystemInventory.metadataFilename,
                    FilesystemInventory.dataFilename):
                try:
                    os.remove(self._objectPath(hashval, filename))
                except OSError:
                    pass
            try:
                os.rmdir(self._objectPath(hashval))
            except OSError:
                pass

    def __iter__(self):
        elems = []
        for streamDict in list(self._inventory.values()):
            elems.extend(list(streamDict.keys()))
        return elems.__iter__()

    def __len__(self):
        return sum(
            len(streamDict) for streamDict in list(self._inventory.values()))

    def _load(self):
        """Rebuild the in-memory index from the metadata files on disk.

        Payloads are not read (stored as None).  Objects whose metadata
        cannot be read or parsed are logged at debug level and skipped.
        """
        newInventory = {}
        for hashId in self.object_list():
            try:
                objectType, streamNumber, expiresTime, tag = \
                    self.getMetadata(hashId)
            except KeyError:
                logger.debug(
                    'error loading %s', hexlify(hashId), exc_info=True)
                continue
            newInventory.setdefault(streamNumber, {})[hashId] = \
                InventoryItem(
                    objectType, streamNumber, None, expiresTime, tag)
        self._inventory = newInventory

    def stream_list(self):
        """Return list of streams"""
        return list(self._inventory.keys())

    def object_list(self):
        """Return inventory vectors (hashes) from a directory"""
        return [unhexlify(x) for x in os.listdir(os.path.join(
            self.baseDir, FilesystemInventory.objectDir))]

    def getData(self, hashId):
        """Get object data (the raw payload bytes).

        Raises AttributeError if the data file cannot be read.
        """
        try:
            # Binary mode: the payload was written with "wb" and is not text.
            with open(
                self._objectPath(hashId, FilesystemInventory.dataFilename),
                "rb",
            ) as f:
                return f.read()
        except IOError:
            raise AttributeError

    def getMetadata(self, hashId):
        """Get object metadata as [type, stream, expires, tag].

        Raises KeyError if the metadata file cannot be read or is malformed.
        """
        try:
            with open(
                self._objectPath(
                    hashId, FilesystemInventory.metadataFilename),
                "r",
            ) as f:
                objectType, streamNumber, expiresTime, tag = f.read().split(
                    ",", 4)[:4]
            return [
                int(objectType),
                int(streamNumber),
                int(expiresTime),
                unhexlify(tag)]
        except (IOError, ValueError):
            # ValueError covers too few fields, non-integer numbers and bad
            # hex (binascii.Error subclasses ValueError); report a corrupt
            # file the same way as a missing one so _load can skip it.
            raise KeyError

    def by_type_and_tag(self, objectType, tag):
        """Get a list of objects filtered by object type and tag.

        Payloads not held in memory are read from disk; objects whose data
        file cannot be read are skipped.
        """
        retval = []
        for streamDict in list(self._inventory.values()):
            for hashId, item in list(streamDict.items()):
                if item.type != objectType or item.tag != tag:
                    continue
                # Use a local instead of assigning to item.payload:
                # InventoryItem may be immutable (e.g. a namedtuple) — TODO
                # confirm against storage.InventoryItem.
                payload = item.payload
                if payload is None:
                    try:
                        payload = self.getData(hashId)
                    except AttributeError:
                        # getData reports an unreadable file this way
                        continue
                retval.append(InventoryItem(
                    item.type,
                    item.stream,
                    payload,
                    item.expires,
                    item.tag))
        return retval

    def hashes_by_stream(self, stream):
        """Return inventory vectors (hashes) for a stream"""
        try:
            return list(self._inventory[stream].keys())
        except KeyError:
            return []

    def unexpired_hashes_by_stream(self, stream):
        """Return unexpired hashes in the inventory for a particular stream"""
        now = int(time.time())
        try:
            return [
                x for x, value in list(self._inventory[stream].items())
                if value.expires > now]
        except KeyError:
            return []

    def flush(self):
        """Reload the in-memory index from disk.

        Objects are written to disk immediately by __setitem__, so there is
        nothing buffered to write out here.
        """
        self._load()

    def clean(self):
        """Delete items whose expiry time is more than 30 hours in the past"""
        minTime = int(time.time()) - 60 * 60 * 30
        deletes = []
        for streamDict in list(self._inventory.values()):
            for hashId, item in list(streamDict.items()):
                if item.expires < minTime:
                    deletes.append(hashId)
        for hashId in deletes:
            self.delHashId(hashId)