/ mlflow / data / evaluation_dataset.py
evaluation_dataset.py
  1  import hashlib
  2  import json
  3  import logging
  4  import math
  5  import struct
  6  import sys
  7  
  8  from packaging.version import Version
  9  
 10  import mlflow
 11  from mlflow.entities import RunTag
 12  from mlflow.exceptions import MlflowException
 13  from mlflow.protos.databricks_pb2 import INVALID_PARAMETER_VALUE
 14  from mlflow.utils.string_utils import generate_feature_name_if_not_string
 15  
 16  try:
 17      # `numpy` and `pandas` are not required for `mlflow-skinny`.
 18      import numpy as np
 19      import pandas as pd
 20  except ImportError:
 21      pass
 22  
 23  _logger = logging.getLogger(__name__)
 24  
 25  
 26  def _hash_uint64_ndarray_as_bytes(array):
 27      assert len(array.shape) == 1
 28      # see struct pack format string https://docs.python.org/3/library/struct.html#format-strings
 29      return struct.pack(f">{array.size}Q", *array)
 30  
 31  
 32  def _is_empty_list_or_array(data):
 33      if isinstance(data, list):
 34          return len(data) == 0
 35      elif isinstance(data, np.ndarray):
 36          return data.size == 0
 37      return False
 38  
 39  
 40  def _is_array_has_dict(nd_array):
 41      if _is_empty_list_or_array(nd_array):
 42          return False
 43  
 44      # It is less likely the array or list contains heterogeneous elements, so just checking the
 45      # first element to avoid performance overhead.
 46      elm = nd_array.item(0)
 47      if isinstance(elm, (list, np.ndarray)):
 48          return _is_array_has_dict(elm)
 49      elif isinstance(elm, dict):
 50          return True
 51  
 52      return False
 53  
 54  
 55  def _hash_array_of_dict_as_bytes(data):
 56      # NB: If an array or list contains dictionary element, it can't be hashed with
 57      # pandas.util.hash_array. Hence we need to manually hash the elements here. This is
 58      # particularly for the LLM use case where the input can be a list of dictionary
 59      # (chat/completion payloads), so doesn't handle more complex case like nested lists.
 60      result = b""
 61      for elm in data:
 62          if isinstance(elm, (list, np.ndarray)):
 63              result += _hash_array_of_dict_as_bytes(elm)
 64          elif isinstance(elm, dict):
 65              result += _hash_dict_as_bytes(elm)
 66          else:
 67              result += _hash_data_as_bytes(elm)
 68      return result
 69  
 70  
 71  def _hash_ndarray_as_bytes(nd_array):
 72      if not isinstance(nd_array, np.ndarray):
 73          nd_array = np.array(nd_array)
 74  
 75      if _is_array_has_dict(nd_array):
 76          return _hash_array_of_dict_as_bytes(nd_array)
 77  
 78      return _hash_uint64_ndarray_as_bytes(
 79          pd.util.hash_array(nd_array.flatten(order="C"))
 80      ) + _hash_uint64_ndarray_as_bytes(np.array(nd_array.shape, dtype="uint64"))
 81  
 82  
 83  def _hash_data_as_bytes(data):
 84      try:
 85          if isinstance(data, (list, np.ndarray)):
 86              return _hash_ndarray_as_bytes(data)
 87          if isinstance(data, dict):
 88              return _hash_dict_as_bytes(data)
 89          if np.isscalar(data):
 90              return _hash_uint64_ndarray_as_bytes(pd.util.hash_array(np.array([data])))
 91      except Exception:
 92          pass
 93      # Skip unsupported types by returning an empty byte string
 94      return b""
 95  
 96  
 97  def _hash_dict_as_bytes(data_dict):
 98      result = _hash_ndarray_as_bytes(list(data_dict.keys()))
 99      try:
100          result += _hash_ndarray_as_bytes(list(data_dict.values()))
101      # If the values containing non-hashable objects, we will hash the values recursively.
102      except Exception:
103          for value in data_dict.values():
104              result += _hash_data_as_bytes(value)
105      return result
106  
107  
def _hash_array_like_obj_as_bytes(data):
    """
    Helper method to convert pandas dataframe/numpy array/list into bytes for
    MD5 calculation purpose.

    Raises:
        ValueError: If ``data`` is not a pandas DataFrame, a numpy array, or a list.
    """
    if isinstance(data, pd.DataFrame):
        # add checking `'pyspark' in sys.modules` to avoid importing pyspark when user
        # run code not related to pyspark.
        if "pyspark" in sys.modules:
            from pyspark.ml.linalg import Vector as spark_vector_type
        else:
            spark_vector_type = None

        def _hash_array_like_element_as_bytes(v):
            if spark_vector_type is not None:
                if isinstance(v, spark_vector_type):
                    return _hash_ndarray_as_bytes(v.toArray())
            if isinstance(v, (dict, list, np.ndarray)):
                return _hash_data_as_bytes(v)

            try:
                # Attempt to hash the value, if it fails, return an empty byte string
                pd.util.hash_array(np.array([v]))
                return v
            except TypeError:
                return b""  # Skip unhashable types by returning an empty byte string

        # `DataFrame.applymap` was deprecated in favor of `DataFrame.map` in pandas 2.1.0.
        if Version(pd.__version__) >= Version("2.1.0"):
            data = data.map(_hash_array_like_element_as_bytes)
        else:
            data = data.applymap(_hash_array_like_element_as_bytes)
        return _hash_uint64_ndarray_as_bytes(pd.util.hash_pandas_object(data))
    elif isinstance(data, np.ndarray) and len(data) > 0 and isinstance(data[0], list):
        # convert numpy array of lists into numpy array of the string representation of the
        # lists, because lists are not hashable.
        # NB: a list comprehension (not a generator expression) is required here: passing a
        # generator to `np.array` creates a 0-d object array wrapping the generator object
        # itself, so the resulting hash would cover the generator, not the data, and would
        # not be deterministic across processes.
        hashable = np.array([str(val) for val in data])
        return _hash_ndarray_as_bytes(hashable)
    elif isinstance(data, np.ndarray) and len(data) > 0 and isinstance(data[0], np.ndarray):
        # convert numpy array of numpy arrays into 2d numpy arrays
        # because numpy array of numpy arrays are not hashable
        hashable = np.array(data.tolist())
        return _hash_ndarray_as_bytes(hashable)
    elif isinstance(data, np.ndarray):
        return _hash_ndarray_as_bytes(data)
    elif isinstance(data, list):
        return _hash_ndarray_as_bytes(np.array(data))
    else:
        raise ValueError("Unsupported data type.")
156  
157  
def _gen_md5_for_arraylike_obj(md5_gen, data):
    """
    Helper method to feed an array-like object into an MD5 generator.
    The digest covers:
     - the array length
     - the first NUM_SAMPLE_ROWS_FOR_HASH rows' content
     - the last NUM_SAMPLE_ROWS_FOR_HASH rows' content
    """
    sample_rows = EvaluationDataset.NUM_SAMPLE_ROWS_FOR_HASH
    md5_gen.update(_hash_uint64_ndarray_as_bytes(np.array([len(data)], dtype="uint64")))
    if len(data) < sample_rows * 2:
        # Small datasets are hashed in full.
        md5_gen.update(_hash_array_like_obj_as_bytes(data))
        return
    if isinstance(data, pd.DataFrame):
        # pandas DataFrame rows must be selected positionally via iloc.
        head_rows = data.iloc[:sample_rows]
        tail_rows = data.iloc[-sample_rows:]
    else:
        head_rows = data[:sample_rows]
        tail_rows = data[-sample_rows:]
    md5_gen.update(_hash_array_like_obj_as_bytes(head_rows))
    md5_gen.update(_hash_array_like_obj_as_bytes(tail_rows))
