# mlflow/models/evaluation/default_evaluator.py
import copy
import inspect
import json
import logging
import pathlib
import pickle
import shutil
import tempfile
import traceback
from abc import abstractmethod
from typing import Any, Callable, NamedTuple, Optional

import numpy as np
import pandas as pd

import mlflow
from mlflow import MlflowClient, MlflowException
from mlflow.data.evaluation_dataset import EvaluationDataset
from mlflow.entities.metric import Metric
from mlflow.metrics.base import MetricValue
from mlflow.models.evaluation.artifacts import (
    CsvEvaluationArtifact,
    ImageEvaluationArtifact,
    JsonEvaluationArtifact,
    NumpyEvaluationArtifact,
    _infer_artifact_type_and_ext,
)
from mlflow.models.evaluation.base import EvaluationMetric, EvaluationResult, ModelEvaluator
from mlflow.models.evaluation.utils.metric import MetricDefinition
from mlflow.protos.databricks_pb2 import INVALID_PARAMETER_VALUE
from mlflow.pyfunc import _ServedPyFuncModel
from mlflow.utils.file_utils import TempDir
from mlflow.utils.proto_json_utils import NumpyEncoder
from mlflow.utils.time import get_current_time_millis

_logger = logging.getLogger(__name__)

_EVAL_TABLE_FILE_NAME = "eval_results_table.json"
_TOKEN_COUNT_METRIC_NAME = "token_count"
_LATENCY_METRIC_NAME = "latency"


def _extract_raw_model(model):
    if not getattr(model, "metadata", None):
        return None, None

    model_loader_module = model.metadata.flavors["python_function"]["loader_module"]
    # If we load a model with mlflow.pyfunc.load_model, the model will be wrapped
    # with a pyfunc wrapper. We need to extract the raw model so that the shap
    # explainer uses the raw model instead of the wrapper and skips data schema validation.
    if model_loader_module in [
        "mlflow.catboost",
        "mlflow.sklearn",
        "mlflow.xgboost",
    ] and not isinstance(model, _ServedPyFuncModel):
        if hasattr(model._model_impl, "get_raw_model"):
            return model_loader_module, model._model_impl.get_raw_model()
        return model_loader_module, model._model_impl
    else:
        return model_loader_module, None
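
# Illustrative sketch (not part of the original module): for a pyfunc-wrapped
# sklearn model, _extract_raw_model returns the loader module name and the
# underlying estimator, so downstream code (e.g. the shap explainer) can bypass
# the pyfunc wrapper and its schema validation. Hypothetical usage:
#
#   model = mlflow.pyfunc.load_model("models:/my_sklearn_model/1")  # hypothetical URI
#   loader_module, raw_model = _extract_raw_model(model)
#   # loader_module == "mlflow.sklearn"; raw_model is the fitted sklearn estimator,
#   # or None for flavors other than catboost/sklearn/xgboost.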


def _extract_output_and_other_columns(
    model_predictions: list[Any] | dict[str, Any] | pd.DataFrame | pd.Series,
    output_column_name: str | None,
) -> tuple[pd.Series, pd.DataFrame | None, str]:
    y_pred = None
    other_output_columns = None
    ERROR_MISSING_OUTPUT_COLUMN_NAME = (
        "Output column name is not specified for the multi-output model. "
        "Please set the correct output column name using the `predictions` parameter."
    )

    if isinstance(model_predictions, list) and all(isinstance(p, dict) for p in model_predictions):
        # Extract 'y_pred' and 'other_output_columns' from list of dictionaries
        if output_column_name in model_predictions[0]:
            y_pred = pd.Series(
                [p.get(output_column_name) for p in model_predictions], name=output_column_name
            )
            other_output_columns = pd.DataFrame([
                {k: v for k, v in p.items() if k != output_column_name} for p in model_predictions
            ])
        elif len(model_predictions[0]) == 1:
            # Use the only key as the output column name and its values as y_pred
            key, value = list(model_predictions[0].items())[0]
            y_pred = pd.Series(value, name=key)
            output_column_name = key
        elif output_column_name is None:
            raise MlflowException(
                ERROR_MISSING_OUTPUT_COLUMN_NAME,
                error_code=INVALID_PARAMETER_VALUE,
            )
        else:
            raise MlflowException(
                f"Output column name '{output_column_name}' is not found in the model "
                f"predictions list: {model_predictions}. Please set the correct output column "
                "name using the `predictions` parameter.",
                error_code=INVALID_PARAMETER_VALUE,
            )
    elif isinstance(model_predictions, pd.DataFrame):
        if output_column_name in model_predictions.columns:
            y_pred = model_predictions[output_column_name]
            other_output_columns = model_predictions.drop(columns=output_column_name)
        elif len(model_predictions.columns) == 1:
            output_column_name = model_predictions.columns[0]
            y_pred = model_predictions[output_column_name]
        elif output_column_name is None:
            raise MlflowException(
                ERROR_MISSING_OUTPUT_COLUMN_NAME,
                error_code=INVALID_PARAMETER_VALUE,
            )
        else:
            raise MlflowException(
                f"Output column name '{output_column_name}' is not found in the model "
                f"predictions dataframe {model_predictions.columns}. Please set the correct "
                "output column name using the `predictions` parameter.",
                error_code=INVALID_PARAMETER_VALUE,
            )
    elif isinstance(model_predictions, dict):
        if output_column_name in model_predictions:
            y_pred = pd.Series(model_predictions[output_column_name], name=output_column_name)
            other_output_columns = pd.DataFrame({
                k: v for k, v in model_predictions.items() if k != output_column_name
            })
        elif len(model_predictions) == 1:
            key, value = list(model_predictions.items())[0]
            y_pred = pd.Series(value, name=key)
            output_column_name = key
        elif output_column_name is None:
            raise MlflowException(
                ERROR_MISSING_OUTPUT_COLUMN_NAME,
                error_code=INVALID_PARAMETER_VALUE,
            )
        else:
            raise MlflowException(
                f"Output column name '{output_column_name}' is not found in the "
                f"model predictions dict {model_predictions}. Please set the correct "
                "output column name using the `predictions` parameter.",
                error_code=INVALID_PARAMETER_VALUE,
            )

    return (
        y_pred if y_pred is not None else model_predictions,
        other_output_columns,
        output_column_name,
    )
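
# Illustrative sketch (assumed inputs, not part of the original module): a
# multi-output model returning a dict of columns. With predictions="answer",
# the "answer" column becomes y_pred and the rest become other_output_columns:
#
#   preds = {"answer": ["yes", "no"], "context": ["doc1", "doc2"]}
#   y_pred, other_cols, name = _extract_output_and_other_columns(preds, "answer")
#   # y_pred -> pd.Series(["yes", "no"], name="answer")
#   # other_cols -> pd.DataFrame({"context": ["doc1", "doc2"]}); name == "answer"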


def _extract_predict_fn(model: Any) -> Callable[..., Any] | None:
    """
    Extracts the predict function from the given model.

    Precedence order:
    1. If a raw model can be extracted from the model, its predict function is used.
    2. Otherwise, if the model is not None, its predict function is used.
    3. If neither is available, the predict function is None.

    Args:
        model: A model object that has a predict method.

    Returns:
        The predict function.
    """
    _, raw_model = _extract_raw_model(model)
    predict_fn = None

    if raw_model is not None:
        predict_fn = raw_model.predict
        try:
            from mlflow.xgboost import _wrapped_xgboost_model_predict_fn

            # Because shap evaluation passes evaluation data in ndarray format
            # (without feature names), setting validate_features=True would raise an error.
            predict_fn = _wrapped_xgboost_model_predict_fn(raw_model, validate_features=False)
        except ImportError:
            pass

    elif model is not None:
        predict_fn = model.predict

    return predict_fn


def _get_dataframe_with_renamed_columns(x, new_column_names):
    """
    Downstream inference functions may expect a pd.DataFrame to be created from x. However,
    if x is already a pd.DataFrame, and new_column_names != x.columns, we cannot simply call
    pd.DataFrame(x, columns=new_column_names) because the resulting pd.DataFrame will contain
    NaNs for every column in new_column_names that does not exist in x.columns. This function
    instead creates a new pd.DataFrame object from x, and then explicitly renames the columns
    to avoid NaNs.

    Args:
        x: A data object, such as a Pandas DataFrame, NumPy array, or list
        new_column_names: Column names for the output Pandas DataFrame

    Returns:
        A pd.DataFrame with x as data, with columns new_column_names
    """
    df = pd.DataFrame(x)
    return df.rename(columns=dict(zip(df.columns, new_column_names)))
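
# Illustrative sketch (not part of the original module) of the NaN pitfall the
# helper avoids. pd.DataFrame(df, columns=...) selects columns by name, so names
# absent from df become all-NaN; renaming positionally preserves the data:
#
#   df = pd.DataFrame({"x": [1, 2], "y": [3, 4]})
#   pd.DataFrame(df, columns=["a", "b"])                 # two all-NaN columns
#   _get_dataframe_with_renamed_columns(df, ["a", "b"])  # keeps the values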


def _get_aggregate_metrics_values(metrics):
    return {name: MetricValue(aggregate_results={name: value}) for name, value in metrics.items()}
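
# Illustrative sketch (not part of the original module): wraps plain scalar
# metrics into MetricValue objects keyed by the same name, e.g.
#
#   _get_aggregate_metrics_values({"accuracy": 0.9})
#   # -> {"accuracy": MetricValue(aggregate_results={"accuracy": 0.9})}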


_matplotlib_config = {
    "figure.dpi": 175,
    "figure.figsize": [6.0, 4.0],
    "figure.autolayout": True,
    "font.size": 8,
}


class _CustomArtifact(NamedTuple):
    """
    A namedtuple representing a custom artifact function and its properties.

    function : the custom artifact function
    name : the name of the custom artifact function
    index : the index of the function in the ``custom_artifacts`` argument of mlflow.evaluate
    artifacts_dir : the path to a temporary directory to store produced artifacts of the function
    """

    function: Callable[..., Any]
    name: str
    index: int
    artifacts_dir: str


def _is_valid_artifacts(artifacts):
    return isinstance(artifacts, dict) and all(isinstance(k, str) for k in artifacts.keys())
def _evaluate_custom_artifacts(custom_artifact_tuple, eval_df, builtin_metrics):
    """
    This function calls the `custom_artifact` function and validates the returned
    result to ensure that it is in the expected format. If the result is not in the
    expected format, a warning is logged and None is returned.

    Args:
        custom_artifact_tuple: A _CustomArtifact tuple containing a user-provided function
            and its index in the ``custom_artifacts`` parameter of ``mlflow.evaluate``
        eval_df: A Pandas dataframe object containing a prediction and a target column.
        builtin_metrics: A dictionary of metrics produced by the default evaluator.

    Returns:
        A dictionary of artifacts.
    """
    exception_header = (
        f"Custom artifact function '{custom_artifact_tuple.name}' at index "
        f"{custom_artifact_tuple.index} in the `custom_artifacts` parameter"
    )
    artifacts = custom_artifact_tuple.function(
        eval_df, builtin_metrics, custom_artifact_tuple.artifacts_dir
    )

    if artifacts is None:
        _logger.warning(f"{exception_header} returned None.")
        return

    if not _is_valid_artifacts(artifacts):
        _logger.warning(
            f"{exception_header} did not return artifacts as a dictionary of string artifact "
            "names with their corresponding objects."
        )
        return

    return artifacts
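
# Illustrative sketch (not part of the original module): a custom artifact
# function compatible with _evaluate_custom_artifacts. It receives the eval
# dataframe, the computed metrics, and a scratch directory, and returns a dict
# of artifact names to objects (names and logic here are hypothetical):
#
#   def scatter_artifact(eval_df, builtin_metrics, artifacts_dir):
#       import matplotlib.pyplot as plt
#
#       fig, ax = plt.subplots()
#       ax.scatter(eval_df["prediction"], eval_df["target"])
#       return {"pred_vs_target_scatter": fig}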


# TODO: Move this to the /evaluators directory
class BuiltInEvaluator(ModelEvaluator):
    """
    The base class for all evaluators that are built-in to MLflow.

    Each evaluator is responsible for implementing the `_evaluate()` method, which is called by
    the `evaluate()` method of this base class. This class contains many helper methods used
    across built-in evaluators, such as logging metrics, artifacts, and ordering metrics.
    """

    def __init__(self):
        self.client = MlflowClient()

    @abstractmethod
    def _evaluate(
        self,
        model: Optional["mlflow.pyfunc.PyFuncModel"],
        extra_metrics: list[EvaluationMetric],
        custom_artifacts=None,
        **kwargs,
    ) -> EvaluationResult | None:
        """Implement the evaluation logic for each evaluator."""

    def log_metrics(self):
        """
        Helper method to log metrics into the specified run.
        """
        self._add_prefix_to_metrics()

        timestamp = get_current_time_millis()
        self.client.log_batch(
            self.run_id,
            metrics=[
                Metric(
                    key=key,
                    value=value,
                    timestamp=timestamp,
                    step=0,
                    model_id=self.model_id,
                    dataset_name=self.dataset.name,
                    dataset_digest=self.dataset.digest,
                    run_id=self.run_id,
                )
                for key, value in self.aggregate_metrics.items()
            ],
        )

    def _log_image_artifact(
        self,
        do_plot,
        artifact_name,
    ):
        from matplotlib import pyplot

        prefix = self.evaluator_config.get("metric_prefix", "")
        artifact_file_name = f"{prefix}{artifact_name}.png"
        artifact_file_local_path = self.temp_dir.path(artifact_file_name)

        try:
            pyplot.clf()
            do_plot()
            pyplot.savefig(artifact_file_local_path, bbox_inches="tight")
        except Exception as e:
            _logger.warning(f"Failed to log image artifact {artifact_name!r}: {e!r}")
        else:
            mlflow.log_artifact(artifact_file_local_path)
            artifact = ImageEvaluationArtifact(uri=mlflow.get_artifact_uri(artifact_file_name))
            artifact._load(artifact_file_local_path)
            self.artifacts[artifact_name] = artifact
        finally:
            pyplot.close(pyplot.gcf())

    def _evaluate_sklearn_model_score_if_scorable(self, model, y_true, sample_weights):
        model_loader_module, raw_model = _extract_raw_model(model)
        if model_loader_module == "mlflow.sklearn" and raw_model is not None:
            try:
                score = raw_model.score(
                    self.X.copy_to_avoid_mutation(), y_true, sample_weight=sample_weights
                )
                self.metrics_values.update(_get_aggregate_metrics_values({"score": score}))
            except Exception as e:
                _logger.warning(
                    f"Computing sklearn model score failed: {e!r}. Set logging level to "
                    "DEBUG to see the full traceback."
                )
                _logger.debug("", exc_info=True)
    def _log_custom_metric_artifact(self, artifact_name, raw_artifact, custom_metric_tuple):
        """
        This function logs and returns a custom metric artifact. Two cases:
            - If the provided artifact is a path to a file, the function makes a copy of it
              with a formatted name in a temporary directory and calls mlflow.log_artifact.
            - Otherwise, it attempts to save the artifact to a temporary path with an inferred
              type, then calls mlflow.log_artifact.

        Args:
            artifact_name: the name of the artifact
            raw_artifact: the object representing the artifact
            custom_metric_tuple: an instance of the _CustomMetric namedtuple

        Returns:
            EvaluationArtifact
        """

        exception_and_warning_header = (
            f"Custom artifact function '{custom_metric_tuple.name}' at index "
            f"{custom_metric_tuple.index} in the `custom_artifacts` parameter"
        )

        inferred_from_path, inferred_type, inferred_ext = _infer_artifact_type_and_ext(
            artifact_name, raw_artifact, custom_metric_tuple
        )
        artifact_file_local_path = self.temp_dir.path(artifact_name + inferred_ext)

        if pathlib.Path(artifact_file_local_path).exists():
            raise MlflowException(
                f"{exception_and_warning_header} produced an artifact '{artifact_name}' that "
                "cannot be logged because there already exists an artifact with the same name."
            )

        # ParquetEvaluationArtifact isn't explicitly stated here because such artifacts can
        # only be supplied through a file path, which is handled by the first if clause. This
        # is because DataFrame objects default to being stored as CsvEvaluationArtifact.
        if inferred_from_path:
            shutil.copy2(raw_artifact, artifact_file_local_path)
        elif inferred_type is JsonEvaluationArtifact:
            with open(artifact_file_local_path, "w") as f:
                if isinstance(raw_artifact, str):
                    f.write(raw_artifact)
                else:
                    json.dump(raw_artifact, f, cls=NumpyEncoder)
        elif inferred_type is CsvEvaluationArtifact:
            raw_artifact.to_csv(artifact_file_local_path, index=False)
        elif inferred_type is NumpyEvaluationArtifact:
            np.save(artifact_file_local_path, raw_artifact, allow_pickle=False)
        elif inferred_type is ImageEvaluationArtifact:
            raw_artifact.savefig(artifact_file_local_path)
        else:
            # storing as pickle
            try:
                with open(artifact_file_local_path, "wb") as f:
                    pickle.dump(raw_artifact, f)
                _logger.warning(
                    f"{exception_and_warning_header} produced an artifact '{artifact_name}'"
                    f" with type '{type(raw_artifact)}' that is logged as a pickle artifact."
                )
            except pickle.PickleError:
                raise MlflowException(
                    f"{exception_and_warning_header} produced an unsupported artifact "
                    f"'{artifact_name}' with type '{type(raw_artifact)}' that cannot be pickled. "
                    "Supported object types for artifacts are:\n"
                    "- A string uri representing the file path to the artifact. MLflow"
                    "  will infer the type of the artifact based on the file extension.\n"
                    "- A string representation of a JSON object. This will be saved as a "
                    ".json artifact.\n"
                    "- Pandas DataFrame. This will be saved as a .csv artifact.\n"
                    "- Numpy array. This will be saved as a .npy artifact.\n"
                    "- Matplotlib Figure. This will be saved as a .png image artifact.\n"
                    "- Other objects will be attempted to be pickled with the default protocol."
                )

        mlflow.log_artifact(artifact_file_local_path)
        artifact = inferred_type(uri=mlflow.get_artifact_uri(artifact_name + inferred_ext))
        artifact._load(artifact_file_local_path)
        return artifact

    def _get_column_in_metrics_values(self, column):
        for metric_name, metric_value in self.metrics_values.items():
            if metric_name.split("/")[0] == column:
                return metric_value

    def _get_args_for_metrics(
        self,
        metric: MetricDefinition,
        eval_df: pd.DataFrame,
        input_df: pd.DataFrame,
        other_output_df: pd.DataFrame | None,
    ) -> tuple[bool, list[str | pd.DataFrame]]:
        """
        Given a metric definition, read the signature of the metric function and get the
        appropriate arguments from the input/output columns, other calculated metrics, and
        the evaluator_config.

        Args:
            metric: The metric definition containing a user-provided function and its index
                in the ``extra_metrics`` parameter of ``mlflow.evaluate``.
            eval_df: The evaluation dataframe containing the prediction and target columns.
            input_df: The input dataframe containing the features used to make predictions.
            other_output_df: A dataframe containing all model output columns but the predictions.

        Returns:
            tuple: A tuple of (bool, list) where the bool indicates if the given metric can
            be calculated with the given eval_df, metrics, and input_df.
                - If the user is missing the "targets" or "predictions" parameters when needed,
                  or we cannot find a column or metric for a parameter to the metric, return
                  (False, list of missing parameters).
                - If all arguments to the metric function were found, return
                  (True, list of arguments).
        """
        # deepcopying eval_df and builtin_metrics for each custom metric function call,
        # in case the user modifies them inside their function(s).
        eval_df_copy = eval_df.copy()
        parameters = inspect.signature(metric.function).parameters
        eval_fn_args = []
        params_not_found = []
        if len(parameters) == 2:
            param_0_name, param_1_name = parameters.keys()

        # eval_fn has parameters (eval_df, builtin_metrics) for backwards compatibility
        if len(parameters) == 2 and param_0_name != "predictions" and param_1_name != "targets":
            eval_fn_args.append(eval_df_copy)
            self._update_aggregate_metrics()
            eval_fn_args.append(copy.deepcopy(self.aggregate_metrics))
        # eval_fn can have parameters like (predictions, targets, metrics, random_col)
        else:
            for param_name, param in parameters.items():
                column = self.col_mapping.get(param_name, param_name)

                if column in ("predictions", self.predictions, self.dataset.predictions_name):
                    eval_fn_args.append(eval_df_copy["prediction"])
                elif column in ("targets", self.dataset.targets_name):
                    if "target" in eval_df_copy:
                        eval_fn_args.append(eval_df_copy["target"])
                    else:
                        if param.default == inspect.Parameter.empty:
                            params_not_found.append(param_name)
                        else:
                            eval_fn_args.append(param.default)
                elif column == "metrics":
                    eval_fn_args.append(copy.deepcopy(self.metrics_values))
                else:
                    # case when the column passed in col_mapping contains the entire column
                    if not isinstance(column, str):
                        eval_fn_args.append(column)

                    # case when the column in col_mapping is a string and the column value
                    # is part of the input_df
                    elif column in input_df.columns:
                        eval_fn_args.append(input_df[column])

                    # case when the column in col_mapping is a string and the column value
                    # is part of the output_df (other than predictions)
                    elif other_output_df is not None and column in other_output_df.columns:
                        self.other_output_columns_for_eval.add(column)
                        eval_fn_args.append(other_output_df[column])

                    # case where the param is defined as part of the evaluator_config
                    elif column in self.evaluator_config:
                        eval_fn_args.append(self.evaluator_config.get(column))

                    # case where this is the name of another metric
                    elif metric_value := self._get_column_in_metrics_values(column):
                        eval_fn_args.append(metric_value)

                    # in the case that the metric has not been calculated yet, but is
                    # scheduled to be calculated before this metric in self.ordered_metrics,
                    # we append None to indicate that there is not an error in the dependencies
                    elif column in [metric_tuple.name for metric_tuple in self.ordered_metrics]:
                        eval_fn_args.append(None)

                    elif param.default == inspect.Parameter.empty:
                        params_not_found.append(param_name)
                    else:
                        eval_fn_args.append(param.default)

        if len(params_not_found) > 0:
            return False, params_not_found
        return True, eval_fn_args
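
    # Illustrative sketch (not part of the original module): parameter names of a
    # user metric function are resolved against columns and other metrics. With
    # col_mapping={"answer": "response"} and a "response" input column, a function
    # like the hypothetical one below would receive eval_df["prediction"],
    # eval_df["target"], the running metrics dict, and input_df["response"], in
    # that order:
    #
    #   def my_eval_fn(predictions, targets, metrics, answer):
    #       ...  # return a MetricValue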

    def evaluate_and_log_custom_artifacts(
        self,
        custom_artifacts: list[_CustomArtifact],
        prediction: pd.Series,
        target: np.ndarray | None = None,
    ):
        """Evaluate custom artifacts provided by users."""
        if not custom_artifacts:
            return

        eval_df = self._get_eval_df(prediction, target)
        for index, custom_artifact in enumerate(custom_artifacts):
            with tempfile.TemporaryDirectory() as artifacts_dir:
                # deepcopying eval_df and builtin_metrics for each custom artifact function call,
                # in case the user modifies them inside their function(s).
                custom_artifact_tuple = _CustomArtifact(
                    function=custom_artifact,
                    index=index,
                    name=getattr(custom_artifact, "__name__", repr(custom_artifact)),
                    artifacts_dir=artifacts_dir,
                )
                artifact_results = _evaluate_custom_artifacts(
                    custom_artifact_tuple,
                    eval_df.copy(),
                    copy.deepcopy(self.metrics_values),
                )
                if artifact_results:
                    for artifact_name, raw_artifact in artifact_results.items():
                        self.artifacts[artifact_name] = self._log_custom_metric_artifact(
                            artifact_name,
                            raw_artifact,
                            custom_artifact_tuple,
                        )

    def _get_error_message_missing_columns(self, metric_name, param_names):
        error_message_parts = [f"Metric '{metric_name}' requires the following:"]

        special_params = ["targets", "predictions"]
        error_message_parts.extend(
            f"  - the '{param}' parameter needs to be specified"
            for param in special_params
            if param in param_names
        )

        if remaining_params := [param for param in param_names if param not in special_params]:
            error_message_parts.append(
                f"  - missing columns {remaining_params} need to be defined or mapped"
            )

        return "\n".join(error_message_parts)

    def _construct_error_message_for_malformed_metrics(
        self, malformed_results, input_columns, output_columns
    ):
        error_messages = [
            self._get_error_message_missing_columns(metric_name, param_names)
            for metric_name, param_names in malformed_results
        ]
        joined_error_message = "\n".join(error_messages)

        full_message = f"""Error: Metric calculation failed for the following metrics:
        {joined_error_message}

        Below are the existing column names for the input/output data:
        Input Columns: {input_columns}
        Output Columns: {output_columns}

        To resolve this issue, you may need to:
         - specify any required parameters
         - if you are missing columns, check that there are no circular dependencies among your
         metrics, and you may want to map them to an existing column using the following
         configuration:
        evaluator_config={{'col_mapping': {{<missing column name>: <existing column name>}}}}"""

        return "\n".join(l.lstrip() for l in full_message.splitlines())

    def _raise_exception_for_malformed_metrics(self, malformed_results, eval_df, other_output_df):
        output_columns = [] if other_output_df is None else list(other_output_df.columns)
        if self.predictions:
            output_columns.append(self.predictions)
        elif self.dataset.predictions_name:
            output_columns.append(self.dataset.predictions_name)
        else:
            output_columns.append("predictions")

        input_columns = list(self.X.copy_to_avoid_mutation().columns)
        if "target" in eval_df:
            if self.dataset.targets_name:
                input_columns.append(self.dataset.targets_name)
            else:
                input_columns.append("targets")

        error_message = self._construct_error_message_for_malformed_metrics(
            malformed_results, input_columns, output_columns
        )

        raise MlflowException(error_message, error_code=INVALID_PARAMETER_VALUE)

    def _get_eval_df(self, prediction: pd.Series, target: np.ndarray | None = None):
        """
        Create a DataFrame with "prediction" and "target" columns.

        This is a standard format that can be passed to the metric functions.
        """
        eval_df = pd.DataFrame({"prediction": copy.deepcopy(prediction)})
        if target is not None:
            eval_df["target"] = target
        return eval_df

    def _order_metrics(
        self,
        metrics: list[EvaluationMetric],
        eval_df: pd.DataFrame,
        other_output_df: pd.DataFrame | None,
    ):
        """
        Order the list of metrics so they can be computed in sequence.

        Some metrics might use the results of other metrics to compute their own results. This
        function iteratively resolves these dependencies by checking whether each metric can be
        computed with the currently available columns and metric values.
        """
        remaining_metrics = metrics
        input_df = self.X.copy_to_avoid_mutation()

        while len(remaining_metrics) > 0:
            pending_metrics = []
            failed_results = []
            did_append_metric = False
            for metric_tuple in remaining_metrics:
                can_calculate, eval_fn_args = self._get_args_for_metrics(
                    metric_tuple, eval_df, input_df, other_output_df
                )
                if can_calculate:
                    self.ordered_metrics.append(metric_tuple)
                    did_append_metric = True
                else:  # cannot calculate the metric yet
                    pending_metrics.append(metric_tuple)
                    failed_results.append((metric_tuple.name, eval_fn_args))

            # can't calculate any more metrics
            if not did_append_metric:
                self._raise_exception_for_malformed_metrics(
                    failed_results, eval_df, other_output_df
                )

            remaining_metrics = pending_metrics

        return self.ordered_metrics
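
    # Illustrative sketch (not part of the original module): dependency-driven
    # ordering. If a hypothetical metric "faithfulness" declares a parameter named
    # "relevance", it cannot be scheduled until the "relevance" metric has been
    # placed in self.ordered_metrics, so the loop defers it one round:
    #
    #   round 1: relevance    -> schedulable (depends only on columns)
    #            faithfulness -> pending ("relevance" not yet available)
    #   round 2: faithfulness -> schedulable (its dependency is now ordered)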

    def _test_first_row(
        self,
        metrics: list[MetricDefinition],
        eval_df: pd.DataFrame,
        other_output_df: pd.DataFrame | None,
    ):
        # test calculations on first row of eval_df
        _logger.info("Testing metrics on first row...")
        exceptions = []
        first_row_df = eval_df.iloc[[0]]
        first_row_input_df = self.X.copy_to_avoid_mutation().iloc[[0]]
        for metric in metrics:
            try:
                _, eval_fn_args = self._get_args_for_metrics(
                    metric, first_row_df, first_row_input_df, other_output_df
                )
                if metric_value := metric.evaluate(eval_fn_args):
                    name = f"{metric.name}/{metric.version}" if metric.version else metric.name
                    self.metrics_values.update({name: metric_value})
            except Exception as e:
                stacktrace_str = traceback.format_exc()
                if isinstance(e, MlflowException):
                    exceptions.append(
                        f"Metric '{metric.name}': Error:\n{e.message}\n{stacktrace_str}"
                    )
                else:
                    exceptions.append(f"Metric '{metric.name}': Error:\n{e!r}\n{stacktrace_str}")

        if len(exceptions) > 0:
            raise MlflowException("\n".join(exceptions))

    def evaluate_metrics(
        self,
        metrics: list[EvaluationMetric],
        prediction: pd.Series,
        target: np.ndarray | None = None,
        other_output_df: pd.DataFrame | None = None,
    ):
        """
        Evaluate the metrics on the given prediction and target data.

        Args:
            metrics: A list of metrics to evaluate.
            prediction: A Pandas Series containing the predictions.
            target: A numpy array containing the target values.
            other_output_df: A Pandas DataFrame containing other output columns from the model.

        Returns:
            None, the metrics values are recorded in the self.metrics_values dictionary.
        """

        eval_df = self._get_eval_df(prediction, target)
        metrics = [
            MetricDefinition.from_index_and_metric(i, metric) for i, metric in enumerate(metrics)
        ]
        metrics = self._order_metrics(metrics, eval_df, other_output_df)

        self._test_first_row(metrics, eval_df, other_output_df)

        # calculate metrics for the full eval_df
        input_df = self.X.copy_to_avoid_mutation()
        for metric in metrics:
            _, eval_fn_args = self._get_args_for_metrics(metric, eval_df, input_df, other_output_df)
            if metric_value := metric.evaluate(eval_fn_args):
                name = f"{metric.name}/{metric.version}" if metric.version else metric.name
                self.metrics_values.update({name: metric_value})

    def log_eval_table(self, y_pred, other_output_columns=None):
        # only log the eval table if there are per-row metrics recorded
        if not any(
            metric_value.scores is not None or metric_value.justifications is not None
            for _, metric_value in self.metrics_values.items()
        ):
            return

        metric_prefix = self.evaluator_config.get("metric_prefix", "")
        if not isinstance(metric_prefix, str):
            metric_prefix = ""
        if isinstance(self.dataset.features_data, pd.DataFrame):
            # Handle DataFrame case
            if self.dataset.has_targets:
                data = self.dataset.features_data.assign(**{
                    self.dataset.targets_name or "target": self.dataset.labels_data,
                    self.dataset.predictions_name or self.predictions or "outputs": y_pred,
                })
            else:
                data = self.dataset.features_data.assign(outputs=y_pred)
        else:
            # Handle NumPy array case, converting it to a DataFrame
            data = pd.DataFrame(self.dataset.features_data, columns=self.dataset.feature_names)
            if self.dataset.has_targets:
                data = data.assign(**{
                    self.dataset.targets_name or "target": self.dataset.labels_data,
                    self.dataset.predictions_name or self.predictions or "outputs": y_pred,
                })
            else:
                data = data.assign(outputs=y_pred)

        # Include other_output_columns used in evaluation in the eval table
        if other_output_columns is not None and len(self.other_output_columns_for_eval) > 0:
            for column in self.other_output_columns_for_eval:
                data[column] = other_output_columns[column]

        columns = {}
        for metric_name, metric_value in self.metrics_values.items():
            scores = metric_value.scores
            justifications = metric_value.justifications

            if scores:
                if metric_name.startswith(metric_prefix) and metric_name[len(metric_prefix) :] in [
                    _TOKEN_COUNT_METRIC_NAME,
                    _LATENCY_METRIC_NAME,
                ]:
                    columns[metric_name] = scores
                else:
                    columns[f"{metric_name}/score"] = scores
            if justifications:
                columns[f"{metric_name}/justification"] = justifications
        data = data.assign(**columns)
        artifact_file_name = f"{metric_prefix}{_EVAL_TABLE_FILE_NAME}"
        mlflow.log_table(data, artifact_file=artifact_file_name)
        if self.eval_results_path:
            eval_table_spark = self.spark_session.createDataFrame(data)
            try:
                eval_table_spark.write.mode(self.eval_results_mode).option(
                    "mergeSchema", "true"
                ).format("delta").saveAsTable(self.eval_results_path)
            except Exception as e:
                _logger.info(f"Saving eval table to a delta table failed. Reason: {e}")

        name = _EVAL_TABLE_FILE_NAME.split(".", 1)[0]
        self.artifacts[name] = JsonEvaluationArtifact(
            uri=mlflow.get_artifact_uri(artifact_file_name)
        )
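
    # Illustrative sketch (not part of the original module) of the resulting eval
    # table layout: feature columns, then target/output columns, then one
    # "<metric>/score" and optional "<metric>/justification" column per per-row
    # metric (token_count and latency keep their bare names). Hypothetical header:
    #
    #   question | target | outputs | toxicity/v1/score | toxicity/v1/justification | latency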

    def _update_aggregate_metrics(self):
        self.aggregate_metrics = {}
        for metric_name, metric_value in self.metrics_values.items():
            if metric_value.aggregate_results:
                for agg_name, agg_value in metric_value.aggregate_results.items():
                    if agg_value is not None:
                        if agg_name == metric_name.split("/")[0]:
                            self.aggregate_metrics[metric_name] = agg_value
                        else:
                            self.aggregate_metrics[f"{metric_name}/{agg_name}"] = agg_value
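
    # Illustrative sketch (not part of the original module) of the naming rule:
    # an aggregate named after the metric itself keeps the metric name, while any
    # other aggregate gets a "/<aggregation>" suffix, e.g. (hypothetical values)
    #
    #   {"accuracy": MetricValue(aggregate_results={"accuracy": 0.9})}
    #       -> {"accuracy": 0.9}
    #   {"toxicity/v1": MetricValue(aggregate_results={"mean": 0.1})}
    #       -> {"toxicity/v1/mean": 0.1}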

    def _add_prefix_to_metrics(self):
        def _prefix_value(value):
            aggregate = (
                {f"{prefix}{k}": v for k, v in value.aggregate_results.items()}
                if value.aggregate_results
                else None
            )
            return MetricValue(value.scores, value.justifications, aggregate)

        if prefix := self.evaluator_config.get("metric_prefix"):
            self.metrics_values = {
                f"{prefix}{k}": _prefix_value(v) for k, v in self.metrics_values.items()
            }

        self._update_aggregate_metrics()

    def evaluate(
        self,
        *,
        model_type,
        dataset,
        run_id,
        evaluator_config,
        model: "mlflow.pyfunc.PyFuncModel" = None,
        extra_metrics=None,
        custom_artifacts=None,
        predictions=None,
        model_id=None,
        **kwargs,
    ) -> EvaluationResult:
        if model is None and predictions is None and dataset.predictions_data is None:
            raise MlflowException(
                message=(
                    "Either a model or set of predictions must be specified in order to use the"
                    " default evaluator. Either specify the `model` parameter, the `predictions`"
                    " parameter, an MLflow dataset containing the `predictions` column name"
                    " (via the `data` parameter), or a different evaluator (via the `evaluators`"
                    " parameter)."
                ),
                error_code=INVALID_PARAMETER_VALUE,
            )

        self.artifacts = {}
        self.aggregate_metrics = {}
        self.metrics_values = {}
        self.ordered_metrics = []
        self.other_output_columns_for_eval = set()

        self.dataset: EvaluationDataset = dataset
        self.run_id = run_id
        self.model_type = model_type
        self.model_id = model_id
        self.evaluator_config = evaluator_config

        self.predictions = predictions
        self.col_mapping = self.evaluator_config.get("col_mapping", {})
        self.eval_results_path = self.evaluator_config.get("eval_results_path")
        self.eval_results_mode = self.evaluator_config.get("eval_results_mode", "overwrite")

        if self.eval_results_path:
            from mlflow.utils._spark_utils import _get_active_spark_session

            self.spark_session = _get_active_spark_session()
            if not self.spark_session:
                raise MlflowException(
                    message="eval_results_path is only supported in a Spark environment.",
                    error_code=INVALID_PARAMETER_VALUE,
                )

            if self.eval_results_mode not in ["overwrite", "append"]:
                raise MlflowException(
                    message="eval_results_mode can only be 'overwrite' or 'append'.",
                    error_code=INVALID_PARAMETER_VALUE,
                )

        if extra_metrics is None:
            extra_metrics = []

        bad_metrics = [
            metric for metric in extra_metrics if not isinstance(metric, EvaluationMetric)
        ]
        if len(bad_metrics) > 0:
            message = "\n".join([
                f"- Metric '{m}' has type '{type(m).__name__}'" for m in bad_metrics
            ])
            raise MlflowException(
                f"In the 'extra_metrics' parameter, the following metrics have the wrong type:\n"
                f"{message}\n"
                f"Please ensure that all extra metrics are instances of "
                f"mlflow.metrics.EvaluationMetric."
            )

        import matplotlib

        with TempDir() as temp_dir, matplotlib.rc_context(_matplotlib_config):
            self.temp_dir = temp_dir
            return self._evaluate(model, extra_metrics, custom_artifacts)

    @property
    def X(self) -> "BuiltInEvaluator._MutationGuardedData":
        """
        The features (`X`) portion of the dataset, guarded against accidental mutations.
        """
        return BuiltInEvaluator._MutationGuardedData(
            _get_dataframe_with_renamed_columns(
                self.dataset.features_data, self.dataset.feature_names
            )
        )

    class _MutationGuardedData:
        """
        Wrapper around a data object that requires explicit API calls to obtain either a copy
        of the data object, or, in cases where the caller can guarantee that the object will
        not be mutated, the original data object.
        """

        def __init__(self, data):
            """
            Args:
                data: A data object, such as a Pandas DataFrame, NumPy array, or list.
            """
            self._data = data

        def copy_to_avoid_mutation(self):
            """
            Obtain a copy of the data. This method should be called every time the data needs
            to be used in a context where it may be subsequently mutated, guarding against
            accidental reuse after mutation.

            Returns:
                A copy of the data object.
            """
            if isinstance(self._data, pd.DataFrame):
                return self._data.copy(deep=True)
            else:
                return copy.deepcopy(self._data)

        def get_original(self):
            """
            Obtain the original data object. This method should only be called if the caller
            can guarantee that it will not mutate the data during subsequent operations.

            Returns:
                The original data object.
            """
            return self._data
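

# Illustrative sketch (not part of the original module; `evaluator` is a
# hypothetical BuiltInEvaluator instance): callers choose between a defensive
# copy and the original object depending on mutation risk, e.g.
#
#   guarded = evaluator.X                      # _MutationGuardedData
#   df = guarded.copy_to_avoid_mutation()      # safe to mutate
#   df["new_col"] = 0                          # does not affect the dataset
#   original = guarded.get_original()          # only if no mutation will occur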