# regression_performance.py
from dataclasses import dataclass
from typing import Dict
from typing import Optional

import numpy as np
import pandas as pd
from scipy.stats import probplot

from evidently.legacy.metric_results import DatasetColumns


class ErrorWithQuantiles:
    """Per-row prediction error plus its lower/upper quantile cut-offs.

    The quantiles split the error distribution into "underestimation"
    (error <= quantile_top), "majority" (strictly between the two), and
    "overestimation" (error >= quantile_other) groups.
    """

    def __init__(self, error, quantile_top, quantile_other):
        self.error = error
        self.quantile_top = quantile_top
        self.quantile_other = quantile_other


@dataclass
class FeatureBias:
    """Summary of how a feature's values differ across the error groups.

    For numerical features the fields hold group means; for categorical
    features they hold the most frequent category per group.
    """

    feature_type: str
    majority: float
    under: float
    over: float
    range: float

    def as_dict(self, prefix):
        """Flatten the bias values into a dict with `prefix`-ed keys."""
        return {
            prefix + "majority": self.majority,
            prefix + "under": self.under,
            prefix + "over": self.over,
            prefix + "range": self.range,
        }


def _calculate_error_normality(error: ErrorWithQuantiles):
    """Build Q-Q plot data (scipy `probplot` vs. a normal) for the error series."""
    qq_lines = probplot(error.error, dist="norm", plot=None)
    # probplot returns ((osm, osr), (slope, intercept, r)).
    qq_dots = [t.tolist() for t in qq_lines[0]]
    qq_line = list(qq_lines[1])
    return {
        "order_statistic_medians_x": [float(x) for x in qq_dots[0]],
        "order_statistic_medians_y": [float(x) for x in qq_dots[1]],
        "slope": float(qq_line[0]),
        "intercept": float(qq_line[1]),
        "r": float(qq_line[2]),
    }


def _calculate_quality_metrics(
    dataset,
    prediction_column,
    target_column,
    conf_interval_n_sigmas=1,
    mape_zero_handling: str = "none",
    mape_replace_value: float = 1.0,
    mape_epsilon: Optional[float] = None,
):
    """Compute ME/MAE/MAPE and their spreads for predictions vs. targets.

    Args:
        dataset: DataFrame containing both columns; assumed free of NaN/inf
            in those columns (see `_prepare_dataset`).
        prediction_column: name of the prediction column.
        target_column: name of the target column.
        conf_interval_n_sigmas: multiplier applied to the std-based fields.
        mape_zero_handling: "none" (default), "drop" near-zero-target rows
            before MAPE, or "replace" their per-row percentage error with
            `mape_replace_value`.
        mape_replace_value: value used by the "replace" strategy.
        mape_epsilon: |target| <= epsilon counts as "near zero"; defaults
            to float64 machine epsilon.

    Returns:
        Dict of float metrics plus the count of near-zero-target rows.
    """
    error = dataset[prediction_column] - dataset[target_column]
    me = np.mean(error)
    sde = np.std(error, ddof=1)

    abs_err = np.abs(error)
    abs_error_max = abs_err.max()
    mae = np.mean(abs_err)
    sdae = np.std(abs_err, ddof=1)

    data = dataset[[prediction_column, target_column]]

    epsilon = np.finfo(np.float64).eps if mape_epsilon is None else mape_epsilon

    # Rows whose target is numerically zero; `~(|t| > eps)` (rather than
    # `|t| <= eps`) also catches NaN targets, matching the original predicate.
    near_zero_rows = data[~(np.abs(data[target_column]) > epsilon)]
    if mape_zero_handling == "drop":
        # BUG FIX: the original used `inplace=True` drop on a column slice of
        # `dataset` (pandas chained assignment: warning / possible silent
        # no-op); reassignment is the supported pattern.
        data = data.drop(near_zero_rows.index)

    # BUG FIX: the MAPE denominator must be |target|. The original used
    # np.maximum(target, epsilon), which collapsed every NEGATIVE target to
    # epsilon and made MAPE explode for negative targets.
    abs_perc_err = np.abs(data[prediction_column] - data[target_column]) / np.maximum(
        np.abs(data[target_column]), epsilon
    )
    if mape_zero_handling == "replace" and len(near_zero_rows) > 0:
        abs_perc_err.loc[near_zero_rows.index] = mape_replace_value
    mape = 100.0 * np.mean(abs_perc_err)
    sdape = np.std(abs_perc_err, ddof=1)

    return {
        "mean_error": float(me),
        "mean_abs_error": float(mae),
        "mean_abs_perc_error": float(mape),
        "abs_error_max": float(abs_error_max),
        "error_std": conf_interval_n_sigmas * float(sde),
        "abs_error_std": conf_interval_n_sigmas * float(sdae),
        "abs_perc_error_std": conf_interval_n_sigmas * float(sdape),
        # BUG FIX: `.size` on the two-column frame counted cells (2 x rows);
        # report the number of near-zero-target ROWS instead.
        "near_zero_values": len(near_zero_rows),
    }