179  
180  
def convert_data_to_mlflow_dataset(data, targets=None, predictions=None, name=None):
    """Convert input data to an mlflow dataset.

    Args:
        data: A list, a numpy array, a pandas DataFrame, or a Spark DataFrame
            (when pyspark is available).
        targets: For list/array data, an array-like of evaluation labels; for
            DataFrame data, the name of the column containing the labels.
        predictions: Name of the prediction column; only supported for
            pandas/Spark DataFrames.
        name: Optional dataset name.

    Returns:
        An mlflow dataset wrapping ``data``, or ``data`` unchanged when its
        type cannot be converted.
    """
    supported_dataframe_types = [pd.DataFrame]
    if "pyspark" in sys.modules:
        from mlflow.utils.spark_utils import get_spark_dataframe_type

        spark_df_type = get_spark_dataframe_type()
        supported_dataframe_types.append(spark_df_type)

    if predictions is not None:
        _validate_dataset_type_supports_predictions(
            data=data, supported_predictions_dataset_types=supported_dataframe_types
        )

    if isinstance(data, list):
        # If the list is flat, we assume each element is an independent sample.
        # The `data` truthiness guard avoids an IndexError on an empty list.
        if data and not isinstance(data[0], (list, np.ndarray)):
            data = [[elm] for elm in data]

        # NB: compare `targets` against None rather than relying on truthiness:
        # `targets` may be a numpy array, whose truth value is ambiguous when it
        # holds more than one element and would raise ValueError.
        return mlflow.data.from_numpy(
            np.array(data),
            targets=np.array(targets) if targets is not None else None,
            name=name,
        )
    elif isinstance(data, np.ndarray):
        return mlflow.data.from_numpy(data, targets=targets, name=name)
    elif isinstance(data, pd.DataFrame):
        return mlflow.data.from_pandas(df=data, targets=targets, predictions=predictions, name=name)
    elif "pyspark" in sys.modules and isinstance(data, spark_df_type):
        return mlflow.data.from_spark(df=data, targets=targets, predictions=predictions, name=name)
    else:
        # Cannot convert to mlflow dataset, return original data.
        _logger.info(
            "Cannot convert input data to `evaluate()` to an mlflow dataset, input must be a list, "
            f"a numpy array, a panda Dataframe or a spark Dataframe, but received {type(data)}."
        )
        return data
