evaluation_dataset.py
"""Utilities for building, hashing, and logging evaluation datasets used by
:py:func:`mlflow.models.evaluate()`.
"""

import hashlib
import json
import logging
import math
import struct
import sys

from packaging.version import Version

import mlflow
from mlflow.entities import RunTag
from mlflow.exceptions import MlflowException
from mlflow.protos.databricks_pb2 import INVALID_PARAMETER_VALUE
from mlflow.utils.string_utils import generate_feature_name_if_not_string

try:
    # `numpy` and `pandas` are not required for `mlflow-skinny`.
    import numpy as np
    import pandas as pd
except ImportError:
    pass

_logger = logging.getLogger(__name__)


def _hash_uint64_ndarray_as_bytes(array):
    """Pack a 1-D uint64 numpy array into big-endian bytes for MD5 accumulation.

    Args:
        array: 1-D numpy array of unsigned 64-bit integers (e.g. the output of
            ``pd.util.hash_array``).

    Returns:
        bytes: The values packed big-endian, 8 bytes per element.
    """
    assert len(array.shape) == 1
    # see struct pack format string https://docs.python.org/3/library/struct.html#format-strings
    return struct.pack(f">{array.size}Q", *array)


def _is_empty_list_or_array(data):
    """Return True if ``data`` is an empty list or an empty numpy array."""
    if isinstance(data, list):
        return len(data) == 0
    elif isinstance(data, np.ndarray):
        return data.size == 0
    return False


def _is_array_has_dict(nd_array):
    """Return True if ``nd_array`` (possibly nested) appears to contain dict elements.

    Only the first element of each nesting level is inspected (see comment below),
    so heterogeneous containers may be misclassified; callers treat this as a
    best-effort hint to pick a hashing strategy.
    """
    if _is_empty_list_or_array(nd_array):
        return False

    # It is less likely the array or list contains heterogeneous elements, so just checking the
    # first element to avoid performance overhead.
    # NB: recursion below may pass a plain list, which has no `.item`, so index it directly.
    elm = nd_array.item(0) if isinstance(nd_array, np.ndarray) else nd_array[0]
    if isinstance(elm, (list, np.ndarray)):
        return _is_array_has_dict(elm)
    elif isinstance(elm, dict):
        return True

    return False


def _hash_array_of_dict_as_bytes(data):
    # NB: If an array or list contains dictionary element, it can't be hashed with
    # pandas.util.hash_array. Hence we need to manually hash the elements here. This is
    # particularly for the LLM use case where the input can be a list of dictionary
    # (chat/completion payloads), so doesn't handle more complex case like nested lists.
    result = b""
    for elm in data:
        if isinstance(elm, (list, np.ndarray)):
            result += _hash_array_of_dict_as_bytes(elm)
        elif isinstance(elm, dict):
            result += _hash_dict_as_bytes(elm)
        else:
            result += _hash_data_as_bytes(elm)
    return result


def _hash_ndarray_as_bytes(nd_array):
    """Hash a numpy array (or array-like) into bytes.

    Dict-bearing arrays are hashed element-wise; otherwise both the flattened
    content and the shape are hashed so that differently-shaped arrays with the
    same flat content produce different digests.
    """
    if not isinstance(nd_array, np.ndarray):
        nd_array = np.array(nd_array)

    if _is_array_has_dict(nd_array):
        return _hash_array_of_dict_as_bytes(nd_array)

    return _hash_uint64_ndarray_as_bytes(
        pd.util.hash_array(nd_array.flatten(order="C"))
    ) + _hash_uint64_ndarray_as_bytes(np.array(nd_array.shape, dtype="uint64"))


def _hash_data_as_bytes(data):
    """Best-effort hash of an arbitrary value (array, dict, or scalar) into bytes.

    Unsupported or unhashable values contribute an empty byte string rather than
    raising, so a single odd value cannot break whole-dataset hashing.
    """
    try:
        if isinstance(data, (list, np.ndarray)):
            return _hash_ndarray_as_bytes(data)
        if isinstance(data, dict):
            return _hash_dict_as_bytes(data)
        if np.isscalar(data):
            return _hash_uint64_ndarray_as_bytes(pd.util.hash_array(np.array([data])))
    except Exception:
        pass
    # Skip unsupported types by returning an empty byte string
    return b""


