/ mlflow / azure / client.py
client.py
  1  """
  2  This module provides utilities for performing Azure Blob Storage operations without requiring
  3  the heavyweight azure-storage-blob library dependency
  4  """
  5  
  6  import logging
  7  import urllib
  8  from copy import deepcopy
  9  
 10  from mlflow.utils import rest_utils
 11  from mlflow.utils.file_utils import read_chunk
 12  
 13  _logger = logging.getLogger(__name__)
 14  _PUT_BLOCK_HEADERS = {
 15      "x-ms-blob-type": "BlockBlob",
 16  }
 17  
 18  
 19  def put_adls_file_creation(sas_url, headers):
 20      """Performs an ADLS Azure file create `Put` operation
 21      (https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create)
 22  
 23      Args:
 24          sas_url: A shared access signature URL referring to the Azure ADLS server
 25              to which the file creation command should be issued.
 26          headers: Additional headers to include in the Put request body.
 27      """
 28      request_url = _append_query_parameters(sas_url, {"resource": "file"})
 29  
 30      request_headers = {}
 31      for name, value in headers.items():
 32          if _is_valid_adls_put_header(name):
 33              request_headers[name] = value
 34          else:
 35              _logger.debug("Removed unsupported '%s' header for ADLS Gen2 Put operation", name)
 36  
 37      with rest_utils.cloud_storage_http_request(
 38          "put", request_url, headers=request_headers
 39      ) as response:
 40          rest_utils.augmented_raise_for_status(response)
 41  
 42  
 43  def patch_adls_file_upload(sas_url, local_file, start_byte, size, position, headers, is_single):
 44      """
 45      Performs an ADLS Azure file create `Patch` operation
 46      (https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update)
 47  
 48      Args:
 49          sas_url: A shared access signature URL referring to the Azure ADLS server
 50              to which the file update command should be issued.
 51          local_file: The local file to upload
 52          start_byte: The starting byte of the local file to upload
 53          size: The number of bytes to upload
 54          position: Positional offset of the data in the Patch request
 55          headers: Additional headers to include in the Patch request body
 56          is_single: Whether this is the only patch operation for this file
 57      """
 58      new_params = {"action": "append", "position": str(position)}
 59      if is_single:
 60          new_params["flush"] = "true"
 61      request_url = _append_query_parameters(sas_url, new_params)
 62  
 63      request_headers = {}
 64      for name, value in headers.items():
 65          if _is_valid_adls_patch_header(name):
 66              request_headers[name] = value
 67          else:
 68              _logger.debug("Removed unsupported '%s' header for ADLS Gen2 Patch operation", name)
 69  
 70      data = read_chunk(local_file, size, start_byte)
 71      with rest_utils.cloud_storage_http_request(
 72          "patch", request_url, data=data, headers=request_headers
 73      ) as response:
 74          rest_utils.augmented_raise_for_status(response)
 75  
 76  
 77  def patch_adls_flush(sas_url, position, headers):
 78      """Performs an ADLS Azure file flush `Patch` operation
 79      (https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update)
 80  
 81      Args:
 82          sas_url: A shared access signature URL referring to the Azure ADLS server
 83              to which the file update command should be issued.
 84          position: The final size of the file to flush.
 85          headers: Additional headers to include in the Patch request body.
 86  
 87      """
 88      request_url = _append_query_parameters(sas_url, {"action": "flush", "position": str(position)})
 89  
 90      request_headers = {}
 91      for name, value in headers.items():
 92          if _is_valid_adls_put_header(name):
 93              request_headers[name] = value
 94          else:
 95              _logger.debug("Removed unsupported '%s' header for ADLS Gen2 Patch operation", name)
 96  
 97      with rest_utils.cloud_storage_http_request(
 98          "patch", request_url, headers=request_headers
 99      ) as response:
