"""
This module provides utilities for performing Azure Blob Storage operations without requiring
the heavyweight azure-storage-blob library dependency
"""

import logging
import urllib.parse
from copy import deepcopy

from mlflow.utils import rest_utils
from mlflow.utils.file_utils import read_chunk

_logger = logging.getLogger(__name__)
_PUT_BLOCK_HEADERS = {
    "x-ms-blob-type": "BlockBlob",
}


def _filter_headers(headers, is_valid_header, removal_message):
    """Return the subset of ``headers`` whose names are accepted by ``is_valid_header``.

    Args:
        headers: Mapping of header names to values supplied by the caller. ``None`` is
            treated as an empty mapping.
        is_valid_header: Predicate taking a header name and returning True if the header
            may be forwarded.
        removal_message: ``%s``-style log template (taking the header name) that is logged
            at debug level for every header that is dropped.

    Returns:
        A new dict containing only the accepted headers; the input mapping is not modified.
    """
    accepted = {}
    for name, value in (headers or {}).items():
        if is_valid_header(name):
            accepted[name] = value
        else:
            _logger.debug(removal_message, name)
    return accepted


def put_adls_file_creation(sas_url, headers):
    """Performs an ADLS Azure file create `Put` operation
    (https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create)

    Args:
        sas_url: A shared access signature URL referring to the Azure ADLS server
            to which the file creation command should be issued.
        headers: Additional headers to include in the Put request. Headers that are not
            accepted by `_is_valid_adls_put_header` are dropped (and logged at debug level).
    """
    request_url = _append_query_parameters(sas_url, {"resource": "file"})
    request_headers = _filter_headers(
        headers,
        _is_valid_adls_put_header,
        "Removed unsupported '%s' header for ADLS Gen2 Put operation",
    )

    with rest_utils.cloud_storage_http_request(
        "put", request_url, headers=request_headers
    ) as response:
        rest_utils.augmented_raise_for_status(response)


def patch_adls_file_upload(sas_url, local_file, start_byte, size, position, headers, is_single):
    """
    Performs an ADLS Azure file append `Patch` operation
    (https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update)

    Args:
        sas_url: A shared access signature URL referring to the Azure ADLS server
            to which the file update command should be issued.
        local_file: The local file to upload
        start_byte: The starting byte of the local file to upload
        size: The number of bytes to upload
        position: Positional offset of the data in the Patch request
        headers: Additional headers to include in the Patch request. Headers that are not
            accepted by `_is_valid_adls_patch_header` are dropped (and logged at debug level).
        is_single: Whether this is the only patch operation for this file. When true, the
            request also carries ``flush=true`` so no separate flush call is needed.
    """
    new_params = {"action": "append", "position": str(position)}
    if is_single:
        new_params["flush"] = "true"
    request_url = _append_query_parameters(sas_url, new_params)
    request_headers = _filter_headers(
        headers,
        _is_valid_adls_patch_header,
        "Removed unsupported '%s' header for ADLS Gen2 Patch operation",
    )

    data = read_chunk(local_file, size, start_byte)
    with rest_utils.cloud_storage_http_request(
        "patch", request_url, data=data, headers=request_headers
    ) as response:
        rest_utils.augmented_raise_for_status(response)


def patch_adls_flush(sas_url, position, headers):
    """Performs an ADLS Azure file flush `Patch` operation
    (https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update)

    Args:
        sas_url: A shared access signature URL referring to the Azure ADLS server
            to which the file update command should be issued.
        position: The final size of the file to flush.
        headers: Additional headers to include in the Patch request. Unsupported headers
            are dropped (and logged at debug level).
    """
    request_url = _append_query_parameters(sas_url, {"action": "flush", "position": str(position)})
    # NOTE(review): headers are filtered with the ADLS *Put* allow-list even though this is a
    # Patch (Path Update) request. Behavior is intentionally left unchanged here; confirm
    # whether `_is_valid_adls_patch_header` was intended.
    request_headers = _filter_headers(
        headers,
        _is_valid_adls_put_header,
        "Removed unsupported '%s' header for ADLS Gen2 Patch operation",
    )

    with rest_utils.cloud_storage_http_request(
        "patch", request_url, headers=request_headers
    ) as response:
        rest_utils.augmented_raise_for_status(response)