def _hash_dict_as_bytes(data_dict):
    """Hash a dict by hashing its keys and values; falls back to per-value hashing."""
    result = _hash_ndarray_as_bytes(list(data_dict.keys()))
    try:
        result += _hash_ndarray_as_bytes(list(data_dict.values()))
    # If the values containing non-hashable objects, we will hash the values recursively.
    except Exception:
        for value in data_dict.values():
            result += _hash_data_as_bytes(value)
    return result


def _hash_array_like_obj_as_bytes(data):
    """
    Helper method to convert pandas dataframe/numpy array/list into bytes for
    MD5 calculation purpose.

    Raises:
        ValueError: If ``data`` is not a DataFrame, numpy array, or list.
    """
    if isinstance(data, pd.DataFrame):
        # add checking `'pyspark' in sys.modules` to avoid importing pyspark when user
        # run code not related to pyspark.
        if "pyspark" in sys.modules:
            from pyspark.ml.linalg import Vector as spark_vector_type
        else:
            spark_vector_type = None

        def _hash_array_like_element_as_bytes(v):
            if spark_vector_type is not None:
                if isinstance(v, spark_vector_type):
                    return _hash_ndarray_as_bytes(v.toArray())
            if isinstance(v, (dict, list, np.ndarray)):
                return _hash_data_as_bytes(v)

            try:
                # Attempt to hash the value, if it fails, return an empty byte string
                pd.util.hash_array(np.array([v]))
                return v
            except TypeError:
                return b""  # Skip unhashable types by returning an empty byte string

        # `DataFrame.applymap` was renamed to `DataFrame.map` in pandas 2.1.0.
        if Version(pd.__version__) >= Version("2.1.0"):
            data = data.map(_hash_array_like_element_as_bytes)
        else:
            data = data.applymap(_hash_array_like_element_as_bytes)
        return _hash_uint64_ndarray_as_bytes(pd.util.hash_pandas_object(data))
    elif isinstance(data, np.ndarray) and len(data) > 0 and isinstance(data[0], list):
        # convert numpy array of lists into numpy array of the string representation of the lists
        # because lists are not hashable
        # NB: this must be a list comprehension, not a generator expression: `np.array` wraps
        # a generator in a 0-d object array, which would hash the generator object itself.
        hashable = np.array([str(val) for val in data])
        return _hash_ndarray_as_bytes(hashable)
    elif isinstance(data, np.ndarray) and len(data) > 0 and isinstance(data[0], np.ndarray):
        # convert numpy array of numpy arrays into 2d numpy arrays
        # because numpy array of numpy arrays are not hashable
        hashable = np.array(data.tolist())
        return _hash_ndarray_as_bytes(hashable)
    elif isinstance(data, np.ndarray):
        return _hash_ndarray_as_bytes(data)
    elif isinstance(data, list):
        return _hash_ndarray_as_bytes(np.array(data))
    else:
        raise ValueError("Unsupported data type.")


def _gen_md5_for_arraylike_obj(md5_gen, data):
    """
    Helper method to generate MD5 hash array-like object, the MD5 will calculate over:
     - array length
     - first NUM_SAMPLE_ROWS_FOR_HASH rows content
     - last NUM_SAMPLE_ROWS_FOR_HASH rows content
    """
    len_bytes = _hash_uint64_ndarray_as_bytes(np.array([len(data)], dtype="uint64"))
    md5_gen.update(len_bytes)
    if len(data) < EvaluationDataset.NUM_SAMPLE_ROWS_FOR_HASH * 2:
        # Small datasets are hashed in full.
        md5_gen.update(_hash_array_like_obj_as_bytes(data))
    else:
        if isinstance(data, pd.DataFrame):
            # Access rows of pandas Df with iloc
            head_rows = data.iloc[: EvaluationDataset.NUM_SAMPLE_ROWS_FOR_HASH]
            tail_rows = data.iloc[-EvaluationDataset.NUM_SAMPLE_ROWS_FOR_HASH :]
        else:
            head_rows = data[: EvaluationDataset.NUM_SAMPLE_ROWS_FOR_HASH]
            tail_rows = data[-EvaluationDataset.NUM_SAMPLE_ROWS_FOR_HASH :]
        md5_gen.update(_hash_array_like_obj_as_bytes(head_rows))
        md5_gen.update(_hash_array_like_obj_as_bytes(tail_rows))