100          rest_utils.augmented_raise_for_status(response)
101  
102  
103  def put_block(sas_url, block_id, data, headers):
104      """
105      Performs an Azure `Put Block` operation
106      (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block)
107  
108      Args:
109          sas_url: A shared access signature URL referring to the Azure Block Blob
110              to which the specified data should be staged.
111          block_id: A base64-encoded string identifying the block.
112          data: Data to include in the Put Block request body.
113          headers: Additional headers to include in the Put Block request body
114              (the `x-ms-blob-type` header is always included automatically).
115      """
116      request_url = _append_query_parameters(sas_url, {"comp": "block", "blockid": block_id})
117  
118      request_headers = deepcopy(_PUT_BLOCK_HEADERS)
119      for name, value in headers.items():
120          if _is_valid_put_block_header(name):
121              request_headers[name] = value
122          else:
123              _logger.debug("Removed unsupported '%s' header for Put Block operation", name)
124  
125      with rest_utils.cloud_storage_http_request(
126          "put", request_url, data=data, headers=request_headers
127      ) as response:
128          rest_utils.augmented_raise_for_status(response)
129  
130  
131  def put_block_list(sas_url, block_list, headers):
132      """Performs an Azure `Put Block List` operation
133      (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list)
134  
135      Args:
136          sas_url: A shared access signature URL referring to the Azure Block Blob
137              to which the specified data should be staged.
138          block_list: A list of uncommitted base64-encoded string block IDs to commit. For
139              more information, see
140              https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list.
141          headers: Headers to include in the Put Block request body.
142  
143      """
144      request_url = _append_query_parameters(sas_url, {"comp": "blocklist"})
145      data = _build_block_list_xml(block_list)
146  
147      request_headers = {}
148      for name, value in headers.items():
149          if _is_valid_put_block_list_header(name):
150              request_headers[name] = value
151          else:
152              _logger.debug("Removed unsupported '%s' header for Put Block List operation", name)
153  
154      with rest_utils.cloud_storage_http_request(
155          "put", request_url, data=data, headers=request_headers
156      ) as response:
157          rest_utils.augmented_raise_for_status(response)
158  
159  
160  def _append_query_parameters(url, parameters):
161      parsed_url = urllib.parse.urlparse(url)
162      query_dict = dict(urllib.parse.parse_qsl(parsed_url.query))
163      query_dict.update(parameters)
164      new_query = urllib.parse.urlencode(query_dict)
165      new_url_components = parsed_url._replace(query=new_query)
166      return urllib.parse.urlunparse(new_url_components)
167  
168  
169  def _build_block_list_xml(block_list):
170      xml = '<?xml version="1.0" encoding="utf-8"?>\n<BlockList>\n'
171      for block_id in block_list:
172          # Because block IDs are base64-encoded and base64 strings do not contain
173          # XML special characters, we can safely insert the block ID directly into
174          # the XML document
175          xml += f"<Uncommitted>{block_id}</Uncommitted>\n"
176      xml += "</BlockList>"
177      return xml
178  
179  
180  def _is_valid_put_block_list_header(header_name):
181      """
182      Returns:
183          True if the specified header name is a valid header for the Put Block List operation,
184          False otherwise. For a list of valid headers, see https://docs.microsoft.com/en-us/
185          rest/api/storageservices/put-block-list#request-headers and https://docs.microsoft.com/
186          en-us/rest/api/storageservices/
187          specifying-conditional-headers-for-blob-service-operations#Subheading1.
188      """
189      return header_name.startswith("x-ms-meta-") or header_name in {
190          "Authorization",
191          "Date",
192          "x-ms-date",
193          "x-ms-version",
194          "Content-Length",
195          "Content-MD5",
196          "x-ms-content-crc64",
197          "x-ms-blob-cache-control",
198          "x-ms-blob-content-type",
199          "x-ms-blob-content-encoding",
200          "x-ms-blob-content-language",
201          "x-ms-blob-content-md5",
202          "x-ms-encryption-scope",
203          "x-ms-tags",
204          "x-ms-lease-id",
205          "x-ms-client-request-id",
206          "x-ms-blob-content-disposition",
207          "x-ms-access-tier",
208          "If-Modified-Since",
209          "If-Unmodified-Since",
210          "If-Match",
211          "If-None-Match",
212      }
213  
214  
215  def _is_valid_put_block_header(header_name):
216      """
217      Returns:
218          True if the specified header name is a valid header for the Put Block operation, False
219          otherwise. For a list of valid headers, see
220          https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#request-headers and
221          https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#
222          request-headers-customer-provided-encryption-keys.
223      """
224      return header_name in {
225          "Authorization",
226          "x-ms-date",
227          "x-ms-version",
228          "Content-Length",
229          "Content-MD5",
230          "x-ms-content-crc64",
231          "x-ms-encryption-scope",
232          "x-ms-lease-id",
233          "x-ms-client-request-id",
234          "x-ms-encryption-key",
235          "x-ms-encryption-key-sha256",
236          "x-ms-encryption-algorithm",
237      }
238  
239  
240  def _is_valid_adls_put_header(header_name):
241      """
242      Returns:
243          True if the specified header name is a valid header for the ADLS Put operation, False
244          otherwise. For a list of valid headers, see
245          https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create
246      """
247      return header_name in {
248          "Cache-Control",
249          "Content-Encoding",
250          "Content-Language",
251          "Content-Disposition",
252          "x-ms-cache-control",
253          "x-ms-content-type",
254          "x-ms-content-encoding",
255          "x-ms-content-language",
256          "x-ms-content-disposition",
257          "x-ms-rename-source",
258          "x-ms-lease-id",
259          "x-ms-properties",
260          "x-ms-permissions",
261          "x-ms-umask",
262          "x-ms-owner",
263          "x-ms-group",
264          "x-ms-acl",
265          "x-ms-proposed-lease-id",
266          "x-ms-expiry-option",
267          "x-ms-expiry-time",
268          "If-Match",
269          "If-None-Match",
270          "If-Modified-Since",
271          "If-Unmodified-Since",
272          "x-ms-source-if-match",
273          "x-ms-source-if-none-match",
274          "x-ms-source-if-modified-since",
275          "x-ms-source-if-unmodified-since",
276          "x-ms-encryption-key",
277          "x-ms-encryption-key-sha256",
278          "x-ms-encryption-algorithm",
279          "x-ms-encryption-context",
280          "x-ms-client-request-id",
281          "x-ms-date",
282          "x-ms-version",
283      }
284  
285  
286  def _is_valid_adls_patch_header(header_name):
287      """
288      Returns:
289          True if the specified header name is a valid header for the ADLS Patch operation, False
290          otherwise. For a list of valid headers, see
291          https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update
292      """
293      return header_name in {
294          "Content-Length",
295          "Content-MD5",
296          "x-ms-lease-id",
297          "x-ms-cache-control",
298          "x-ms-content-type",
299          "x-ms-content-disposition",
300          "x-ms-content-encoding",
301          "x-ms-content-language",
302          "x-ms-content-md5",
303          "x-ms-properties",
304          "x-ms-owner",
305          "x-ms-group",
306          "x-ms-permissions",
307          "x-ms-acl",
308          "If-Match",
309          "If-None-Match",
310          "If-Modified-Since",
311          "If-Unmodified-Since",
312          "x-ms-encryption-key",
313          "x-ms-encryption-key-sha256",
314          "x-ms-encryption-algorithm",
315          "x-ms-encryption-context",
316          "x-ms-client-request-id",
317          "x-ms-date",
318          "x-ms-version",
319      }