def put_block(sas_url, block_id, data, headers):
    """
    Performs an Azure `Put Block` operation
    (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block)

    Args:
        sas_url: A shared access signature URL referring to the Azure Block Blob
            to which the specified data should be staged.
        block_id: A base64-encoded string identifying the block.
        data: Data to include in the Put Block request body.
        headers: Additional headers to include in the Put Block request
            (the `x-ms-blob-type` header is always included automatically). Headers that
            are not accepted by `_is_valid_put_block_header` are dropped (and logged at
            debug level).
    """
    request_url = _append_query_parameters(sas_url, {"comp": "block", "blockid": block_id})

    # Start from a private copy so the module-level defaults are never mutated.
    request_headers = deepcopy(_PUT_BLOCK_HEADERS)
    request_headers.update(
        _filter_headers(
            headers,
            _is_valid_put_block_header,
            "Removed unsupported '%s' header for Put Block operation",
        )
    )

    with rest_utils.cloud_storage_http_request(
        "put", request_url, data=data, headers=request_headers
    ) as response:
        rest_utils.augmented_raise_for_status(response)


def put_block_list(sas_url, block_list, headers):
    """Performs an Azure `Put Block List` operation
    (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list)

    Args:
        sas_url: A shared access signature URL referring to the Azure Block Blob
            whose previously staged blocks should be committed.
        block_list: A list of uncommitted base64-encoded string block IDs to commit. For
            more information, see
            https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list.
        headers: Headers to include in the Put Block List request. Headers that are not
            accepted by `_is_valid_put_block_list_header` are dropped (and logged at debug
            level).
    """
    request_url = _append_query_parameters(sas_url, {"comp": "blocklist"})
    data = _build_block_list_xml(block_list)
    request_headers = _filter_headers(
        headers,
        _is_valid_put_block_list_header,
        "Removed unsupported '%s' header for Put Block List operation",
    )

    with rest_utils.cloud_storage_http_request(
        "put", request_url, data=data, headers=request_headers
    ) as response:
        rest_utils.augmented_raise_for_status(response)


def _append_query_parameters(url, parameters):
    """Return ``url`` with ``parameters`` merged into its query string.

    Args:
        url: The URL to extend. Its existing query parameters are preserved.
        parameters: Dict of query parameter names to values. A name that already exists in
            ``url`` is overridden by the value given here.

    Returns:
        The URL with the merged, re-encoded query string.
    """
    parsed_url = urllib.parse.urlparse(url)
    # keep_blank_values=True: without it, parameters already present in the URL with an
    # empty value (e.g. "?foo=&bar=1") would be silently dropped from the result.
    query_dict = dict(urllib.parse.parse_qsl(parsed_url.query, keep_blank_values=True))
    query_dict.update(parameters)
    new_query = urllib.parse.urlencode(query_dict)
    new_url_components = parsed_url._replace(query=new_query)
    return urllib.parse.urlunparse(new_url_components)


def _build_block_list_xml(block_list):
    """Build the XML request body for a `Put Block List` operation.

    Args:
        block_list: Iterable of base64-encoded block IDs, each emitted as an
            ``<Uncommitted>`` element in iteration order.

    Returns:
        The XML document as a string.
    """
    # Because block IDs are base64-encoded and base64 strings do not contain
    # XML special characters, we can safely insert the block ID directly into
    # the XML document
    entries = "".join(f"<Uncommitted>{block_id}</Uncommitted>\n" for block_id in block_list)
    return f'<?xml version="1.0" encoding="utf-8"?>\n<BlockList>\n{entries}</BlockList>'