def convert_data_to_mlflow_dataset(data, targets=None, predictions=None, name=None):
    """Convert input data to mlflow dataset.

    Args:
        data: A list, numpy array, pandas DataFrame, or Spark DataFrame.
        targets: Optional targets; column name for DataFrames, array-like otherwise.
        predictions: Optional predictions column name (DataFrames only).
        name: Optional dataset name.

    Returns:
        An mlflow dataset, or ``data`` unchanged if the type is unsupported.
    """
    supported_dataframe_types = [pd.DataFrame]
    if "pyspark" in sys.modules:
        from mlflow.utils.spark_utils import get_spark_dataframe_type

        spark_df_type = get_spark_dataframe_type()
        supported_dataframe_types.append(spark_df_type)

    if predictions is not None:
        _validate_dataset_type_supports_predictions(
            data=data, supported_predictions_dataset_types=supported_dataframe_types
        )

    if isinstance(data, list):
        # If the list is flat, we assume each element is an independent sample.
        # NB: guard against an empty list before peeking at the first element.
        if data and not isinstance(data[0], (list, np.ndarray)):
            data = [[elm] for elm in data]

        # NB: avoid `if targets` — truth-testing a multi-element numpy array raises
        # "The truth value of an array ... is ambiguous"; empty targets still map to None.
        return mlflow.data.from_numpy(
            np.array(data),
            targets=np.array(targets) if targets is not None and len(targets) > 0 else None,
            name=name,
        )
    elif isinstance(data, np.ndarray):
        return mlflow.data.from_numpy(data, targets=targets, name=name)
    elif isinstance(data, pd.DataFrame):
        return mlflow.data.from_pandas(df=data, targets=targets, predictions=predictions, name=name)
    elif "pyspark" in sys.modules and isinstance(data, spark_df_type):
        return mlflow.data.from_spark(df=data, targets=targets, predictions=predictions, name=name)
    else:
        # Cannot convert to mlflow dataset, return original data.
        _logger.info(
            "Cannot convert input data to `evaluate()` to an mlflow dataset, input must be a list, "
            f"a numpy array, a panda Dataframe or a spark Dataframe, but received {type(data)}."
        )
        return data


def _validate_dataset_type_supports_predictions(data, supported_predictions_dataset_types):
    """
    Validate that the dataset type supports a user-specified "predictions" column.

    Raises:
        MlflowException: If ``data`` is not one of the supported DataFrame types.
    """
    if not any(isinstance(data, sdt) for sdt in supported_predictions_dataset_types):
        raise MlflowException(
            message=(
                "If predictions is specified, data must be one of the following types, or an"
                " MLflow Dataset that represents one of the following types:"
                f" {supported_predictions_dataset_types}."
            ),
            error_code=INVALID_PARAMETER_VALUE,
        )


