/ src / python / txtai / cloud / storage.py
storage.py
  1  """
  2  Object storage module
  3  """
  4  
  5  import os
  6  
  7  # Conditional import
  8  try:
  9      from libcloud.storage.providers import get_driver, DRIVERS
 10      from libcloud.storage.types import ContainerDoesNotExistError, ObjectDoesNotExistError
 11  
 12      LIBCLOUD = True
 13  except ImportError:
 14      LIBCLOUD, DRIVERS = False, None
 15  
 16  
 17  from .base import Cloud
 18  
 19  
 20  class ObjectStorage(Cloud):
 21      """
 22      Object storage cloud provider backed by Apache libcloud.
 23      """
 24  
 25      @staticmethod
 26      def isprovider(provider):
 27          """
 28          Checks if this provider is an object storage provider.
 29  
 30          Args:
 31              provider: provider name
 32  
 33          Returns:
 34              True if this is an object storage provider
 35          """
 36  
 37          return LIBCLOUD and provider and provider.lower() in [x.lower() for x in DRIVERS]
 38  
 39      def __init__(self, config):
 40          super().__init__(config)
 41  
 42          if not LIBCLOUD:
 43              raise ImportError('Cloud object storage is not available - install "cloud" extra to enable')
 44  
 45          # Get driver for provider
 46          driver = get_driver(config["provider"])
 47  
 48          # Get client connection
 49          self.client = driver(
 50              config.get("key", os.environ.get("ACCESS_KEY")),
 51              config.get("secret", os.environ.get("ACCESS_SECRET")),
 52              **{field: config.get(field) for field in ["host", "port", "region", "token"] if config.get(field)},
 53          )
 54  
 55      def metadata(self, path=None):
 56          try:
 57              # If this is an archive path, check if file exists
 58              if self.isarchive(path):
 59                  return self.client.get_object(self.config["container"], self.objectname(path))
 60  
 61              # Otherwise check if container exists
 62              return self.client.get_container(self.config["container"])
 63          except (ContainerDoesNotExistError, ObjectDoesNotExistError):
 64              return None
 65  
 66      def load(self, path=None):
 67          # Download archive file
 68          if self.isarchive(path):
 69              obj = self.client.get_object(self.config["container"], self.objectname(path))
 70  
 71              # Create local directory, if necessary
 72              directory = os.path.dirname(path)
 73              if directory:
 74                  os.makedirs(directory, exist_ok=True)
 75  
 76              obj.download(path, overwrite_existing=True)
 77  
 78          # Download files in container. Optionally filter with a provided prefix.
 79          else:
 80              container = self.client.get_container(self.config["container"])
 81              for obj in container.list_objects(prefix=self.config.get("prefix")):
 82                  # Derive local path and directory
 83                  localpath = os.path.join(path, obj.name)
 84                  directory = os.path.dirname(localpath)
 85  
 86                  # Create local directory, if necessary
 87                  os.makedirs(directory, exist_ok=True)
 88  
 89                  # Download file locally
 90                  obj.download(localpath, overwrite_existing=True)
 91  
 92          return path
 93  
 94      def save(self, path):
 95          # Get or create container
 96          try:
 97              container = self.client.get_container(self.config["container"])
 98          except ContainerDoesNotExistError:
 99              container = self.client.create_container(self.config["container"])
100  
101          # Upload files
102          for f in self.listfiles(path):
103              with open(f, "rb") as iterator:
104                  self.client.upload_object_via_stream(iterator=iterator, container=container, object_name=self.objectname(f))
105  
106      def objectname(self, name):
107          """
108          Derives an object name. This method checks if a prefix configuration parameter is present and combines
109          it with the input name parameter.
110  
111          Args:
112              name: input name
113  
114          Returns:
115              object name
116          """
117  
118          # Get base name
119          name = os.path.basename(name)
120  
121          # Get optional prefix/folder
122          prefix = self.config.get("prefix")
123  
124          # Prepend prefix, if applicable
125          return f"{prefix}/{name}" if prefix else name