/ src / storage / filesystem.py
filesystem.py
  1  """
  2  Module for using filesystem (directory with files) for inventory storage
  3  """
  4  import logging
  5  import os
  6  import time
  7  from binascii import hexlify, unhexlify
  8  from threading import RLock
  9  
 10  from paths import lookupAppdataFolder
 11  from .storage import InventoryItem, InventoryStorage
 12  
# Module-wide logger; obtained by the fixed name 'default' rather than
# by this module's own name.
logger = logging.getLogger('default')
 14  
 15  
 16  class FilesystemInventory(InventoryStorage):
 17      """Filesystem for inventory storage"""
 18      topDir = "inventory"
 19      objectDir = "objects"
 20      metadataFilename = "metadata"
 21      dataFilename = "data"
 22  
 23      def __init__(self):
 24          super(FilesystemInventory, self).__init__()
 25          self.baseDir = os.path.join(
 26              lookupAppdataFolder(), FilesystemInventory.topDir)
 27          for createDir in [self.baseDir, os.path.join(self.baseDir, "objects")]:
 28              if os.path.exists(createDir):
 29                  if not os.path.isdir(createDir):
 30                      raise IOError(
 31                          "%s exists but it's not a directory" % createDir)
 32              else:
 33                  os.makedirs(createDir)
 34          # Guarantees that two receiveDataThreads
 35          # don't receive and process the same message
 36          # concurrently (probably sent by a malicious individual)
 37          self.lock = RLock()
 38          self._inventory = {}
 39          self._load()
 40  
 41      def __contains__(self, hashval):
 42          for streamDict in list(self._inventory.values()):
 43              if hashval in streamDict:
 44                  return True
 45          return False
 46  
 47      def __delitem__(self, hash_):
 48          raise NotImplementedError
 49  
 50      def __getitem__(self, hashval):
 51          for streamDict in list(self._inventory.values()):
 52              try:
 53                  retval = streamDict[hashval]
 54              except KeyError:
 55                  continue
 56              if retval.payload is None:
 57                  retval = InventoryItem(
 58                      retval.type,
 59                      retval.stream,
 60                      self.getData(hashval),
 61                      retval.expires,
 62                      retval.tag)
 63              return retval
 64          raise KeyError(hashval)
 65  
 66      def __setitem__(self, hashval, value):
 67          with self.lock:
 68              value = InventoryItem(*value)
 69              try:
 70                  os.makedirs(os.path.join(
 71                      self.baseDir,
 72                      FilesystemInventory.objectDir,
 73                      hexlify(hashval).decode()))
 74              except OSError:
 75                  pass
 76              try:
 77                  with open(
 78                      os.path.join(
 79                          self.baseDir,
 80                          FilesystemInventory.objectDir,
 81                          hexlify(hashval).decode(),
 82                          FilesystemInventory.metadataFilename,
 83                      ),
 84                      "w",
 85                  ) as f:
 86                      f.write("%s,%s,%s,%s," % (
 87                          value.type,
 88                          value.stream,
 89                          value.expires,
 90                          hexlify(value.tag).decode()))
 91                  with open(
 92                      os.path.join(
 93                          self.baseDir,
 94                          FilesystemInventory.objectDir,
 95                          hexlify(hashval).decode(),
 96                          FilesystemInventory.dataFilename,
 97                      ),
 98                      "wb",
 99                  ) as f:
100                      f.write(value.payload)
101              except IOError:
102                  raise KeyError
103              try:
104                  self._inventory[value.stream][hashval] = value
105              except KeyError:
106                  self._inventory[value.stream] = {}
107                  self._inventory[value.stream][hashval] = value
108  
109      def delHashId(self, hashval):
110          """Remove object from inventory"""
111          for stream in self._inventory:
112              try:
113                  del self._inventory[stream][hashval]
114              except KeyError:
115                  pass
116          with self.lock:
117              try:
118                  os.remove(
119                      os.path.join(
120                          self.baseDir,
121                          FilesystemInventory.objectDir,
122                          hexlify(hashval).decode(),
123                          FilesystemInventory.metadataFilename))
124              except IOError:
125                  pass
126              try:
127                  os.remove(
128                      os.path.join(
129                          self.baseDir,
130                          FilesystemInventory.objectDir,
131                          hexlify(hashval).decode(),
132                          FilesystemInventory.dataFilename))
133              except IOError:
134                  pass
135              try:
136                  os.rmdir(os.path.join(
137                      self.baseDir,
138                      FilesystemInventory.objectDir,
139                      hexlify(hashval).decode()))
140              except IOError:
141                  pass
142  
143      def __iter__(self):
144          elems = []
145          for streamDict in list(self._inventory.values()):
146              elems.extend(list(streamDict.keys()))
147          return elems.__iter__()
148  
149      def __len__(self):
150          retval = 0
151          for streamDict in list(self._inventory.values()):
152              retval += len(streamDict)
153          return retval
154  
155      def _load(self):
156          newInventory = {}
157          for hashId in self.object_list():
158              try:
159                  objectType, streamNumber, expiresTime, tag = self.getMetadata(
160                      hashId)
161                  try:
162                      newInventory[streamNumber][hashId] = InventoryItem(
163                          objectType, streamNumber, None, expiresTime, tag)
164                  except KeyError:
165                      newInventory[streamNumber] = {}
166                      newInventory[streamNumber][hashId] = InventoryItem(
167                          objectType, streamNumber, None, expiresTime, tag)
168              except KeyError:
169                  logger.debug(
170                      'error loading %s', hexlify(hashId), exc_info=True)
171          self._inventory = newInventory
172  
173      def stream_list(self):
174          """Return list of streams"""
175          return list(self._inventory.keys())
176  
177      def object_list(self):
178          """Return inventory vectors (hashes) from a directory"""
179          return [unhexlify(x) for x in os.listdir(os.path.join(
180              self.baseDir, FilesystemInventory.objectDir))]
181  
182      def getData(self, hashId):
183          """Get object data"""
184          try:
185              with open(
186                  os.path.join(
187                      self.baseDir,
188                      FilesystemInventory.objectDir,
189                      hexlify(hashId).decode(),
190                      FilesystemInventory.dataFilename,
191                  ),
192                  "r",
193              ) as f:
194                  return f.read()
195          except IOError:
196              raise AttributeError
197  
198      def getMetadata(self, hashId):
199          """Get object metadata"""
200          try:
201              with open(
202                  os.path.join(
203                      self.baseDir,
204                      FilesystemInventory.objectDir,
205                      hexlify(hashId).decode(),
206                      FilesystemInventory.metadataFilename,
207                  ),
208                  "r",
209              ) as f:
210                  objectType, streamNumber, expiresTime, tag = f.read().split(
211                      ",", 4)[:4]
212                  return [
213                      int(objectType),
214                      int(streamNumber),
215                      int(expiresTime),
216                      unhexlify(tag)]
217          except IOError:
218              raise KeyError
219  
220      def by_type_and_tag(self, objectType, tag):
221          """Get a list of objects filtered by object type and tag"""
222          retval = []
223          for streamDict in list(self._inventory.values()):
224              for hashId, item in streamDict:
225                  if item.type == objectType and item.tag == tag:
226                      try:
227                          if item.payload is None:
228                              item.payload = self.getData(hashId)
229                      except IOError:
230                          continue
231                      retval.append(InventoryItem(
232                          item.type,
233                          item.stream,
234                          item.payload,
235                          item.expires,
236                          item.tag))
237          return retval
238  
239      def hashes_by_stream(self, stream):
240          """Return inventory vectors (hashes) for a stream"""
241          try:
242              return list(self._inventory[stream].keys())
243          except KeyError:
244              return []
245  
246      def unexpired_hashes_by_stream(self, stream):
247          """Return unexpired hashes in the inventory for a particular stream"""
248          try:
249              return [
250                  x for x, value in list(self._inventory[stream].items())
251                  if value.expires > int(time.time())]
252          except KeyError:
253              return []
254  
255      def flush(self):
256          """Flush the inventory and create a new, empty one"""
257          self._load()
258  
259      def clean(self):
260          """Clean out old items from the inventory"""
261          minTime = int(time.time()) - 60 * 60 * 30
262          deletes = []
263          for streamDict in list(self._inventory.values()):
264              for hashId, item in list(streamDict.items()):
265                  if item.expires < minTime:
266                      deletes.append(hashId)
267          for hashId in deletes:
268              self.delHashId(hashId)