def _is_valid_put_block_list_header(header_name):
    """
    Returns:
        True if the specified header name is a valid header for the Put Block List operation,
        False otherwise. Any name starting with ``x-ms-meta-`` is accepted; all other names
        are matched case-sensitively against a fixed list. For a list of valid headers, see
        https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list#request-headers
        and https://docs.microsoft.com/en-us/rest/api/storageservices/
        specifying-conditional-headers-for-blob-service-operations#Subheading1.
    """
    return header_name.startswith("x-ms-meta-") or header_name in {
        "Authorization",
        "Date",
        "x-ms-date",
        "x-ms-version",
        "Content-Length",
        "Content-MD5",
        "x-ms-content-crc64",
        "x-ms-blob-cache-control",
        "x-ms-blob-content-type",
        "x-ms-blob-content-encoding",
        "x-ms-blob-content-language",
        "x-ms-blob-content-md5",
        "x-ms-encryption-scope",
        "x-ms-tags",
        "x-ms-lease-id",
        "x-ms-client-request-id",
        "x-ms-blob-content-disposition",
        "x-ms-access-tier",
        "If-Modified-Since",
        "If-Unmodified-Since",
        "If-Match",
        "If-None-Match",
    }


def _is_valid_put_block_header(header_name):
    """
    Returns:
        True if the specified header name is a valid header for the Put Block operation, False
        otherwise. Names are matched case-sensitively against a fixed list. For a list of
        valid headers, see
        https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#request-headers and
        https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#
        request-headers-customer-provided-encryption-keys.
    """
    return header_name in {
        "Authorization",
        "x-ms-date",
        "x-ms-version",
        "Content-Length",
        "Content-MD5",
        "x-ms-content-crc64",
        "x-ms-encryption-scope",
        "x-ms-lease-id",
        "x-ms-client-request-id",
        "x-ms-encryption-key",
        "x-ms-encryption-key-sha256",
        "x-ms-encryption-algorithm",
    }


def _is_valid_adls_put_header(header_name):
    """
    Returns:
        True if the specified header name is a valid header for the ADLS Put operation, False
        otherwise. Names are matched case-sensitively against a fixed list. For a list of
        valid headers, see
        https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create
    """
    return header_name in {
        "Cache-Control",
        "Content-Encoding",
        "Content-Language",
        "Content-Disposition",
        "x-ms-cache-control",
        "x-ms-content-type",
        "x-ms-content-encoding",
        "x-ms-content-language",
        "x-ms-content-disposition",
        "x-ms-rename-source",
        "x-ms-lease-id",
        "x-ms-properties",
        "x-ms-permissions",
        "x-ms-umask",
        "x-ms-owner",
        "x-ms-group",
        "x-ms-acl",
        "x-ms-proposed-lease-id",
        "x-ms-expiry-option",
        "x-ms-expiry-time",
        "If-Match",
        "If-None-Match",
        "If-Modified-Since",
        "If-Unmodified-Since",
        "x-ms-source-if-match",
        "x-ms-source-if-none-match",
        "x-ms-source-if-modified-since",
        "x-ms-source-if-unmodified-since",
        "x-ms-encryption-key",
        "x-ms-encryption-key-sha256",
        "x-ms-encryption-algorithm",
        "x-ms-encryption-context",
        "x-ms-client-request-id",
        "x-ms-date",
        "x-ms-version",
    }


def _is_valid_adls_patch_header(header_name):
    """
    Returns:
        True if the specified header name is a valid header for the ADLS Patch operation, False
        otherwise. Names are matched case-sensitively against a fixed list. For a list of
        valid headers, see
        https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update
    """
    return header_name in {
        "Content-Length",
        "Content-MD5",
        "x-ms-lease-id",
        "x-ms-cache-control",
        "x-ms-content-type",
        "x-ms-content-disposition",
        "x-ms-content-encoding",
        "x-ms-content-language",
        "x-ms-content-md5",
        "x-ms-properties",
        "x-ms-owner",
        "x-ms-group",
        "x-ms-permissions",
        "x-ms-acl",
        "If-Match",
        "If-None-Match",
        "If-Modified-Since",
        "If-Unmodified-Since",
        "x-ms-encryption-key",
        "x-ms-encryption-key-sha256",
        "x-ms-encryption-algorithm",
        "x-ms-encryption-context",
        "x-ms-client-request-id",
        "x-ms-date",
        "x-ms-version",
    }