# tests/evaluate/logging/utils.py
  1  import pandas as pd
  2  
  3  from mlflow.evaluation.evaluation import EvaluationEntity as EvaluationEntity
  4  from mlflow.evaluation.utils import (
  5      _get_assessments_dataframe_schema,
  6      _get_evaluations_dataframe_schema,
  7      _get_metrics_dataframe_schema,
  8      _get_tags_dataframe_schema,
  9  )
 10  from mlflow.exceptions import MlflowException
 11  from mlflow.protos.databricks_pb2 import INTERNAL_ERROR, RESOURCE_DOES_NOT_EXIST
 12  from mlflow.tracking.client import MlflowClient
 13  
 14  
 15  def get_evaluation(*, run_id: str, evaluation_id: str) -> EvaluationEntity:
 16      """
 17      Retrieves an Evaluation object from an MLflow Run.
 18  
 19      Args:
 20          run_id (str): ID of the MLflow Run containing the evaluation.
 21          evaluation_id (str): The ID of the evaluation.
 22  
 23      Returns:
 24          Evaluation: The Evaluation object.
 25      """
 26      client = MlflowClient()
 27      if not _contains_evaluation_artifacts(client=client, run_id=run_id):
 28          raise MlflowException(
 29              "The specified run does not contain any evaluations. "
 30              "Please log evaluations to the run before retrieving them.",
 31              error_code=RESOURCE_DOES_NOT_EXIST,
 32          )
 33  
 34      evaluations_file = client.download_artifacts(run_id=run_id, path="_evaluations.json")
 35      evaluations_df = _read_evaluations_dataframe(evaluations_file)
 36  
 37      assessments_file = client.download_artifacts(run_id=run_id, path="_assessments.json")
 38      assessments_df = _read_assessments_dataframe(assessments_file)
 39  
 40      metrics_file = client.download_artifacts(run_id=run_id, path="_metrics.json")
 41      metrics_df = _read_metrics_dataframe(metrics_file)
 42  
 43      tags_file = client.download_artifacts(run_id=run_id, path="_tags.json")
 44      tags_df = _read_tags_dataframe(tags_file)
 45  
 46      return _get_evaluation_from_dataframes(
 47          run_id=run_id,
 48          evaluation_id=evaluation_id,
 49          evaluations_df=evaluations_df,
 50          metrics_df=metrics_df,
 51          assessments_df=assessments_df,
 52          tags_df=tags_df,
 53      )
 54  
 55  
 56  def _contains_evaluation_artifacts(*, client: MlflowClient, run_id: str) -> bool:
 57      return {"_evaluations.json", "_metrics.json", "_assessments.json", "_tags.json"}.issubset({
 58          file.path for file in client.list_artifacts(run_id)
 59      })
 60  
 61  
 62  def _read_evaluations_dataframe(path: str) -> pd.DataFrame:
 63      """
 64      Reads an evaluations DataFrame from a file.
 65  
 66      Args:
 67          path (str): Path to the file.
 68  
 69      Returns:
 70          pd.DataFrame: The evaluations DataFrame.
 71      """
 72      schema = _get_evaluations_dataframe_schema()
 73      return pd.read_json(path, orient="split", dtype=schema, convert_dates=False).replace(
 74          pd.NA, None
 75      )
 76  
 77  
 78  def _read_assessments_dataframe(path: str) -> pd.DataFrame:
 79      """
 80      Reads an assessments DataFrame from a file.
 81  
 82      Args:
 83          path (str): Path to the file.
 84  
 85      Returns:
 86          pd.DataFrame: The assessments DataFrame.
 87      """
 88      schema = _get_assessments_dataframe_schema()
 89      return pd.read_json(path, orient="split", dtype=schema, convert_dates=False).replace(
 90          pd.NA, None
 91      )
 92  
 93  
 94  def _read_metrics_dataframe(path: str) -> pd.DataFrame:
 95      """
 96      Reads a metrics DataFrame from a file.
 97  
 98      Args:
 99          path (str): Path to the file.
100  
101      Returns:
102          pd.DataFrame: The metrics DataFrame.
103      """
104      schema = _get_metrics_dataframe_schema()
105      return pd.read_json(path, orient="split", dtype=schema, convert_dates=False).replace(
106          pd.NA, None
107      )
108  
109  
def _read_tags_dataframe(path: str) -> pd.DataFrame:
    """
    Reads a tags DataFrame from a file.

    Args:
        path (str): Path to the file.

    Returns:
        pd.DataFrame: The tags DataFrame.
    """
    frame = pd.read_json(
        path,
        orient="split",
        dtype=_get_tags_dataframe_schema(),
        convert_dates=False,
    )
    # Normalize pandas missing-value markers back to plain Python None.
    return frame.replace(pd.NA, None)