class EvaluationDataset:
    """
    An input dataset for model evaluation. This is intended for use with the
    :py:func:`mlflow.models.evaluate()`
    API.
    """

    # Number of rows sampled from each end of the data when computing the dataset hash.
    NUM_SAMPLE_ROWS_FOR_HASH = 5
    # Maximum number of Spark DataFrame rows converted to pandas for evaluation.
    SPARK_DATAFRAME_LIMIT = 10000

    def __init__(
        self,
        data,
        *,
        targets=None,
        name=None,
        path=None,
        feature_names=None,
        predictions=None,
        digest=None,
    ):
        """
        The values of the constructor arguments comes from the `evaluate` call.

        Args:
            data: Features as a numpy array, list, pandas DataFrame, or Spark DataFrame.
            targets: Labels array/list (for array/list data) or label column name
                (for DataFrame data).
            name: Optional dataset name; must not contain a double quote.
            path: Optional dataset path; must not contain a double quote.
            feature_names: Optional list of unique feature names.
            predictions: Optional predictions column name (DataFrame data only).
            digest: Optional precomputed dataset digest.

        Raises:
            MlflowException: On invalid name/path, mismatched shapes, duplicate
                feature names, or unsupported data types.
        """
        if name is not None and '"' in name:
            raise MlflowException(
                message=f'Dataset name cannot include a double quote (") but got {name}',
                error_code=INVALID_PARAMETER_VALUE,
            )
        if path is not None and '"' in path:
            raise MlflowException(
                message=f'Dataset path cannot include a double quote (") but got {path}',
                error_code=INVALID_PARAMETER_VALUE,
            )

        self._user_specified_name = name
        self._path = path
        self._hash = None
        self._supported_dataframe_types = (pd.DataFrame,)
        self._spark_df_type = None
        self._labels_data = None
        self._targets_name = None
        self._has_targets = False
        self._predictions_data = None
        self._predictions_name = None
        self._has_predictions = predictions is not None
        self._digest = digest

        try:
            # add checking `'pyspark' in sys.modules` to avoid importing pyspark when user
            # run code not related to pyspark.
            if "pyspark" in sys.modules:
                from mlflow.utils.spark_utils import get_spark_dataframe_type

                spark_df_type = get_spark_dataframe_type()
                self._supported_dataframe_types = (pd.DataFrame, spark_df_type)
                self._spark_df_type = spark_df_type
        except ImportError:
            pass

        if feature_names is not None and len(set(feature_names)) < len(list(feature_names)):
            raise MlflowException(
                message="`feature_names` argument must be a list containing unique feature names.",
                error_code=INVALID_PARAMETER_VALUE,
            )

        if self._has_predictions:
            _validate_dataset_type_supports_predictions(
                data=data,
                supported_predictions_dataset_types=self._supported_dataframe_types,
            )

        has_targets = targets is not None
        if has_targets:
            self._has_targets = True

        if isinstance(data, (np.ndarray, list)):
            if has_targets and not isinstance(targets, (np.ndarray, list)):
                raise MlflowException(
                    message="If data is a numpy array or list of evaluation features, "
                    "`targets` argument must be a numpy array or list of evaluation labels.",
                    error_code=INVALID_PARAMETER_VALUE,
                )

            shape_message = (
                "If the `data` argument is a numpy array, it must be a 2-dimensional "
                "array, with the second dimension representing the number of features. If the "
                "`data` argument is a list, each of its elements must be a feature array of "
                "the numpy array or list, and all elements must have the same length."
            )

            if isinstance(data, list):
                try:
                    data = np.array(data)
                except ValueError as e:
                    raise MlflowException(
                        message=shape_message, error_code=INVALID_PARAMETER_VALUE
                    ) from e

            if len(data.shape) != 2:
                raise MlflowException(
                    message=shape_message,
                    error_code=INVALID_PARAMETER_VALUE,
                )

            self._features_data = data
            if has_targets:
                self._labels_data = (
                    targets if isinstance(targets, np.ndarray) else np.array(targets)
                )

                if len(self._features_data) != len(self._labels_data):
                    raise MlflowException(
                        message="The input features example rows must be the same length "
                        "with labels array.",
                        error_code=INVALID_PARAMETER_VALUE,
                    )

            num_features = data.shape[1]

            if feature_names is not None:
                feature_names = list(feature_names)
                if num_features != len(feature_names):
                    raise MlflowException(
                        message="feature name list must be the same length with feature data.",
                        error_code=INVALID_PARAMETER_VALUE,
                    )
                self._feature_names = feature_names
            else:
                # Zero-pad generated names (e.g. "feature_01") so they sort lexicographically.
                self._feature_names = [
                    f"feature_{str(i + 1).zfill(math.ceil(math.log10(num_features + 1)))}"
                    for i in range(num_features)
                ]
        elif isinstance(data, self._supported_dataframe_types):
            if has_targets and not isinstance(targets, str):
                raise MlflowException(
                    message="If data is a Pandas DataFrame or Spark DataFrame, `targets` argument "
                    "must be the name of the column which contains evaluation labels in the `data` "
                    "dataframe.",
                    error_code=INVALID_PARAMETER_VALUE,
                )
            if self._spark_df_type and isinstance(data, self._spark_df_type):
                if data.count() > EvaluationDataset.SPARK_DATAFRAME_LIMIT:
                    _logger.warning(
                        "Specified Spark DataFrame is too large for model evaluation. Only "
                        f"the first {EvaluationDataset.SPARK_DATAFRAME_LIMIT} rows will be used. "
                        "If you want evaluate on the whole spark dataframe, please manually call "
                        "`spark_dataframe.toPandas()`."
                    )
                data = data.limit(EvaluationDataset.SPARK_DATAFRAME_LIMIT).toPandas()

            if has_targets:
                self._labels_data = data[targets].to_numpy()
                self._targets_name = targets

            if self._has_predictions:
                self._predictions_data = data[predictions].to_numpy()
                self._predictions_name = predictions

            if feature_names is not None:
                self._features_data = data[list(feature_names)]
                self._feature_names = feature_names
            else:
                features_data = data

                if has_targets:
                    features_data = features_data.drop(targets, axis=1, inplace=False)

                if self._has_predictions:
                    features_data = features_data.drop(predictions, axis=1, inplace=False)

                self._features_data = features_data
                self._feature_names = [
                    generate_feature_name_if_not_string(c) for c in self._features_data.columns
                ]
        else:
            raise MlflowException(
                message="The data argument must be a numpy array, a list or a Pandas DataFrame, or "
                "spark DataFrame if pyspark package installed.",
                error_code=INVALID_PARAMETER_VALUE,
            )

        # generate dataset hash over features, labels, predictions, and feature names
        md5_gen = hashlib.md5(usedforsecurity=False)
        _gen_md5_for_arraylike_obj(md5_gen, self._features_data)
        if self._labels_data is not None:
            _gen_md5_for_arraylike_obj(md5_gen, self._labels_data)
        if self._predictions_data is not None:
            _gen_md5_for_arraylike_obj(md5_gen, self._predictions_data)
        md5_gen.update(",".join(list(map(str, self._feature_names))).encode("UTF-8"))

        self._hash = md5_gen.hexdigest()

    @property
    def feature_names(self):
        """Return the list of feature names."""
        return self._feature_names

    @property
    def features_data(self):
        """
        return features data as a numpy array or a pandas DataFrame.
        """
        return self._features_data

    @property
    def labels_data(self):
        """
        return labels data as a numpy array
        """
        return self._labels_data

    @property
    def has_targets(self):
        """
        Returns True if the dataset has targets, False otherwise.
        """
        return self._has_targets

    @property
    def targets_name(self):
        """
        return targets name
        """
        return self._targets_name

    @property
    def predictions_data(self):
        """
        return predictions data as a numpy array
        """
        return self._predictions_data

    @property
    def has_predictions(self):
        """
        Returns True if the dataset has predictions, False otherwise.
        """
        return self._has_predictions

    @property
    def predictions_name(self):
        """
        return predictions name
        """
        return self._predictions_name

    @property
    def name(self):
        """
        Dataset name, which is specified dataset name or the dataset hash if user don't specify
        name.
        """
        return self._user_specified_name if self._user_specified_name is not None else self.hash

    @property
    def path(self):
        """
        Dataset path
        """
        return self._path

    @property
    def hash(self):
        """
        Dataset hash, computed over the data length, the first and last
        NUM_SAMPLE_ROWS_FOR_HASH rows of features/labels/predictions, and the
        feature names.
        """
        return self._hash

    @property
    def _metadata(self):
        """
        Return dataset metadata containing name, hash, and optional path.
        """
        metadata = {
            "name": self.name,
            "hash": self.hash,
        }
        if self.path is not None:
            metadata["path"] = self.path
        return metadata

    @property
    def digest(self):
        """
        Return the digest of the dataset.
        """
        return self._digest

    def _log_dataset_tag(self, client, run_id, model_uuid):
        """
        Log dataset metadata as a tag "mlflow.datasets", if the tag already exists, it will
        append current dataset metadata into existing tag content.
        """
        existing_dataset_metadata_str = client.get_run(run_id).data.tags.get(
            "mlflow.datasets", "[]"
        )
        dataset_metadata_list = json.loads(existing_dataset_metadata_str)

        # Skip appending if an identical (hash, name, model) entry already exists.
        for metadata in dataset_metadata_list:
            if (
                metadata["hash"] == self.hash
                and metadata["name"] == self.name
                and metadata["model"] == model_uuid
            ):
                break
        else:
            dataset_metadata_list.append({**self._metadata, "model": model_uuid})

        dataset_metadata_str = json.dumps(dataset_metadata_list, separators=(",", ":"))
        client.log_batch(
            run_id,
            tags=[RunTag("mlflow.datasets", dataset_metadata_str)],
        )

    def __hash__(self):
        return hash(self.hash)

    def __eq__(self, other):
        if not isinstance(other, EvaluationDataset):
            return False

        if isinstance(self._features_data, np.ndarray):
            is_features_data_equal = np.array_equal(self._features_data, other._features_data)
        else:
            is_features_data_equal = self._features_data.equals(other._features_data)

        return (
            is_features_data_equal
            and np.array_equal(self._labels_data, other._labels_data)
            and self.name == other.name
            and self.path == other.path
            and self._feature_names == other._feature_names
        )