default_evaluator.py
import copy
import inspect
import json
import logging
import pathlib
import pickle
import shutil
import tempfile
import traceback
from abc import abstractmethod
from typing import Any, Callable, NamedTuple, Optional

import numpy as np
import pandas as pd

import mlflow
from mlflow import MlflowClient, MlflowException
from mlflow.data.evaluation_dataset import EvaluationDataset
from mlflow.entities.metric import Metric
from mlflow.metrics.base import MetricValue
from mlflow.models.evaluation.artifacts import (
    CsvEvaluationArtifact,
    ImageEvaluationArtifact,
    JsonEvaluationArtifact,
    NumpyEvaluationArtifact,
    _infer_artifact_type_and_ext,
)
from mlflow.models.evaluation.base import EvaluationMetric, EvaluationResult, ModelEvaluator
from mlflow.models.evaluation.utils.metric import MetricDefinition
from mlflow.protos.databricks_pb2 import INVALID_PARAMETER_VALUE
from mlflow.pyfunc import _ServedPyFuncModel
from mlflow.utils.file_utils import TempDir
from mlflow.utils.proto_json_utils import NumpyEncoder
from mlflow.utils.time import get_current_time_millis

_logger = logging.getLogger(__name__)

_EVAL_TABLE_FILE_NAME = "eval_results_table.json"
_TOKEN_COUNT_METRIC_NAME = "token_count"
_LATENCY_METRIC_NAME = "latency"


def _extract_raw_model(model):
    if not getattr(model, "metadata", None):
        return None, None

    model_loader_module = model.metadata.flavors["python_function"]["loader_module"]
    # If we load a model with mlflow.pyfunc.load_model, the model will be wrapped
    # with a pyfunc wrapper. We need to extract the raw model so that shap
    # explainer uses the raw model instead of the wrapper and skips data schema validation.
    if model_loader_module in [
        "mlflow.catboost",
        "mlflow.sklearn",
        "mlflow.xgboost",
    ] and not isinstance(model, _ServedPyFuncModel):
        if hasattr(model._model_impl, "get_raw_model"):
            return model_loader_module, model._model_impl.get_raw_model()
        return model_loader_module, model._model_impl
    else:
        return model_loader_module, None
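

# Illustrative sketch only (not part of the evaluator): demonstrates what _extract_raw_model
# returns for a pyfunc-wrapped scikit-learn model. The helper name and the tiny training data
# are hypothetical; assumes scikit-learn is installed and a local tracking backend is in use.
def _example_extract_raw_model():
    from sklearn.linear_model import LogisticRegression

    sk_model = LogisticRegression().fit([[0.0], [1.0]], [0, 1])
    with mlflow.start_run():
        info = mlflow.sklearn.log_model(sk_model, "model")
    pyfunc_model = mlflow.pyfunc.load_model(info.model_uri)

    loader_module, raw_model = _extract_raw_model(pyfunc_model)
    # loader_module == "mlflow.sklearn"; raw_model is the underlying estimator rather than the
    # pyfunc wrapper, so the shap explainer can consume it directly.
    return loader_module, raw_model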


def _extract_output_and_other_columns(
    model_predictions: list[Any] | dict[str, Any] | pd.DataFrame | pd.Series,
    output_column_name: str | None,
) -> tuple[pd.Series, pd.DataFrame | None, str]:
    y_pred = None
    other_output_columns = None
    ERROR_MISSING_OUTPUT_COLUMN_NAME = (
        "Output column name is not specified for the multi-output model. "
        "Please set the correct output column name using the `predictions` parameter."
    )

    if isinstance(model_predictions, list) and all(isinstance(p, dict) for p in model_predictions):
        # Extract 'y_pred' and 'other_output_columns' from list of dictionaries
        if output_column_name in model_predictions[0]:
            y_pred = pd.Series(
                [p.get(output_column_name) for p in model_predictions], name=output_column_name
            )
            other_output_columns = pd.DataFrame([
                {k: v for k, v in p.items() if k != output_column_name} for p in model_predictions
            ])
        elif len(model_predictions[0]) == 1:
            # Set the only key as self.predictions and its value as self.y_pred
            key, value = list(model_predictions[0].items())[0]
            y_pred = pd.Series(value, name=key)
            output_column_name = key
        elif output_column_name is None:
            raise MlflowException(
                ERROR_MISSING_OUTPUT_COLUMN_NAME,
                error_code=INVALID_PARAMETER_VALUE,
            )
        else:
            raise MlflowException(
                f"Output column name '{output_column_name}' is not found in the model "
                f"predictions list: {model_predictions}. Please set the correct output column "
                "name using the `predictions` parameter.",
                error_code=INVALID_PARAMETER_VALUE,
            )
    elif isinstance(model_predictions, pd.DataFrame):
        if output_column_name in model_predictions.columns:
            y_pred = model_predictions[output_column_name]
            other_output_columns = model_predictions.drop(columns=output_column_name)
        elif len(model_predictions.columns) == 1:
            output_column_name = model_predictions.columns[0]
            y_pred = model_predictions[output_column_name]
        elif output_column_name is None:
            raise MlflowException(
                ERROR_MISSING_OUTPUT_COLUMN_NAME,
                error_code=INVALID_PARAMETER_VALUE,
            )
        else:
            raise MlflowException(
                f"Output column name '{output_column_name}' is not found in the model "
                f"predictions dataframe {model_predictions.columns}. Please set the correct "
                "output column name using the `predictions` parameter.",
                error_code=INVALID_PARAMETER_VALUE,
            )
    elif isinstance(model_predictions, dict):
        if output_column_name in model_predictions:
            y_pred = pd.Series(model_predictions[output_column_name], name=output_column_name)
            other_output_columns = pd.DataFrame({
                k: v for k, v in model_predictions.items() if k != output_column_name
            })
        elif len(model_predictions) == 1:
            key, value = list(model_predictions.items())[0]
            y_pred = pd.Series(value, name=key)
            output_column_name = key
        elif output_column_name is None:
            raise MlflowException(
                ERROR_MISSING_OUTPUT_COLUMN_NAME,
                error_code=INVALID_PARAMETER_VALUE,
            )
        else:
            raise MlflowException(
                f"Output column name '{output_column_name}' is not found in the "
                f"model predictions dict {model_predictions}. Please set the correct "
                "output column name using the `predictions` parameter.",
                error_code=INVALID_PARAMETER_VALUE,
            )

    return (
        y_pred if y_pred is not None else model_predictions,
        other_output_columns,
        output_column_name,
    )
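

# Illustrative sketch only (not part of the evaluator): shows how a multi-output prediction
# payload is split by _extract_output_and_other_columns. The column names are hypothetical.
def _example_extract_output_and_other_columns():
    predictions = pd.DataFrame({
        "answer": ["yes", "no"],
        "retrieved_context": ["ctx 1", "ctx 2"],
    })
    y_pred, other_columns, output_name = _extract_output_and_other_columns(predictions, "answer")
    # y_pred is the "answer" Series, other_columns keeps only "retrieved_context",
    # and output_name == "answer".
    return y_pred, other_columns, output_name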


def _extract_predict_fn(model: Any) -> Callable[..., Any] | None:
    """
    Extracts the predict function from the given model.

    Precedence order:
    1. If a raw model can be extracted from the pyfunc wrapper, its predict function is used.
    2. Otherwise, if a model is specified, its predict function is used.
    3. If neither is available, the predict function is None.

    Args:
        model: A model object that has a predict method.

    Returns: The predict function.
    """
    _, raw_model = _extract_raw_model(model)
    predict_fn = None

    if raw_model is not None:
        predict_fn = raw_model.predict
        try:
            from mlflow.xgboost import _wrapped_xgboost_model_predict_fn

            # Because shap evaluation will pass evaluation data in ndarray format
            # (without feature names), passing validate_features=True would raise an error.
            predict_fn = _wrapped_xgboost_model_predict_fn(raw_model, validate_features=False)
        except ImportError:
            pass

    elif model is not None:
        predict_fn = model.predict

    return predict_fn


def _get_dataframe_with_renamed_columns(x, new_column_names):
    """
    Downstream inference functions may expect a pd.DataFrame to be created from x. However,
    if x is already a pd.DataFrame, and new_column_names != x.columns, we cannot simply call
    pd.DataFrame(x, columns=new_column_names) because the resulting pd.DataFrame will contain
    NaNs for every column in new_column_names that does not exist in x.columns. This function
    instead creates a new pd.DataFrame object from x, and then explicitly renames the columns
    to avoid NaNs.

    Args:
        x: A data object, such as a Pandas DataFrame, NumPy array, or list.
        new_column_names: Column names for the output Pandas DataFrame.

    Returns:
        A pd.DataFrame with x as data and new_column_names as column names.
    """
    df = pd.DataFrame(x)
    return df.rename(columns=dict(zip(df.columns, new_column_names)))


def _get_aggregate_metrics_values(metrics):
    return {name: MetricValue(aggregate_results={name: value}) for name, value in metrics.items()}


_matplotlib_config = {
    "figure.dpi": 175,
    "figure.figsize": [6.0, 4.0],
    "figure.autolayout": True,
    "font.size": 8,
}


class _CustomArtifact(NamedTuple):
    """
    A namedtuple representing a custom artifact function and its properties.

    function : the custom artifact function
    name : the name of the custom artifact function
    index : the index of the function in the ``custom_artifacts`` argument of mlflow.evaluate
    artifacts_dir : the path to a temporary directory to store produced artifacts of the function
    """

    function: Callable[..., Any]
    name: str
    index: int
    artifacts_dir: str


def _is_valid_artifacts(artifacts):
    return isinstance(artifacts, dict) and all(isinstance(k, str) for k in artifacts.keys())
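

# Illustrative sketch only (not part of the evaluator): a user-supplied ``custom_artifacts``
# function in the shape accepted by _is_valid_artifacts -- it must return a dict mapping string
# artifact names to objects. The function name and artifact name here are hypothetical.
def _example_custom_artifact(eval_df, builtin_metrics, artifacts_dir):
    # eval_df always carries a "prediction" column (and a "target" column when targets exist).
    counts = (
        eval_df["prediction"].value_counts().rename_axis("prediction").reset_index(name="count")
    )
    # A DataFrame value is logged as a CSV artifact named "prediction_counts".
    return {"prediction_counts": counts}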


def _evaluate_custom_artifacts(custom_artifact_tuple, eval_df, builtin_metrics):
    """
    This function calls the `custom_artifact` function and performs validations on the returned
    result to ensure that it is in the expected format. If the result is not in the expected
    format, a warning is logged and None is returned.

    Args:
        custom_artifact_tuple: Containing a user provided function and its index in the
            ``custom_artifacts`` parameter of ``mlflow.evaluate``.
        eval_df: A Pandas dataframe object containing a prediction and a target column.
        builtin_metrics: A dictionary of metrics produced by the default evaluator.

    Returns:
        A dictionary of artifacts.
    """
    exception_header = (
        f"Custom artifact function '{custom_artifact_tuple.name}'"
        f" at index {custom_artifact_tuple.index}"
        " in the `custom_artifacts` parameter"
    )
    artifacts = custom_artifact_tuple.function(
        eval_df, builtin_metrics, custom_artifact_tuple.artifacts_dir
    )

    if artifacts is None:
        _logger.warning(f"{exception_header} returned None.")
        return

    if not _is_valid_artifacts(artifacts):
        _logger.warning(
            f"{exception_header} did not return artifacts as a dictionary of string artifact "
            "names with their corresponding objects."
        )
        return

    return artifacts


# TODO: Move this to the /evaluators directory
class BuiltInEvaluator(ModelEvaluator):
    """
    The base class for all evaluators that are built-in to MLflow.

    Each evaluator is responsible for implementing the `_evaluate()` method, which is called by
    the `evaluate()` method of this base class. This class contains many helper methods used
    across built-in evaluators, such as logging metrics and artifacts and ordering metrics.
    """

    def __init__(self):
        self.client = MlflowClient()

    @abstractmethod
    def _evaluate(
        self,
        model: Optional["mlflow.pyfunc.PyFuncModel"],
        extra_metrics: list[EvaluationMetric],
        custom_artifacts=None,
        **kwargs,
    ) -> EvaluationResult | None:
        """Implement the evaluation logic for each evaluator."""

    def log_metrics(self):
        """
        Helper method to log metrics into the specified run.
        """
        self._add_prefix_to_metrics()

        timestamp = get_current_time_millis()
        self.client.log_batch(
            self.run_id,
            metrics=[
                Metric(
                    key=key,
                    value=value,
                    timestamp=timestamp,
                    step=0,
                    model_id=self.model_id,
                    dataset_name=self.dataset.name,
                    dataset_digest=self.dataset.digest,
                    run_id=self.run_id,
                )
                for key, value in self.aggregate_metrics.items()
            ],
        )

    def _log_image_artifact(
        self,
        do_plot,
        artifact_name,
    ):
        from matplotlib import pyplot

        prefix = self.evaluator_config.get("metric_prefix", "")
        artifact_file_name = f"{prefix}{artifact_name}.png"
        artifact_file_local_path = self.temp_dir.path(artifact_file_name)

        try:
            pyplot.clf()
            do_plot()
            pyplot.savefig(artifact_file_local_path, bbox_inches="tight")
        except Exception as e:
            _logger.warning(f"Failed to log image artifact {artifact_name!r}: {e!r}")
        else:
            mlflow.log_artifact(artifact_file_local_path)
            artifact = ImageEvaluationArtifact(uri=mlflow.get_artifact_uri(artifact_file_name))
            artifact._load(artifact_file_local_path)
            self.artifacts[artifact_name] = artifact
        finally:
            pyplot.close(pyplot.gcf())
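
    # Illustrative sketch (hypothetical plot): _log_image_artifact expects a zero-argument
    # callable that draws onto the current matplotlib figure; the figure is then saved as
    # "<metric_prefix><artifact_name>.png" and logged to the run, e.g.:
    #
    #     def _plot_score_histogram():
    #         from matplotlib import pyplot
    #
    #         pyplot.hist([0.1, 0.4, 0.9], bins=5)
    #         pyplot.xlabel("score")
    #
    #     self._log_image_artifact(_plot_score_histogram, "score_histogram")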

    def _evaluate_sklearn_model_score_if_scorable(self, model, y_true, sample_weights):
        model_loader_module, raw_model = _extract_raw_model(model)
        if model_loader_module == "mlflow.sklearn" and raw_model is not None:
            try:
                score = raw_model.score(
                    self.X.copy_to_avoid_mutation(), y_true, sample_weight=sample_weights
                )
                self.metrics_values.update(_get_aggregate_metrics_values({"score": score}))
            except Exception as e:
                _logger.warning(
                    f"Computing sklearn model score failed: {e!r}. Set logging level to "
                    "DEBUG to see the full traceback."
                )
                _logger.debug("", exc_info=True)

    def _log_custom_metric_artifact(self, artifact_name, raw_artifact, custom_metric_tuple):
        """
        This function logs and returns a custom metric artifact. Two cases:
        - If the provided artifact is a path to a file, the function makes a copy of it with
          a formatted name in a temporary directory and calls mlflow.log_artifact.
        - Otherwise, it attempts to save the artifact to a temporary path with an inferred
          type, then calls mlflow.log_artifact.

        Args:
            artifact_name: the name of the artifact
            raw_artifact: the object representing the artifact
            custom_metric_tuple: an instance of the _CustomArtifact namedtuple

        Returns:
            EvaluationArtifact
        """

        exception_and_warning_header = (
            f"Custom artifact function '{custom_metric_tuple.name}' at index "
            f"{custom_metric_tuple.index} in the `custom_artifacts` parameter"
        )

        inferred_from_path, inferred_type, inferred_ext = _infer_artifact_type_and_ext(
            artifact_name, raw_artifact, custom_metric_tuple
        )
        artifact_file_local_path = self.temp_dir.path(artifact_name + inferred_ext)

        if pathlib.Path(artifact_file_local_path).exists():
            raise MlflowException(
                f"{exception_and_warning_header} produced an artifact '{artifact_name}' that "
                "cannot be logged because there already exists an artifact with the same name."
            )

        # ParquetEvaluationArtifact isn't explicitly handled here because such artifacts can only
        # be supplied through a file path, which is covered by the first if clause; DataFrame
        # objects default to being stored as CsvEvaluationArtifact.
        if inferred_from_path:
            shutil.copy2(raw_artifact, artifact_file_local_path)
        elif inferred_type is JsonEvaluationArtifact:
            with open(artifact_file_local_path, "w") as f:
                if isinstance(raw_artifact, str):
                    f.write(raw_artifact)
                else:
                    json.dump(raw_artifact, f, cls=NumpyEncoder)
        elif inferred_type is CsvEvaluationArtifact:
            raw_artifact.to_csv(artifact_file_local_path, index=False)
        elif inferred_type is NumpyEvaluationArtifact:
            np.save(artifact_file_local_path, raw_artifact, allow_pickle=False)
        elif inferred_type is ImageEvaluationArtifact:
            raw_artifact.savefig(artifact_file_local_path)
        else:
            # storing as pickle
            try:
                with open(artifact_file_local_path, "wb") as f:
                    pickle.dump(raw_artifact, f)
                _logger.warning(
                    f"{exception_and_warning_header} produced an artifact '{artifact_name}'"
                    f" with type '{type(raw_artifact)}' that is logged as a pickle artifact."
                )
            except pickle.PickleError:
                raise MlflowException(
                    f"{exception_and_warning_header} produced an unsupported artifact "
                    f"'{artifact_name}' with type '{type(raw_artifact)}' that cannot be pickled. "
                    "Supported object types for artifacts are:\n"
                    "- A string uri representing the file path to the artifact. MLflow"
                    " will infer the type of the artifact based on the file extension.\n"
                    "- A string representation of a JSON object. This will be saved as a "
                    ".json artifact.\n"
                    "- Pandas DataFrame. This will be saved as a .csv artifact.\n"
                    "- Numpy array. This will be saved as a .npy artifact.\n"
                    "- Matplotlib Figure. This will be saved as a .png image artifact.\n"
                    "- Other objects will be attempted to be pickled with the default protocol."
                )

        mlflow.log_artifact(artifact_file_local_path)
        artifact = inferred_type(uri=mlflow.get_artifact_uri(artifact_name + inferred_ext))
        artifact._load(artifact_file_local_path)
        return artifact

    def _get_column_in_metrics_values(self, column):
        for metric_name, metric_value in self.metrics_values.items():
            if metric_name.split("/")[0] == column:
                return metric_value

    def _get_args_for_metrics(
        self,
        metric: MetricDefinition,
        eval_df: pd.DataFrame,
        input_df: pd.DataFrame,
        other_output_df: pd.DataFrame | None,
    ) -> tuple[bool, list[str | pd.DataFrame]]:
        """
        Given a metric definition, read the signature of the metric function and get the
        appropriate arguments from the input/output columns, other calculated metrics, and
        the evaluator_config.

        Args:
            metric: The metric definition containing a user provided function and its index
                in the ``extra_metrics`` parameter of ``mlflow.evaluate``.
            eval_df: The evaluation dataframe containing the prediction and target columns.
            input_df: The input dataframe containing the features used to make predictions.
            other_output_df: A dataframe containing all model output columns but the predictions.

        Returns:
            tuple: A tuple of (bool, list) where the bool indicates if the given metric can
            be calculated with the given eval_df, metrics, and input_df.
            - If the user is missing "targets" or "predictions" parameters when needed, or we
              cannot find a column or metric for a parameter to the metric, return
              (False, list of missing parameters).
            - If all arguments to the metric function were found, return
              (True, list of arguments).
        """
        # deepcopying eval_df and builtin_metrics for each custom metric function call,
        # in case the user modifies them inside their function(s).
        eval_df_copy = eval_df.copy()
        parameters = inspect.signature(metric.function).parameters
        eval_fn_args = []
        params_not_found = []
        if len(parameters) == 2:
            param_0_name, param_1_name = parameters.keys()

        # eval_fn has parameters (eval_df, builtin_metrics) for backwards compatibility
        if len(parameters) == 2 and param_0_name != "predictions" and param_1_name != "targets":
            eval_fn_args.append(eval_df_copy)
            self._update_aggregate_metrics()
            eval_fn_args.append(copy.deepcopy(self.aggregate_metrics))
        # eval_fn can have parameters like (predictions, targets, metrics, random_col)
        else:
            for param_name, param in parameters.items():
                column = self.col_mapping.get(param_name, param_name)

                if column in ("predictions", self.predictions, self.dataset.predictions_name):
                    eval_fn_args.append(eval_df_copy["prediction"])
                elif column in ("targets", self.dataset.targets_name):
                    if "target" in eval_df_copy:
                        eval_fn_args.append(eval_df_copy["target"])
                    else:
                        if param.default == inspect.Parameter.empty:
                            params_not_found.append(param_name)
                        else:
                            eval_fn_args.append(param.default)
                elif column == "metrics":
                    eval_fn_args.append(copy.deepcopy(self.metrics_values))
                else:
                    # case when the column passed in col_mapping contains the entire column
                    if not isinstance(column, str):
                        eval_fn_args.append(column)

                    # case when the column in col_mapping is a string and the column value
                    # is part of the input_df
                    elif column in input_df.columns:
                        eval_fn_args.append(input_df[column])

                    # case when the column in col_mapping is a string and the column value
                    # is part of the output_df (other than predictions)
                    elif other_output_df is not None and column in other_output_df.columns:
                        self.other_output_columns_for_eval.add(column)
                        eval_fn_args.append(other_output_df[column])

                    # case where the param is defined as part of the evaluator_config
                    elif column in self.evaluator_config:
                        eval_fn_args.append(self.evaluator_config.get(column))

                    # case where this is the name of another metric
                    elif metric_value := self._get_column_in_metrics_values(column):
                        eval_fn_args.append(metric_value)

                    # in the case that the metric has not been calculated yet, but is scheduled
                    # to be calculated "before" this metric in self.ordered_metrics, we append
                    # None to indicate that there is not an error in the dependencies
                    elif column in [metric_tuple.name for metric_tuple in self.ordered_metrics]:
                        eval_fn_args.append(None)

                    elif param.default == inspect.Parameter.empty:
                        params_not_found.append(param_name)
                    else:
                        eval_fn_args.append(param.default)

        if len(params_not_found) > 0:
            return False, params_not_found
        return True, eval_fn_args
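
    # Illustrative sketch (hypothetical metric and column names): how _get_args_for_metrics
    # resolves the parameters of a custom metric function. "predictions", "targets", and
    # "metrics" are special parameter names; any other parameter is looked up via col_mapping,
    # the input/output columns, the evaluator_config, or previously computed metrics, e.g.:
    #
    #     def exact_match(predictions, targets, metrics, retrieved_context):
    #         scores = [int(p == t) for p, t in zip(predictions, targets)]
    #         return MetricValue(
    #             scores=scores, aggregate_results={"mean": sum(scores) / len(scores)}
    #         )
    #
    #     metric = mlflow.metrics.make_metric(eval_fn=exact_match, greater_is_better=True)
    #     mlflow.evaluate(
    #         ...,
    #         extra_metrics=[metric],
    #         evaluator_config={"col_mapping": {"retrieved_context": "context"}},
    #     )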

    def evaluate_and_log_custom_artifacts(
        self,
        custom_artifacts: list[_CustomArtifact],
        prediction: pd.Series,
        target: np.ndarray | None = None,
    ):
        """Evaluate custom artifacts provided by users."""
        if not custom_artifacts:
            return

        eval_df = self._get_eval_df(prediction, target)
        for index, custom_artifact in enumerate(custom_artifacts):
            with tempfile.TemporaryDirectory() as artifacts_dir:
                # deepcopying eval_df and builtin_metrics for each custom artifact function call,
                # in case the user modifies them inside their function(s).
                custom_artifact_tuple = _CustomArtifact(
                    function=custom_artifact,
                    index=index,
                    name=getattr(custom_artifact, "__name__", repr(custom_artifact)),
                    artifacts_dir=artifacts_dir,
                )
                artifact_results = _evaluate_custom_artifacts(
                    custom_artifact_tuple,
                    eval_df.copy(),
                    copy.deepcopy(self.metrics_values),
                )
                if artifact_results:
                    for artifact_name, raw_artifact in artifact_results.items():
                        self.artifacts[artifact_name] = self._log_custom_metric_artifact(
                            artifact_name,
                            raw_artifact,
                            custom_artifact_tuple,
                        )

    def _get_error_message_missing_columns(self, metric_name, param_names):
        error_message_parts = [f"Metric '{metric_name}' requires the following:"]

        special_params = ["targets", "predictions"]
        error_message_parts.extend(
            f" - the '{param}' parameter needs to be specified"
            for param in special_params
            if param in param_names
        )

        if remaining_params := [param for param in param_names if param not in special_params]:
            error_message_parts.append(
                f" - missing columns {remaining_params} need to be defined or mapped"
            )

        return "\n".join(error_message_parts)

    def _construct_error_message_for_malformed_metrics(
        self, malformed_results, input_columns, output_columns
    ):
        error_messages = [
            self._get_error_message_missing_columns(metric_name, param_names)
            for metric_name, param_names in malformed_results
        ]
        joined_error_message = "\n".join(error_messages)

        full_message = f"""Error: Metric calculation failed for the following metrics:
        {joined_error_message}

        Below are the existing column names for the input/output data:
        Input Columns: {input_columns}
        Output Columns: {output_columns}

        To resolve this issue, you may need to:
        - specify any required parameters
        - if you are missing columns, check that there are no circular dependencies among your
        metrics, and you may want to map them to an existing column using the following
        configuration:
        evaluator_config={{'col_mapping': {{<missing column name>: <existing column name>}}}}"""

        return "\n".join(l.lstrip() for l in full_message.splitlines())

    def _raise_exception_for_malformed_metrics(self, malformed_results, eval_df, other_output_df):
        output_columns = [] if other_output_df is None else list(other_output_df.columns)
        if self.predictions:
            output_columns.append(self.predictions)
        elif self.dataset.predictions_name:
            output_columns.append(self.dataset.predictions_name)
        else:
            output_columns.append("predictions")

        input_columns = list(self.X.copy_to_avoid_mutation().columns)
        if "target" in eval_df:
            if self.dataset.targets_name:
                input_columns.append(self.dataset.targets_name)
            else:
                input_columns.append("targets")

        error_message = self._construct_error_message_for_malformed_metrics(
            malformed_results, input_columns, output_columns
        )

        raise MlflowException(error_message, error_code=INVALID_PARAMETER_VALUE)

    def _get_eval_df(self, prediction: pd.Series, target: np.ndarray | None = None):
        """
        Create a DataFrame with "prediction" and "target" columns.

        This is a standard format that can be passed to the metric functions.
        """
        eval_df = pd.DataFrame({"prediction": copy.deepcopy(prediction)})
        if target is not None:
            eval_df["target"] = target
        return eval_df

    def _order_metrics(
        self,
        metrics: list[EvaluationMetric],
        eval_df: pd.DataFrame,
        other_output_df: pd.DataFrame | None,
    ):
        """
        Order the list of metrics so they can be computed in sequence.

        Some metrics might use the results of other metrics to compute their own results. This
        function iteratively resolves these dependencies by checking whether each metric can be
        computed with the currently available columns and metrics values.
        """
        remaining_metrics = metrics
        input_df = self.X.copy_to_avoid_mutation()

        while len(remaining_metrics) > 0:
            pending_metrics = []
            failed_results = []
            did_append_metric = False
            for metric_tuple in remaining_metrics:
                can_calculate, eval_fn_args = self._get_args_for_metrics(
                    metric_tuple, eval_df, input_df, other_output_df
                )
                if can_calculate:
                    self.ordered_metrics.append(metric_tuple)
                    did_append_metric = True
                else:  # cannot calculate the metric yet
                    pending_metrics.append(metric_tuple)
                    failed_results.append((metric_tuple.name, eval_fn_args))

            # can't calculate any more metrics
            if not did_append_metric:
                self._raise_exception_for_malformed_metrics(
                    failed_results, eval_df, other_output_df
                )

            remaining_metrics = pending_metrics

        return self.ordered_metrics

    def _test_first_row(
        self,
        metrics: list[MetricDefinition],
        eval_df: pd.DataFrame,
        other_output_df: pd.DataFrame | None,
    ):
        # test calculations on the first row of eval_df
        _logger.info("Testing metrics on first row...")
        exceptions = []
        first_row_df = eval_df.iloc[[0]]
        first_row_input_df = self.X.copy_to_avoid_mutation().iloc[[0]]
        for metric in metrics:
            try:
                _, eval_fn_args = self._get_args_for_metrics(
                    metric, first_row_df, first_row_input_df, other_output_df
                )
                if metric_value := metric.evaluate(eval_fn_args):
                    name = f"{metric.name}/{metric.version}" if metric.version else metric.name
                    self.metrics_values.update({name: metric_value})
            except Exception as e:
                stacktrace_str = traceback.format_exc()
                if isinstance(e, MlflowException):
                    exceptions.append(
                        f"Metric '{metric.name}': Error:\n{e.message}\n{stacktrace_str}"
                    )
                else:
                    exceptions.append(f"Metric '{metric.name}': Error:\n{e!r}\n{stacktrace_str}")

        if len(exceptions) > 0:
            raise MlflowException("\n".join(exceptions))

    def evaluate_metrics(
        self,
        metrics: list[EvaluationMetric],
        prediction: pd.Series,
        target: np.ndarray | None = None,
        other_output_df: pd.DataFrame | None = None,
    ):
        """
        Evaluate the metrics on the given prediction and target data.

        Args:
            metrics: A list of metrics to evaluate.
            prediction: A Pandas Series containing the predictions.
            target: A numpy array containing the target values.
            other_output_df: A Pandas DataFrame containing other output columns from the model.

        Returns:
            None. The metric values are recorded in the self.metrics_values dictionary.
        """
        eval_df = self._get_eval_df(prediction, target)
        metrics = [
            MetricDefinition.from_index_and_metric(i, metric) for i, metric in enumerate(metrics)
        ]
        metrics = self._order_metrics(metrics, eval_df, other_output_df)

        self._test_first_row(metrics, eval_df, other_output_df)

        # calculate metrics for the full eval_df
        input_df = self.X.copy_to_avoid_mutation()
        for metric in metrics:
            _, eval_fn_args = self._get_args_for_metrics(metric, eval_df, input_df, other_output_df)
            if metric_value := metric.evaluate(eval_fn_args):
                name = f"{metric.name}/{metric.version}" if metric.version else metric.name
                self.metrics_values.update({name: metric_value})
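
    # Illustrative sketch (hypothetical metric names): _order_metrics lets one custom metric
    # depend on another simply by naming it as a parameter. Here "verbosity_penalty" consumes
    # the per-row scores of "answer_length", so it is ordered and computed after it:
    #
    #     def answer_length(predictions):
    #         return MetricValue(scores=[len(p) for p in predictions])
    #
    #     def verbosity_penalty(predictions, answer_length):
    #         return MetricValue(scores=[min(s, 100) / 100 for s in answer_length.scores])
    #
    # (Each function would be wrapped with mlflow.metrics.make_metric and passed via
    # extra_metrics; the default metric name is the eval_fn's name, which is what the
    # dependency lookup in _get_column_in_metrics_values uses.)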

    def log_eval_table(self, y_pred, other_output_columns=None):
        # only log eval table if there are per row metrics recorded
        if not any(
            metric_value.scores is not None or metric_value.justifications is not None
            for _, metric_value in self.metrics_values.items()
        ):
            return

        metric_prefix = self.evaluator_config.get("metric_prefix", "")
        if not isinstance(metric_prefix, str):
            metric_prefix = ""
        if isinstance(self.dataset.features_data, pd.DataFrame):
            # Handle DataFrame case
            if self.dataset.has_targets:
                data = self.dataset.features_data.assign(**{
                    self.dataset.targets_name or "target": self.dataset.labels_data,
                    self.dataset.predictions_name or self.predictions or "outputs": y_pred,
                })
            else:
                data = self.dataset.features_data.assign(outputs=y_pred)
        else:
            # Handle NumPy array case, converting it to a DataFrame
            data = pd.DataFrame(self.dataset.features_data, columns=self.dataset.feature_names)
            if self.dataset.has_targets:
                data = data.assign(**{
                    self.dataset.targets_name or "target": self.dataset.labels_data,
                    self.dataset.predictions_name or self.predictions or "outputs": y_pred,
                })
            else:
                data = data.assign(outputs=y_pred)

        # Include other_output_columns used in evaluation to the eval table
        if other_output_columns is not None and len(self.other_output_columns_for_eval) > 0:
            for column in self.other_output_columns_for_eval:
                data[column] = other_output_columns[column]

        columns = {}
        for metric_name, metric_value in self.metrics_values.items():
            scores = metric_value.scores
            justifications = metric_value.justifications

            if scores:
                if metric_name.startswith(metric_prefix) and metric_name[len(metric_prefix) :] in [
                    _TOKEN_COUNT_METRIC_NAME,
                    _LATENCY_METRIC_NAME,
                ]:
                    columns[metric_name] = scores
                else:
                    columns[f"{metric_name}/score"] = scores
            if justifications:
                columns[f"{metric_name}/justification"] = justifications
        data = data.assign(**columns)
        artifact_file_name = f"{metric_prefix}{_EVAL_TABLE_FILE_NAME}"
        mlflow.log_table(data, artifact_file=artifact_file_name)
        if self.eval_results_path:
            eval_table_spark = self.spark_session.createDataFrame(data)
            try:
                eval_table_spark.write.mode(self.eval_results_mode).option(
                    "mergeSchema", "true"
                ).format("delta").saveAsTable(self.eval_results_path)
            except Exception as e:
                _logger.info(f"Saving eval table to delta table failed. Reason: {e}")

        name = _EVAL_TABLE_FILE_NAME.split(".", 1)[0]
        self.artifacts[name] = JsonEvaluationArtifact(
            uri=mlflow.get_artifact_uri(artifact_file_name)
        )
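
    # Illustrative sketch (hypothetical data, column, and table names): the evaluator_config
    # keys consumed above and in evaluate() below can be supplied through mlflow.evaluate, e.g.:
    #
    #     mlflow.evaluate(
    #         model_uri,
    #         data=eval_data,
    #         targets="label",
    #         model_type="question-answering",
    #         evaluator_config={
    #             "col_mapping": {"retrieved_context": "context"},
    #             "metric_prefix": "eval_",
    #             "eval_results_path": "catalog.schema.eval_results",
    #             "eval_results_mode": "append",
    #         },
    #     )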

    def _update_aggregate_metrics(self):
        self.aggregate_metrics = {}
        for metric_name, metric_value in self.metrics_values.items():
            if metric_value.aggregate_results:
                for agg_name, agg_value in metric_value.aggregate_results.items():
                    if agg_value is not None:
                        if agg_name == metric_name.split("/")[0]:
                            self.aggregate_metrics[metric_name] = agg_value
                        else:
                            self.aggregate_metrics[f"{metric_name}/{agg_name}"] = agg_value

    def _add_prefix_to_metrics(self):
        def _prefix_value(value):
            aggregate = (
                {f"{prefix}{k}": v for k, v in value.aggregate_results.items()}
                if value.aggregate_results
                else None
            )
            return MetricValue(value.scores, value.justifications, aggregate)

        if prefix := self.evaluator_config.get("metric_prefix"):
            self.metrics_values = {
                f"{prefix}{k}": _prefix_value(v) for k, v in self.metrics_values.items()
            }

        self._update_aggregate_metrics()

    def evaluate(
        self,
        *,
        model_type,
        dataset,
        run_id,
        evaluator_config,
        model: "mlflow.pyfunc.PyFuncModel" = None,
        extra_metrics=None,
        custom_artifacts=None,
        predictions=None,
        model_id=None,
        **kwargs,
    ) -> EvaluationResult:
        if model is None and predictions is None and dataset.predictions_data is None:
            raise MlflowException(
                message=(
                    "Either a model or set of predictions must be specified in order to use the"
                    " default evaluator. Either specify the `model` parameter, the `predictions`"
                    " parameter, an MLflow dataset containing the `predictions` column name"
                    " (via the `data` parameter), or a different evaluator (via the `evaluators`"
                    " parameter)."
                ),
                error_code=INVALID_PARAMETER_VALUE,
            )

        self.artifacts = {}
        self.aggregate_metrics = {}
        self.metrics_values = {}
        self.ordered_metrics = []
        self.other_output_columns_for_eval = set()

        self.dataset: EvaluationDataset = dataset
        self.run_id = run_id
        self.model_type = model_type
        self.model_id = model_id
        self.evaluator_config = evaluator_config

        self.predictions = predictions
        self.col_mapping = self.evaluator_config.get("col_mapping", {})
        self.eval_results_path = self.evaluator_config.get("eval_results_path")
        self.eval_results_mode = self.evaluator_config.get("eval_results_mode", "overwrite")

        if self.eval_results_path:
            from mlflow.utils._spark_utils import _get_active_spark_session

            self.spark_session = _get_active_spark_session()
            if not self.spark_session:
                raise MlflowException(
                    message="eval_results_path is only supported in a Spark environment.",
                    error_code=INVALID_PARAMETER_VALUE,
                )

            if self.eval_results_mode not in ["overwrite", "append"]:
                raise MlflowException(
                    message="eval_results_mode can only be 'overwrite' or 'append'.",
                    error_code=INVALID_PARAMETER_VALUE,
                )

        if extra_metrics is None:
            extra_metrics = []

        bad_metrics = [
            metric for metric in extra_metrics if not isinstance(metric, EvaluationMetric)
        ]
        if len(bad_metrics) > 0:
            message = "\n".join([
                f"- Metric '{m}' has type '{type(m).__name__}'" for m in bad_metrics
            ])
            raise MlflowException(
                f"In the 'extra_metrics' parameter, the following metrics have the wrong type:\n"
                f"{message}\n"
                f"Please ensure that all extra metrics are instances of "
                f"mlflow.metrics.EvaluationMetric."
            )

        import matplotlib

        with TempDir() as temp_dir, matplotlib.rc_context(_matplotlib_config):
            self.temp_dir = temp_dir
            return self._evaluate(model, extra_metrics, custom_artifacts)

    @property
    def X(self) -> pd.DataFrame:
        """
        The features (`X`) portion of the dataset, guarded against accidental mutations.
        """
        return BuiltInEvaluator._MutationGuardedData(
            _get_dataframe_with_renamed_columns(
                self.dataset.features_data, self.dataset.feature_names
            )
        )

    class _MutationGuardedData:
        """
        Wrapper around a data object that requires explicit API calls to obtain either a copy
        of the data object or, in cases where the caller can guarantee that the object will not
        be mutated, the original data object.
        """

        def __init__(self, data):
            """
            Args:
                data: A data object, such as a Pandas DataFrame, NumPy array, or list.
            """
            self._data = data

        def copy_to_avoid_mutation(self):
            """
            Obtain a copy of the data. This method should be called every time the data needs
            to be used in a context where it may be subsequently mutated, guarding against
            accidental reuse after mutation.

            Returns:
                A copy of the data object.
            """
            if isinstance(self._data, pd.DataFrame):
                return self._data.copy(deep=True)
            else:
                return copy.deepcopy(self._data)

        def get_original(self):
            """
            Obtain the original data object. This method should only be called if the caller
            can guarantee that it will not mutate the data during subsequent operations.

            Returns:
                The original data object.
            """
            return self._data