124  
125  
def _get_evaluation_from_dataframes(
    *,
    run_id: str,
    evaluation_id: str,
    evaluations_df: pd.DataFrame,
    metrics_df: pd.DataFrame,
    assessments_df: pd.DataFrame,
    tags_df: pd.DataFrame,
) -> EvaluationEntity:
    """
    Parses an Evaluation object with the specified evaluation ID from the specified DataFrames.

    Raises:
        MlflowException: If the evaluation ID is absent from ``evaluations_df``
            (RESOURCE_DOES_NOT_EXIST), or if it unexpectedly resolves to more
            than one evaluation entity (INTERNAL_ERROR).
    """
    matching_rows = evaluations_df[evaluations_df["evaluation_id"] == evaluation_id]
    if matching_rows.empty:
        raise MlflowException(
            f"The specified evaluation ID '{evaluation_id}' does not exist in the run '{run_id}'.",
            error_code=RESOURCE_DOES_NOT_EXIST,
        )

    # Only the matching evaluation row is converted; the metrics/assessments/tags
    # DataFrames are filtered down by evaluation_id inside the conversion.
    parsed: list[EvaluationEntity] = _dataframes_to_evaluations(
        evaluations_df=matching_rows,
        metrics_df=metrics_df,
        assessments_df=assessments_df,
        tags_df=tags_df,
    )
    if len(parsed) == 1:
        return parsed[0]

    raise MlflowException(
        f"Expected to find a single evaluation with ID '{evaluation_id}', but found "
        f"{len(parsed)} evaluations.",
        error_code=INTERNAL_ERROR,
    )
159  
160  
def _dataframes_to_evaluations(
    evaluations_df: pd.DataFrame,
    metrics_df: pd.DataFrame,
    assessments_df: pd.DataFrame,
    tags_df: pd.DataFrame,
) -> list[EvaluationEntity]:
    """
    Converts four separate DataFrames (main evaluation data, metrics, assessments, and tags) back
    into a list of Evaluation entities.

    Args:
        evaluations_df (pd.DataFrame): DataFrame with the main evaluation data
            (excluding assessment and metrics).
        metrics_df (pd.DataFrame): DataFrame with metrics.
        assessments_df (pd.DataFrame): DataFrame with assessments.
        tags_df (pd.DataFrame): DataFrame with tags.

    Returns:
        List[EvaluationEntity]: A list of Evaluation entities created from the DataFrames.
    """

    def _to_metric_dict(metric_record: dict) -> dict:
        # Evaluation metrics don't have steps, but we're reusing the MLflow Metric
        # class to represent Evaluation metrics as entities in Python for now, so
        # the step is pinned to 0. The evaluation_id field is dropped here as
        # well, since it is not part of the MLflow Metric Python entity.
        return {
            "key": metric_record["key"],
            "value": metric_record["value"],
            "timestamp": metric_record["timestamp"],
            "step": 0,
        }

    # Group metrics, assessments, and tags by evaluation_id for O(1) lookup.
    grouped_metrics = _group_dataframe_by_evaluation_id(metrics_df)
    grouped_assessments = _group_dataframe_by_evaluation_id(assessments_df)
    grouped_tags = _group_dataframe_by_evaluation_id(tags_df)

    # Attach each evaluation's metrics/assessments/tags and build the entity.
    entities = []
    for record in evaluations_df.to_dict(orient="records"):
        eval_id = record["evaluation_id"]
        record["metrics"] = [_to_metric_dict(m) for m in grouped_metrics.get(eval_id, [])]
        record["assessments"] = grouped_assessments.get(eval_id, [])
        record["tags"] = grouped_tags.get(eval_id, [])
        entities.append(EvaluationEntity.from_dictionary(record))

    return entities
210  
211  
212  def _group_dataframe_by_evaluation_id(df: pd.DataFrame):
213      """
214      Groups evaluation dataframe rows by 'evaluation_id'.
215  
216      Args:
217          df (pd.DataFrame): DataFrame to group.
218  
219      Returns:
220          Dict[str, List]: A dictionary with 'evaluation_id' as keys and lists of entity
221              dictionaries as values.
222      """
223      grouped = df.groupby("evaluation_id", group_keys=False).apply(
224          lambda x: x.to_dict(orient="records")
225      )
226      return grouped.to_dict()