def _prepare_dataset(dataset, target_column, prediction_column):
    """IN PLACE: turn +/-inf into NaN, then drop rows lacking target/prediction."""
    dataset.replace([np.inf, -np.inf], np.nan, inplace=True)
    dataset.dropna(axis=0, how="any", inplace=True, subset=[target_column, prediction_column])


def _calculate_underperformance(err_quantiles: ErrorWithQuantiles, conf_interval_n_sigmas: int = 1):
    """Mean/std of the error inside the under-/majority/over-estimation groups."""
    error = err_quantiles.error
    under_mask = error <= err_quantiles.quantile_top
    over_mask = error >= err_quantiles.quantile_other
    majority_mask = (error > err_quantiles.quantile_top) & (error < err_quantiles.quantile_other)

    return {
        "majority": {
            "mean_error": float(np.mean(error[majority_mask])),
            "std_error": conf_interval_n_sigmas * float(np.std(error[majority_mask], ddof=1)),
        },
        "underestimation": {
            "mean_error": float(np.mean(error[under_mask])),
            "std_error": conf_interval_n_sigmas * float(np.std(error[under_mask], ddof=1)),
        },
        "overestimation": {
            "mean_error": float(np.mean(error[over_mask])),
            "std_error": conf_interval_n_sigmas * float(np.std(error[over_mask], ddof=1)),
        },
    }


def error_bias_table(dataset, err_quantiles, num_feature_names, cat_feature_names) -> Dict[str, FeatureBias]:
    """Per-feature error bias for numerical and categorical features."""
    error_bias = {
        feature_name: _error_num_feature_bias(dataset, feature_name, err_quantiles)
        for feature_name in num_feature_names
    }
    error_bias.update(
        {
            feature_name: _error_cat_feature_bias(dataset, feature_name, err_quantiles)
            for feature_name in cat_feature_names
        }
    )
    return error_bias


def _error_num_feature_bias(dataset, feature_name, err_quantiles: ErrorWithQuantiles) -> FeatureBias:
    """Bias for a numerical feature: group means, plus the spread between the
    under- and over-estimation means as a percentage of the feature's range."""
    error = err_quantiles.error
    quantile_top = err_quantiles.quantile_top
    quantile_other = err_quantiles.quantile_other
    ref_overal_value = np.mean(dataset[feature_name])
    ref_under_value = np.mean(dataset[error <= quantile_top][feature_name])
    ref_over_value = np.mean(dataset[error >= quantile_other][feature_name])
    if ref_over_value == ref_under_value:
        ref_range_value = 0
    else:
        # NOTE(review): divides by the feature's value range; a constant
        # feature with differing group means would divide by zero — TODO
        # confirm upstream guarantees non-constant numeric features.
        ref_range_value = (
            100
            * abs(ref_over_value - ref_under_value)
            / (np.max(dataset[feature_name]) - np.min(dataset[feature_name]))
        )

    return FeatureBias(
        feature_type="num",
        majority=float(ref_overal_value),
        under=float(ref_under_value),
        over=float(ref_over_value),
        range=float(ref_range_value),
    )


def _stable_value_counts(series: pd.Series):
    """Value counts reordered by first appearance, so ties break deterministically."""
    return series.value_counts().reindex(pd.unique(series.to_numpy()))


