storage.py
"""
Object storage module
"""

import os

# Conditional import
try:
    from libcloud.storage.providers import get_driver, DRIVERS
    from libcloud.storage.types import ContainerDoesNotExistError, ObjectDoesNotExistError

    LIBCLOUD = True
except ImportError:
    LIBCLOUD, DRIVERS = False, None


from .base import Cloud


class ObjectStorage(Cloud):
    """
    Object storage cloud provider backed by Apache libcloud.
    """

    @staticmethod
    def isprovider(provider):
        """
        Checks if this provider is an object storage provider.

        Args:
            provider: provider name

        Returns:
            True if this is an object storage provider, False otherwise
        """

        # libcloud missing or no provider given - always a plain False rather than None/""
        if not (LIBCLOUD and provider):
            return False

        # Case-insensitive match against the libcloud driver names
        name = provider.lower()
        return any(name == driver.lower() for driver in DRIVERS)

    def __init__(self, config):
        """
        Creates a libcloud storage client from configuration.

        Args:
            config: configuration dict. "provider" is required. "key" and "secret" fall back to the
                    ACCESS_KEY and ACCESS_SECRET environment variables when not present in config.
                    "host", "port", "region" and "token" are passed to the driver when set.

        Raises:
            ImportError: if libcloud is not installed
        """

        super().__init__(config)

        if not LIBCLOUD:
            raise ImportError('Cloud object storage is not available - install "cloud" extra to enable')

        # Get driver for provider
        driver = get_driver(config["provider"])

        # Only pass optional connection settings that have a value
        options = {field: config.get(field) for field in ["host", "port", "region", "token"] if config.get(field)}

        # Get client connection
        self.client = driver(
            config.get("key", os.environ.get("ACCESS_KEY")),
            config.get("secret", os.environ.get("ACCESS_SECRET")),
            **options,
        )

    def metadata(self, path=None):
        """
        Looks up the remote object or container for path.

        Args:
            path: local path, an archive path is resolved to a single object

        Returns:
            object when path is an archive, container otherwise, None if the target does not exist
        """

        try:
            # If this is an archive path, check if file exists
            if self.isarchive(path):
                return self.client.get_object(self.config["container"], self.objectname(path))

            # Otherwise check if container exists
            return self.client.get_container(self.config["container"])
        except (ContainerDoesNotExistError, ObjectDoesNotExistError):
            return None

    def load(self, path=None):
        """
        Downloads content from object storage to path.

        When path is an archive, the single matching object is downloaded to path. Otherwise, all objects
        in the container (optionally filtered by the "prefix" configuration parameter) are downloaded
        under path, using each object name as the relative path.

        Args:
            path: local archive file path or local output directory

        Returns:
            path
        """

        # Download archive file
        if self.isarchive(path):
            obj = self.client.get_object(self.config["container"], self.objectname(path))

            # Create local directory, if necessary
            directory = os.path.dirname(path)
            if directory:
                os.makedirs(directory, exist_ok=True)

            obj.download(path, overwrite_existing=True)

        # Download files in container. Optionally filter with a provided prefix.
        else:
            container = self.client.get_container(self.config["container"])
            for obj in container.list_objects(prefix=self.config.get("prefix")):
                # Derive local path and directory
                localpath = os.path.join(path, obj.name)
                directory = os.path.dirname(localpath)

                # Create local directory, if necessary. The directory is empty when localpath has no
                # directory component and os.makedirs("") raises FileNotFoundError.
                if directory:
                    os.makedirs(directory, exist_ok=True)

                # Download file locally
                obj.download(localpath, overwrite_existing=True)

        return path

    def save(self, path):
        """
        Uploads the files found for path to the configured container. The container is created
        if it does not exist.

        Objects are named via objectname, which keeps only the file base name, so local directory
        structure is not preserved in object names.

        Args:
            path: local path to upload
        """

        # Get or create container
        try:
            container = self.client.get_container(self.config["container"])
        except ContainerDoesNotExistError:
            container = self.client.create_container(self.config["container"])

        # Upload files
        for f in self.listfiles(path):
            with open(f, "rb") as iterator:
                self.client.upload_object_via_stream(iterator=iterator, container=container, object_name=self.objectname(f))

    def objectname(self, name):
        """
        Derives an object name. This method checks if a prefix configuration parameter is present and combines
        it with the input name parameter.

        Args:
            name: input name

        Returns:
            object name
        """

        # Get base name
        name = os.path.basename(name)

        # Get optional prefix/folder
        prefix = self.config.get("prefix")

        # Prepend prefix, if applicable
        return f"{prefix}/{name}" if prefix else name