utils.py
import pandas as pd

from mlflow.evaluation.evaluation import EvaluationEntity as EvaluationEntity
from mlflow.evaluation.utils import (
    _get_assessments_dataframe_schema,
    _get_evaluations_dataframe_schema,
    _get_metrics_dataframe_schema,
    _get_tags_dataframe_schema,
)
from mlflow.exceptions import MlflowException
from mlflow.protos.databricks_pb2 import INTERNAL_ERROR, RESOURCE_DOES_NOT_EXIST
from mlflow.tracking.client import MlflowClient


def get_evaluation(*, run_id: str, evaluation_id: str) -> EvaluationEntity:
    """
    Retrieves an Evaluation object from an MLflow Run.

    Args:
        run_id (str): ID of the MLflow Run containing the evaluation.
        evaluation_id (str): The ID of the evaluation.

    Returns:
        Evaluation: The Evaluation object.
    """
    client = MlflowClient()
    if not _contains_evaluation_artifacts(client=client, run_id=run_id):
        raise MlflowException(
            "The specified run does not contain any evaluations. "
            "Please log evaluations to the run before retrieving them.",
            error_code=RESOURCE_DOES_NOT_EXIST,
        )

    evaluations_file = client.download_artifacts(run_id=run_id, path="_evaluations.json")
    evaluations_df = _read_evaluations_dataframe(evaluations_file)

    assessments_file = client.download_artifacts(run_id=run_id, path="_assessments.json")
    assessments_df = _read_assessments_dataframe(assessments_file)

    metrics_file = client.download_artifacts(run_id=run_id, path="_metrics.json")
    metrics_df = _read_metrics_dataframe(metrics_file)

    tags_file = client.download_artifacts(run_id=run_id, path="_tags.json")
    tags_df = _read_tags_dataframe(tags_file)

    return _get_evaluation_from_dataframes(
        run_id=run_id,
        evaluation_id=evaluation_id,
        evaluations_df=evaluations_df,
        metrics_df=metrics_df,
        assessments_df=assessments_df,
        tags_df=tags_df,
    )


def _contains_evaluation_artifacts(*, client: MlflowClient, run_id: str) -> bool:
    return {"_evaluations.json", "_metrics.json", "_assessments.json", "_tags.json"}.issubset(
        {file.path for file in client.list_artifacts(run_id)}
    )


def _read_evaluations_dataframe(path: str) -> pd.DataFrame:
    """
    Reads an evaluations DataFrame from a file.

    Args:
        path (str): Path to the file.

    Returns:
        pd.DataFrame: The evaluations DataFrame.
    """
    schema = _get_evaluations_dataframe_schema()
    return pd.read_json(path, orient="split", dtype=schema, convert_dates=False).replace(
        pd.NA, None
    )


def _read_assessments_dataframe(path: str) -> pd.DataFrame:
    """
    Reads an assessments DataFrame from a file.

    Args:
        path (str): Path to the file.

    Returns:
        pd.DataFrame: The assessments DataFrame.
    """
    schema = _get_assessments_dataframe_schema()
    return pd.read_json(path, orient="split", dtype=schema, convert_dates=False).replace(
        pd.NA, None
    )


def _read_metrics_dataframe(path: str) -> pd.DataFrame:
    """
    Reads a metrics DataFrame from a file.

    Args:
        path (str): Path to the file.

    Returns:
        pd.DataFrame: The metrics DataFrame.
    """
    schema = _get_metrics_dataframe_schema()
    return pd.read_json(path, orient="split", dtype=schema, convert_dates=False).replace(
        pd.NA, None
    )


def _read_tags_dataframe(path: str) -> pd.DataFrame:
    """
    Reads a tags DataFrame from a file.

    Args:
        path (str): Path to the file.

    Returns:
        pd.DataFrame: The tags DataFrame.
    """
    schema = _get_tags_dataframe_schema()
    return pd.read_json(path, orient="split", dtype=schema, convert_dates=False).replace(
        pd.NA, None
    )
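

# A minimal sketch (not part of the original module) of the round trip the readers
# above assume: each artifact file is JSON in pandas' "split" orientation, so a frame
# written with to_json(orient="split") is recovered by read_json(path, orient="split").
# The column names and values below are hypothetical and only illustrate the shape,
# not the real schemas returned by the _get_*_dataframe_schema helpers.
#
#   df = pd.DataFrame({"evaluation_id": ["ev-123"], "key": ["accuracy"], "value": [0.9]})
#   df.to_json("_metrics.json", orient="split")
#   pd.read_json("_metrics.json", orient="split")  # recovers the same frame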
119 """ 120 schema = _get_tags_dataframe_schema() 121 return pd.read_json(path, orient="split", dtype=schema, convert_dates=False).replace( 122 pd.NA, None 123 ) 124 125 126 def _get_evaluation_from_dataframes( 127 *, 128 run_id: str, 129 evaluation_id: str, 130 evaluations_df: pd.DataFrame, 131 metrics_df: pd.DataFrame, 132 assessments_df: pd.DataFrame, 133 tags_df: pd.DataFrame, 134 ) -> EvaluationEntity: 135 """ 136 Parses an Evaluation object with the specified evaluation ID from the specified DataFrames. 137 """ 138 evaluation_row = evaluations_df[evaluations_df["evaluation_id"] == evaluation_id] 139 if evaluation_row.empty: 140 raise MlflowException( 141 f"The specified evaluation ID '{evaluation_id}' does not exist in the run '{run_id}'.", 142 error_code=RESOURCE_DOES_NOT_EXIST, 143 ) 144 145 evaluations: list[EvaluationEntity] = _dataframes_to_evaluations( 146 evaluations_df=evaluation_row, 147 metrics_df=metrics_df, 148 assessments_df=assessments_df, 149 tags_df=tags_df, 150 ) 151 if len(evaluations) != 1: 152 raise MlflowException( 153 f"Expected to find a single evaluation with ID '{evaluation_id}', but found " 154 f"{len(evaluations)} evaluations.", 155 error_code=INTERNAL_ERROR, 156 ) 157 158 return evaluations[0] 159 160 161 def _dataframes_to_evaluations( 162 evaluations_df: pd.DataFrame, 163 metrics_df: pd.DataFrame, 164 assessments_df: pd.DataFrame, 165 tags_df: pd.DataFrame, 166 ) -> list[EvaluationEntity]: 167 """ 168 Converts four separate DataFrames (main evaluation data, metrics, assessments, and tags) back 169 into a list of Evaluation entities. 170 171 Args: 172 evaluations_df (pd.DataFrame): DataFrame with the main evaluation data 173 (excluding assessment and metrics). 174 metrics_df (pd.DataFrame): DataFrame with metrics. 175 assessments_df (pd.DataFrame): DataFrame with assessments. 176 tags_df (pd.DataFrame): DataFrame with tags. 177 178 Returns: 179 List[EvaluationEntity]: A list of Evaluation entities created from the DataFrames. 180 """ 181 # Group metrics and assessment by evaluation_id 182 metrics_by_eval = _group_dataframe_by_evaluation_id(metrics_df) 183 assessments_by_eval = _group_dataframe_by_evaluation_id(assessments_df) 184 tags_by_eval = _group_dataframe_by_evaluation_id(tags_df) 185 186 # Convert main DataFrame to list of dictionaries and create Evaluation objects 187 evaluations = [] 188 for eval_dict in evaluations_df.to_dict(orient="records"): 189 evaluation_id = eval_dict["evaluation_id"] 190 eval_dict["metrics"] = [ 191 { 192 "key": metric["key"], 193 "value": metric["value"], 194 "timestamp": metric["timestamp"], 195 # Evaluation metrics don't have steps, but we're reusing the MLflow Metric 196 # class to represent Evaluation metrics as entities in Python for now. Accordingly, 197 # we set the step to 0 in order to parse the evaluation metric as an MLflow Metric 198 # Python entity 199 "step": 0, 200 # Also discard the evaluation_id field from the evaluation metric, since this 201 # field is not part of the MLflow Metric Python entity 202 } 203 for metric in metrics_by_eval.get(evaluation_id, []) 204 ] 205 eval_dict["assessments"] = assessments_by_eval.get(evaluation_id, []) 206 eval_dict["tags"] = tags_by_eval.get(evaluation_id, []) 207 evaluations.append(EvaluationEntity.from_dictionary(eval_dict)) 208 209 return evaluations 210 211 212 def _group_dataframe_by_evaluation_id(df: pd.DataFrame): 213 """ 214 Groups evaluation dataframe rows by 'evaluation_id'. 215 216 Args: 217 df (pd.DataFrame): DataFrame to group. 


def _group_dataframe_by_evaluation_id(df: pd.DataFrame) -> dict[str, list[dict]]:
    """
    Groups evaluation DataFrame rows by 'evaluation_id'.

    Args:
        df (pd.DataFrame): DataFrame to group.

    Returns:
        dict[str, list[dict]]: A dictionary with 'evaluation_id' values as keys and lists of
            entity dictionaries (one per row) as values.
    """
    grouped = df.groupby("evaluation_id", group_keys=False).apply(
        lambda x: x.to_dict(orient="records")
    )
    return grouped.to_dict()
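

if __name__ == "__main__":
    # Usage sketch, not part of the module's API: fetch one logged evaluation from a
    # run. Both IDs below are placeholders; substitute real values from a run to which
    # evaluations were actually logged.
    evaluation = get_evaluation(
        run_id="0123456789abcdef0123456789abcdef",  # hypothetical run ID
        evaluation_id="ev-123",  # hypothetical evaluation ID
    )
    print(evaluation)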