def _idmax_possibly_empty_column(series: pd.Series):
    """Most frequent value of `series`, or None when it is empty / all-NaN."""
    value_count = _stable_value_counts(series)
    if all(pd.isna(value_count)):
        return None
    value = value_count.idxmax()
    if pd.isnull(value):
        return None
    return value


def _error_cat_feature_bias(dataset, feature_name, err_quantiles: ErrorWithQuantiles) -> FeatureBias:
    """Bias for a categorical feature: modal category per error group;
    `range` is 1 when the groups disagree (or no mode exists at all), else 0."""
    error = err_quantiles.error
    quantile_top = err_quantiles.quantile_top
    quantile_other = err_quantiles.quantile_other
    ref_overall_value = _idmax_possibly_empty_column(dataset[feature_name])
    ref_under_value = _idmax_possibly_empty_column(dataset[error <= quantile_top][feature_name])
    ref_over_value = _idmax_possibly_empty_column(dataset[error >= quantile_other][feature_name])
    if (
        (ref_overall_value is None and ref_under_value is None and ref_over_value is None)
        or (ref_overall_value != ref_under_value)
        or (ref_over_value != ref_overall_value)
        or (ref_under_value != ref_overall_value)
    ):
        ref_range_value = 1
    else:
        ref_range_value = 0

    return FeatureBias(
        feature_type="cat",
        majority=ref_overall_value,
        under=ref_under_value,
        over=ref_over_value,
        range=float(ref_range_value),
    )


def error_with_quantiles(dataset, prediction_column, target_column, quantile: float):
    """Compute the error series and its `quantile` / `1 - quantile` cut-offs."""
    error = dataset[prediction_column] - dataset[target_column]
    quantile_top = np.quantile(error, quantile)
    quantile_other = np.quantile(error, 1 - quantile)
    return ErrorWithQuantiles(error, quantile_top, quantile_other)


@dataclass
class RegressionPerformanceMetrics:
    """Aggregated regression quality metrics in the legacy result layout."""

    mean_error: float
    mean_abs_error: float
    mean_abs_perc_error: float
    error_std: float
    abs_error_max: float
    abs_error_std: float
    abs_perc_error_std: float
    error_normality: dict
    underperformance: dict
    error_bias: dict
    near_zero_values: int


def calculate_regression_performance(
    dataset: pd.DataFrame,
    columns: DatasetColumns,
    error_bias_prefix: str,
    mape_zero_handling: str = "none",
    mape_replace_value: float = 0.0,
    mape_epsilon: Optional[float] = None,
) -> RegressionPerformanceMetrics:
    """Run the full regression-performance analysis over `dataset`.

    Note: `dataset` is cleaned IN PLACE (inf -> NaN, incomplete rows dropped).

    Raises:
        ValueError: when the target or prediction column is not configured.
    """
    target_column = columns.utility_columns.target
    prediction_column = columns.utility_columns.prediction

    num_feature_names = columns.num_feature_names
    cat_feature_names = columns.cat_feature_names

    if target_column is None or prediction_column is None:
        raise ValueError("Target and prediction should be present")

    _prepare_dataset(dataset, target_column, prediction_column)
    # Quality metrics (ME, MAE, MAPE, ...).
    quality_metrics = _calculate_quality_metrics(
        dataset,
        prediction_column,
        target_column,
        mape_zero_handling=mape_zero_handling,
        mape_replace_value=mape_replace_value,
        mape_epsilon=mape_epsilon,
    )
    # Error normality (Q-Q plot data) and per-group underperformance.
    err_quantiles = error_with_quantiles(dataset, prediction_column, target_column, quantile=0.05)
    quality_metrics["error_normality"] = _calculate_error_normality(err_quantiles)
    quality_metrics["underperformance"] = _calculate_underperformance(err_quantiles)
    # Per-feature error bias, converted to the legacy flat-dict format.
    feature_bias = error_bias_table(dataset, err_quantiles, num_feature_names, cat_feature_names)
    quality_metrics["error_bias"] = {
        feature: dict(feature_type=bias.feature_type, **bias.as_dict(error_bias_prefix))
        for feature, bias in feature_bias.items()
    }
    return RegressionPerformanceMetrics(**quality_metrics)