# storage.py
"""
StorageTask module
"""

import os
import re

# Conditional import
try:
    from libcloud.storage.providers import get_driver

    LIBCLOUD = True
except ImportError:
    LIBCLOUD = False

from .base import Task


class StorageTask(Task):
    """
    Task that processes object storage buckets. Supports local and cloud providers in Apache libcloud.
    """

    # URL prefix - captures the scheme of a "<scheme>://<path>" url, which is used as the libcloud provider name
    PREFIX = r"(\w+):\/\/.*"

    # URL path - captures everything after "<scheme>://"
    PATH = r"\w+:\/\/(.*)"

    def register(self, key=None, secret=None, host=None, port=None, token=None, region=None):
        """
        Checks if required dependencies are installed. Reads in cloud storage parameters.

        Args:
            key: provider-specific access key
            secret: provider-specific access secret
            host: server host name
            port: server port
            token: temporary session token
            region: storage region

        Raises:
            ImportError: if Apache libcloud is not installed
        """

        if not LIBCLOUD:
            raise ImportError('StorageTask is not available - install "workflow" extra to enable')

        # pylint: disable=W0201
        self.key = key
        self.secret = secret
        self.host = host
        self.port = port
        self.token = token
        self.region = region

    def __call__(self, elements, executor=None):
        """
        Expands each storage element into its object listing and runs the parent task on that listing.
        Elements that are not storage elements are passed through unchanged.

        Args:
            elements: iterable of input elements
            executor: optional executor, forwarded as-is to the parent task

        Returns:
            list with parent task outputs for storage elements and the original value for all other elements
        """

        # Create aggregated directory listing for all elements
        outputs = []
        for element in elements:
            if self.matches(element):
                # List the same unpacked value that matches() validated, list() requires a plain url string
                url = self.upack(element, True)

                # Get directory listing and run actions
                outputs.extend(super().__call__(self.list(url), executor))
            else:
                outputs.append(element)

        return outputs

    def matches(self, element):
        """
        Determines if this element is a storage element.

        Args:
            element: input storage element

        Returns:
            match object (truthy) if this is a storage element, None otherwise
        """

        # NOTE(review): upack is inherited from Task - presumably extracts the data value from a wrapped element
        value = self.upack(element, True)

        # Non-string values can never be storage urls, report no match instead of failing on .lower()
        if not isinstance(value, str):
            return None

        # Only accept file URLs
        return re.match(StorageTask.PREFIX, value.lower())

    def list(self, element):
        """
        Gets a list of urls for an object container.

        The element is a url in the form "<provider>://<path>". When an access key is available, either
        from register() or the ACCESS_KEY environment variable, path is read as "<container>[/<prefix>]".
        When no key is available, the directory part of path is passed to the driver as the key and the
        last path component is the container.

        Args:
            element: object container

        Returns:
            list of urls
        """

        # Provider name is matched case-insensitively, path keeps its original case
        provider = re.sub(StorageTask.PREFIX, r"\1", element.lower())
        path = re.sub(StorageTask.PATH, r"\1", element)

        # Load key and secret, if applicable
        key = self.key if self.key is not None else os.environ.get("ACCESS_KEY")
        secret = self.secret if self.secret is not None else os.environ.get("ACCESS_SECRET")

        # Parse key and container
        if key is None:
            key, container = os.path.dirname(path), os.path.basename(path)
        else:
            container = path

        # Parse optional prefix from container
        container, _, prefix = container.partition("/")
        prefix = prefix if _ else None

        # Get driver for provider
        driver = get_driver(provider)

        # Only forward optional connection parameters that were set to a truthy value
        fields = ("host", "port", "region", "token")
        params = {field: getattr(self, field) for field in fields if getattr(self, field)}

        # Get client connection
        client = driver(key, secret, **params)

        container = client.get_container(container_name=container)
        return [client.get_object_cdn_url(obj) for obj in client.list_container_objects(container=container, prefix=prefix)]