/ src / python / txtai / workflow / task / storage.py
storage.py
  1  """
  2  StorageTask module
  3  """
  4  
  5  import os
  6  import re
  7  
  8  # Conditional import
  9  try:
 10      from libcloud.storage.providers import get_driver
 11  
 12      LIBCLOUD = True
 13  except ImportError:
 14      LIBCLOUD = False
 15  
 16  from .base import Task
 17  
 18  
 19  class StorageTask(Task):
 20      """
 21      Task that processes object storage buckets. Supports local and cloud providers in Apache libcloud.
 22      """
 23  
 24      # URL prefix
 25      PREFIX = r"(\w+):\/\/.*"
 26      PATH = r"\w+:\/\/(.*)"
 27  
 28      def register(self, key=None, secret=None, host=None, port=None, token=None, region=None):
 29          """
 30          Checks if required dependencies are installed. Reads in cloud storage parameters.
 31  
 32          Args:
 33              key: provider-specific access key
 34              secret: provider-specific access secret
 35              host: server host name
 36              port: server port
 37              token: temporary session token
 38              region: storage region
 39          """
 40  
 41          if not LIBCLOUD:
 42              raise ImportError('StorageTask is not available - install "workflow" extra to enable')
 43  
 44          # pylint: disable=W0201
 45          self.key = key
 46          self.secret = secret
 47          self.host = host
 48          self.port = port
 49          self.token = token
 50          self.region = region
 51  
 52      def __call__(self, elements, executor=None):
 53          # Create aggregated directory listing for all elements
 54          outputs = []
 55          for element in elements:
 56              if self.matches(element):
 57                  # Get directory listing and run actions
 58                  outputs.extend(super().__call__(self.list(element), executor))
 59              else:
 60                  outputs.append(element)
 61  
 62          return outputs
 63  
 64      def matches(self, element):
 65          """
 66          Determines if this element is a storage element.
 67  
 68          Args:
 69              element: input storage element
 70  
 71          Returns:
 72              True if this is a storage element
 73          """
 74  
 75          # Only accept file URLs
 76          return re.match(StorageTask.PREFIX, self.upack(element, True).lower())
 77  
 78      def list(self, element):
 79          """
 80          Gets a list of urls for a object container.
 81  
 82          Args:
 83              element: object container
 84  
 85          Returns:
 86              list of urls
 87          """
 88  
 89          provider = re.sub(StorageTask.PREFIX, r"\1", element.lower())
 90          path = re.sub(StorageTask.PATH, r"\1", element)
 91  
 92          # Load key and secret, if applicable
 93          key = self.key if self.key is not None else os.environ.get("ACCESS_KEY")
 94          secret = self.secret if self.secret is not None else os.environ.get("ACCESS_SECRET")
 95  
 96          # Parse key and container
 97          key, container = (os.path.dirname(path), os.path.basename(path)) if key is None else (key, path)
 98  
 99          # Parse optional prefix from container
100          parts = container.split("/", 1)
101          container, prefix = (parts[0], parts[1]) if len(parts) > 1 else (container, None)
102  
103          # Get driver for provider
104          driver = get_driver(provider)
105  
106          # Get client connection
107          client = driver(key, secret, **{field: getattr(self, field) for field in ["host", "port", "region", "token"] if getattr(self, field)})
108  
109          container = client.get_container(container_name=container)
110          return [client.get_object_cdn_url(obj) for obj in client.list_container_objects(container=container, prefix=prefix)]