216  
217  
218  def _validate_dataset_type_supports_predictions(data, supported_predictions_dataset_types):
219      """
220      Validate that the dataset type supports a user-specified "predictions" column.
221      """
222      if not any(isinstance(data, sdt) for sdt in supported_predictions_dataset_types):
223          raise MlflowException(
224              message=(
225                  "If predictions is specified, data must be one of the following types, or an"
226                  " MLflow Dataset that represents one of the following types:"
227                  f" {supported_predictions_dataset_types}."
228              ),
229              error_code=INVALID_PARAMETER_VALUE,
230          )
231  
232  
class EvaluationDataset:
    """
    An input dataset for model evaluation. This is intended for use with the
    :py:func:`mlflow.models.evaluate()`
    API.
    """

    # Number of rows taken from the head and from the tail of the data when
    # computing the dataset hash (see `_gen_md5_for_arraylike_obj`).
    NUM_SAMPLE_ROWS_FOR_HASH = 5
    # Spark DataFrames are truncated to this many rows before conversion to pandas.
    SPARK_DATAFRAME_LIMIT = 10000

    def __init__(
        self,
        data,
        *,
        targets=None,
        name=None,
        path=None,
        feature_names=None,
        predictions=None,
        digest=None,
    ):
        """
        The values of the constructor arguments comes from the `evaluate` call.

        Args:
            data: A numpy array, a list of feature rows, a pandas DataFrame, or
                a Spark DataFrame (when pyspark is installed).
            targets: For array/list data, an array-like of labels; for
                DataFrame data, the name of the label column.
            name: Optional dataset name; must not contain a double quote.
            path: Optional dataset path; must not contain a double quote.
            feature_names: Optional list of unique feature names.
            predictions: Optional name of the prediction column; only
                supported for DataFrame data.
            digest: Optional precomputed dataset digest.
        """
        if name is not None and '"' in name:
            raise MlflowException(
                message=f'Dataset name cannot include a double quote (") but got {name}',
                error_code=INVALID_PARAMETER_VALUE,
            )
        if path is not None and '"' in path:
            raise MlflowException(
                message=f'Dataset path cannot include a double quote (") but got {path}',
                error_code=INVALID_PARAMETER_VALUE,
            )

        self._user_specified_name = name
        self._path = path
        self._hash = None
        self._supported_dataframe_types = (pd.DataFrame,)
        self._spark_df_type = None
        self._labels_data = None
        self._targets_name = None
        self._has_targets = False
        self._predictions_data = None
        self._predictions_name = None
        self._has_predictions = predictions is not None
        self._digest = digest

        try:
            # add checking `'pyspark' in sys.modules` to avoid importing pyspark when user
            # run code not related to pyspark.
            if "pyspark" in sys.modules:
                from mlflow.utils.spark_utils import get_spark_dataframe_type

                spark_df_type = get_spark_dataframe_type()
                self._supported_dataframe_types = (pd.DataFrame, spark_df_type)
                self._spark_df_type = spark_df_type
        except ImportError:
            pass

        # Duplicate feature names would make per-feature results ambiguous.
        if feature_names is not None and len(set(feature_names)) < len(list(feature_names)):
            raise MlflowException(
                message="`feature_names` argument must be a list containing unique feature names.",
                error_code=INVALID_PARAMETER_VALUE,
            )

        if self._has_predictions:
            _validate_dataset_type_supports_predictions(
                data=data,
                supported_predictions_dataset_types=self._supported_dataframe_types,
            )

        has_targets = targets is not None
        if has_targets:
            self._has_targets = True
        if isinstance(data, (np.ndarray, list)):
            if has_targets and not isinstance(targets, (np.ndarray, list)):
                raise MlflowException(
                    message="If data is a numpy array or list of evaluation features, "
                    "`targets` argument must be a numpy array or list of evaluation labels.",
                    error_code=INVALID_PARAMETER_VALUE,
                )

            shape_message = (
                "If the `data` argument is a numpy array, it must be a 2-dimensional "
                "array, with the second dimension representing the number of features. If the "
                "`data` argument is a list, each of its elements must be a feature array of "
                "the numpy array or list, and all elements must have the same length."
            )

            if isinstance(data, list):
                try:
                    # Ragged lists (rows of unequal length) raise ValueError here.
                    data = np.array(data)
                except ValueError as e:
                    raise MlflowException(
                        message=shape_message, error_code=INVALID_PARAMETER_VALUE
                    ) from e

            if len(data.shape) != 2:
                raise MlflowException(
                    message=shape_message,
                    error_code=INVALID_PARAMETER_VALUE,
                )

            self._features_data = data
            if has_targets:
                self._labels_data = (
                    targets if isinstance(targets, np.ndarray) else np.array(targets)
                )

                if len(self._features_data) != len(self._labels_data):
                    raise MlflowException(
                        message="The input features example rows must be the same length "
                        "with labels array.",
                        error_code=INVALID_PARAMETER_VALUE,
                    )

            num_features = data.shape[1]

            if feature_names is not None:
                feature_names = list(feature_names)
                if num_features != len(feature_names):
                    raise MlflowException(
                        message="feature name list must be the same length with feature data.",
                        error_code=INVALID_PARAMETER_VALUE,
                    )
                self._feature_names = feature_names
            else:
                # Auto-generate names, zero-padding the index to a fixed width
                # (e.g. "feature_01" ... "feature_10") for the given feature count.
                self._feature_names = [
                    f"feature_{str(i + 1).zfill(math.ceil(math.log10(num_features + 1)))}"
                    for i in range(num_features)
                ]
        elif isinstance(data, self._supported_dataframe_types):
            if has_targets and not isinstance(targets, str):
                raise MlflowException(
                    message="If data is a Pandas DataFrame or Spark DataFrame, `targets` argument "
                    "must be the name of the column which contains evaluation labels in the `data` "
                    "dataframe.",
                    error_code=INVALID_PARAMETER_VALUE,
                )
            if self._spark_df_type and isinstance(data, self._spark_df_type):
                # Spark DataFrames are truncated and converted to pandas before evaluation.
                if data.count() > EvaluationDataset.SPARK_DATAFRAME_LIMIT:
                    _logger.warning(
                        "Specified Spark DataFrame is too large for model evaluation. Only "
                        f"the first {EvaluationDataset.SPARK_DATAFRAME_LIMIT} rows will be used. "
                        "If you want evaluate on the whole spark dataframe, please manually call "
                        "`spark_dataframe.toPandas()`."
                    )
                data = data.limit(EvaluationDataset.SPARK_DATAFRAME_LIMIT).toPandas()

            if has_targets:
                self._labels_data = data[targets].to_numpy()
                self._targets_name = targets

            if self._has_predictions:
                self._predictions_data = data[predictions].to_numpy()
                self._predictions_name = predictions

            if feature_names is not None:
                # Only the named columns are treated as features.
                self._features_data = data[list(feature_names)]
                self._feature_names = feature_names
            else:
                # All columns except the targets/predictions columns are features.
                features_data = data

                if has_targets:
                    features_data = features_data.drop(targets, axis=1, inplace=False)

                if self._has_predictions:
                    features_data = features_data.drop(predictions, axis=1, inplace=False)

                self._features_data = features_data
                self._feature_names = [
                    generate_feature_name_if_not_string(c) for c in self._features_data.columns
                ]
        else:
            raise MlflowException(
                message="The data argument must be a numpy array, a list or a Pandas DataFrame, or "
                "spark DataFrame if pyspark package installed.",
                error_code=INVALID_PARAMETER_VALUE,
            )

        # generate dataset hash
        # NB: MD5 is used only as a content fingerprint here, not for security.
        md5_gen = hashlib.md5(usedforsecurity=False)
        _gen_md5_for_arraylike_obj(md5_gen, self._features_data)
        if self._labels_data is not None:
            _gen_md5_for_arraylike_obj(md5_gen, self._labels_data)
        if self._predictions_data is not None:
            _gen_md5_for_arraylike_obj(md5_gen, self._predictions_data)
        md5_gen.update(",".join(list(map(str, self._feature_names))).encode("UTF-8"))

        self._hash = md5_gen.hexdigest()

    @property
    def feature_names(self):
        """
        return the list of feature names
        """
        return self._feature_names

    @property
    def features_data(self):
        """
        return features data as a numpy array or a pandas DataFrame.
        """
        return self._features_data

    @property
    def labels_data(self):
        """
        return labels data as a numpy array
        """
        return self._labels_data

    @property
    def has_targets(self):
        """
        Returns True if the dataset has targets, False otherwise.
        """
        return self._has_targets

    @property
    def targets_name(self):
        """
        return targets name
        """
        return self._targets_name

    @property
    def predictions_data(self):
        """
        return predictions data as a numpy array
        """
        return self._predictions_data

    @property
    def has_predictions(self):
        """
        Returns True if the dataset has predictions, False otherwise.
        """
        return self._has_predictions

    @property
    def predictions_name(self):
        """
        return predictions name
        """
        return self._predictions_name

    @property
    def name(self):
        """
        Dataset name, which is specified dataset name or the dataset hash if user don't specify
        name.
        """
        return self._user_specified_name if self._user_specified_name is not None else self.hash

    @property
    def path(self):
        """
        Dataset path
        """
        return self._path

    @property
    def hash(self):
        """
        Dataset hash, computed over the dataset length and the content of the
        first and last NUM_SAMPLE_ROWS_FOR_HASH rows.
        """
        return self._hash

    @property
    def _metadata(self):
        """
        Return dataset metadata containing name, hash, and optional path.
        """
        metadata = {
            "name": self.name,
            "hash": self.hash,
        }
        if self.path is not None:
            metadata["path"] = self.path
        return metadata

    @property
    def digest(self):
        """
        Return the digest of the dataset.
        """
        return self._digest

    def _log_dataset_tag(self, client, run_id, model_uuid):
        """
        Log dataset metadata as a tag "mlflow.datasets", if the tag already exists, it will
        append current dataset metadata into existing tag content.
        """
        existing_dataset_metadata_str = client.get_run(run_id).data.tags.get(
            "mlflow.datasets", "[]"
        )
        dataset_metadata_list = json.loads(existing_dataset_metadata_str)

        # for/else: append this dataset's metadata only when no existing entry
        # matches the same hash, name, and model.
        for metadata in dataset_metadata_list:
            if (
                metadata["hash"] == self.hash
                and metadata["name"] == self.name
                and metadata["model"] == model_uuid
            ):
                break
        else:
            dataset_metadata_list.append({**self._metadata, "model": model_uuid})

        dataset_metadata_str = json.dumps(dataset_metadata_list, separators=(",", ":"))
        client.log_batch(
            run_id,
            tags=[RunTag("mlflow.datasets", dataset_metadata_str)],
        )

    def __hash__(self):
        # Delegate to the precomputed content hash string.
        return hash(self.hash)

    def __eq__(self, other):
        # NB: equality compares features, labels, name, path, and feature names;
        # predictions data and digest are not part of the comparison.
        if not isinstance(other, EvaluationDataset):
            return False

        if isinstance(self._features_data, np.ndarray):
            is_features_data_equal = np.array_equal(self._features_data, other._features_data)
        else:
            # pandas DataFrame comparison.
            is_features_data_equal = self._features_data.equals(other._features_data)

        return (
            is_features_data_equal
            and np.array_equal(self._labels_data, other._labels_data)
            and self.name == other.name
            and self.path == other.path
            and self._feature_names == other._feature_names
        )