# src/evidently/legacy/calculations/regression_performance.py
  1  from dataclasses import dataclass
  2  from typing import Dict
  3  from typing import Optional
  4  
  5  import numpy as np
  6  import pandas as pd
  7  from scipy.stats import probplot
  8  
  9  from evidently.legacy.metric_results import DatasetColumns
 10  
 11  
 12  class ErrorWithQuantiles:
 13      def __init__(self, error, quantile_top, quantile_other):
 14          self.error = error
 15          self.quantile_top = quantile_top
 16          self.quantile_other = quantile_other
 17  
 18  
 19  @dataclass
 20  class FeatureBias:
 21      feature_type: str
 22      majority: float
 23      under: float
 24      over: float
 25      range: float
 26  
 27      def as_dict(self, prefix):
 28          return {
 29              prefix + "majority": self.majority,
 30              prefix + "under": self.under,
 31              prefix + "over": self.over,
 32              prefix + "range": self.range,
 33          }
 34  
 35  
 36  def _calculate_error_normality(error: ErrorWithQuantiles):
 37      qq_lines = probplot(error.error, dist="norm", plot=None)
 38      qq_dots = [t.tolist() for t in qq_lines[0]]
 39      qq_line = list(qq_lines[1])
 40      return {
 41          "order_statistic_medians_x": [float(x) for x in qq_dots[0]],
 42          "order_statistic_medians_y": [float(x) for x in qq_dots[1]],
 43          "slope": float(qq_line[0]),
 44          "intercept": float(qq_line[1]),
 45          "r": float(qq_line[2]),
 46      }
 47  
 48  
 49  def _calculate_quality_metrics(
 50      dataset,
 51      prediction_column,
 52      target_column,
 53      conf_interval_n_sigmas=1,
 54      mape_zero_handling: str = "none",
 55      mape_replace_value: float = 1.0,
 56      mape_epsilon: Optional[float] = None,
 57  ):
 58      me = np.mean(dataset[prediction_column] - dataset[target_column])
 59      sde = np.std(dataset[prediction_column] - dataset[target_column], ddof=1)
 60  
 61      abs_err = np.abs(dataset[prediction_column] - dataset[target_column])
 62      abs_error_max = abs_err.max()
 63      mae = np.mean(abs_err)
 64      sdae = np.std(abs_err, ddof=1)
 65  
 66      data = dataset[[prediction_column, target_column]]
 67  
 68      if mape_epsilon is None:
 69          epsilon = np.finfo(np.float64).eps
 70      else:
 71          epsilon = mape_epsilon
 72  
 73      epsilon_values = data[~(abs(data[target_column]) > epsilon)]
 74      if mape_zero_handling == "drop":
 75          data.drop(epsilon_values.index, inplace=True)
 76  
 77      abs_perc_err = np.abs(data[prediction_column] - data[target_column]) / np.maximum(data[target_column], epsilon)
 78      if mape_zero_handling == "replace" and epsilon_values.size > 0:
 79          abs_perc_err[epsilon_values.index] = mape_replace_value
 80      mape = 100.0 * np.mean(abs_perc_err)
 81      sdape = np.std(abs_perc_err, ddof=1)
 82  
 83      return {
 84          "mean_error": float(me),
 85          "mean_abs_error": float(mae),
 86          "mean_abs_perc_error": float(mape),
 87          "abs_error_max": abs_error_max,
 88          "error_std": conf_interval_n_sigmas * float(sde),
 89          "abs_error_std": conf_interval_n_sigmas * float(sdae),
 90          "abs_perc_error_std": conf_interval_n_sigmas * float(sdape),
 91          "near_zero_values": epsilon_values.size,
 92      }
 93  
 94  
 95  def _prepare_dataset(dataset, target_column, prediction_column):
 96      dataset.replace([np.inf, -np.inf], np.nan, inplace=True)
 97      dataset.dropna(axis=0, how="any", inplace=True, subset=[target_column, prediction_column])
 98  
 99  
def _calculate_underperformance(err_quantiles: ErrorWithQuantiles, conf_interval_n_sigmas: int = 1):
    """Mean and std of the error within the three quantile-delimited regimes:
    underestimation (error <= quantile_top), majority (strictly between the
    cut-offs), and overestimation (error >= quantile_other).

    Std values (ddof=1) are pre-multiplied by ``conf_interval_n_sigmas``.
    """
    error = err_quantiles.error
    low_cut = err_quantiles.quantile_top
    high_cut = err_quantiles.quantile_other

    under_segment = error[error <= low_cut]
    majority_segment = error[(error > low_cut) & (error < high_cut)]
    over_segment = error[error >= high_cut]

    def segment_stats(segment):
        # One regime's (mean, scaled std) pair.
        return float(np.mean(segment)), conf_interval_n_sigmas * float(np.std(segment, ddof=1))

    majority_mean, majority_std = segment_stats(majority_segment)
    under_mean, under_std = segment_stats(under_segment)
    over_mean, over_std = segment_stats(over_segment)

    return {
        "majority": {"mean_error": majority_mean, "std_error": majority_std},
        "underestimation": {"mean_error": under_mean, "std_error": under_std},
        "overestimation": {"mean_error": over_mean, "std_error": over_std},
    }
125  
126  
def error_bias_table(dataset, err_quantiles, num_feature_names, cat_feature_names) -> Dict[str, FeatureBias]:
    """Compute per-feature error bias for all numerical and categorical features.

    Numerical features come first in the result, then categorical ones, each
    keyed by feature name.
    """
    bias_by_feature: Dict[str, FeatureBias] = {}
    for feature_name in num_feature_names:
        bias_by_feature[feature_name] = _error_num_feature_bias(dataset, feature_name, err_quantiles)
    for feature_name in cat_feature_names:
        bias_by_feature[feature_name] = _error_cat_feature_bias(dataset, feature_name, err_quantiles)
    return bias_by_feature
139  
140  
def _error_num_feature_bias(dataset, feature_name, err_quantiles: ErrorWithQuantiles) -> FeatureBias:
    """Bias summary for a numerical feature: its mean overall and within the
    under-/over-estimated rows, plus the under-vs-over gap as a percentage of
    the feature's full value range."""
    error = err_quantiles.error
    under_rows = dataset[error <= err_quantiles.quantile_top]
    over_rows = dataset[error >= err_quantiles.quantile_other]

    overall_mean = np.mean(dataset[feature_name])
    under_mean = np.mean(under_rows[feature_name])
    over_mean = np.mean(over_rows[feature_name])

    if over_mean == under_mean:
        range_pct = 0
    else:
        # Normalize the gap by the feature's observed span; the equality guard
        # above avoids 0/0 for constant features.
        feature_span = np.max(dataset[feature_name]) - np.min(dataset[feature_name])
        range_pct = 100 * abs(over_mean - under_mean) / feature_span

    return FeatureBias(
        feature_type="num",
        majority=float(overall_mean),
        under=float(under_mean),
        over=float(over_mean),
        range=float(range_pct),
    )
166  
167  
168  def _stable_value_counts(series: pd.Series):
169      return series.value_counts().reindex(pd.unique(series.to_numpy()))
170  
171  
def _idmax_possibly_empty_column(series: pd.Series):
    """Most frequent value of ``series`` (ties broken by first appearance), or
    None when the series is empty or contains only nulls."""
    # Inlined stable value count: counts reindexed to first-appearance order.
    counts = series.value_counts().reindex(pd.unique(series.to_numpy()))
    if pd.isna(counts).all():
        # Empty column, or every value is null.
        return None
    top_value = counts.idxmax()
    return None if pd.isnull(top_value) else top_value
181  
182  
def _error_cat_feature_bias(dataset, feature_name, err_quantiles: ErrorWithQuantiles) -> FeatureBias:
    """Bias summary for a categorical feature: the most frequent category
    overall and within the under-/over-estimated rows; ``range`` is 1.0 when
    the three modes disagree (or all are missing), else 0.0."""
    error = err_quantiles.error
    overall_top = _idmax_possibly_empty_column(dataset[feature_name])
    under_top = _idmax_possibly_empty_column(dataset[error <= err_quantiles.quantile_top][feature_name])
    over_top = _idmax_possibly_empty_column(dataset[error >= err_quantiles.quantile_other][feature_name])

    all_missing = overall_top is None and under_top is None and over_top is None
    if all_missing or (overall_top != under_top) or (over_top != overall_top) or (under_top != overall_top):
        range_flag = 1
    else:
        range_flag = 0

    # NOTE: majority/under/over carry category values here (not floats),
    # despite FeatureBias's float annotations — dataclasses don't enforce them.
    return FeatureBias(
        feature_type="cat",
        majority=overall_top,
        under=under_top,
        over=over_top,
        range=float(range_flag),
    )
207  
208  
def error_with_quantiles(dataset, prediction_column, target_column, quantile: float):
    """Build the (prediction - target) error series together with its
    symmetric ``quantile`` / ``1 - quantile`` cut-offs for the
    underperformance split."""
    error = dataset[prediction_column] - dataset[target_column]
    lower_cut = np.quantile(error, quantile)
    upper_cut = np.quantile(error, 1 - quantile)
    return ErrorWithQuantiles(error, lower_cut, upper_cut)
216  
217  
@dataclass
class RegressionPerformanceMetrics:
    """Flat container for the values produced by calculate_regression_performance."""

    # Mean of (prediction - target); sign indicates over-/under-prediction bias.
    mean_error: float
    # Mean absolute error.
    mean_abs_error: float
    # Mean absolute percentage error, in percent.
    mean_abs_perc_error: float
    # Sample std (ddof=1) of the signed error, scaled by conf_interval_n_sigmas.
    error_std: float
    # Largest absolute error observed.
    abs_error_max: float
    # Scaled sample std of the absolute error.
    abs_error_std: float
    # Scaled sample std of the absolute percentage error.
    abs_perc_error_std: float
    # Q-Q plot data from _calculate_error_normality (dots + slope/intercept/r).
    error_normality: dict
    # Per-regime mean/std errors from _calculate_underperformance.
    underperformance: dict
    # Per-feature bias dicts (legacy flat format), keyed by feature name.
    error_bias: dict
    # Count reported by _calculate_quality_metrics for near-zero targets
    # (DataFrame.size over the matching rows, i.e. rows x 2 columns).
    near_zero_values: int
231  
232  
def calculate_regression_performance(
    dataset: pd.DataFrame,
    columns: DatasetColumns,
    error_bias_prefix: str,
    mape_zero_handling: str = "none",
    mape_replace_value: float = 0.0,
    mape_epsilon: Optional[float] = None,
) -> RegressionPerformanceMetrics:
    """Compute the full set of regression-performance metrics for ``dataset``.

    Mutates ``dataset`` in place (infinities/NaNs in target or prediction rows
    are dropped). Raises ValueError when target or prediction is missing.
    The MAPE-related arguments are forwarded to _calculate_quality_metrics.
    """
    target_column = columns.utility_columns.target
    prediction_column = columns.utility_columns.prediction
    if target_column is None or prediction_column is None:
        raise ValueError("Target and prediction should be present")

    # Remove rows that cannot be scored; mutates the caller's frame.
    _prepare_dataset(dataset, target_column, prediction_column)

    metrics = _calculate_quality_metrics(
        dataset,
        prediction_column,
        target_column,
        mape_zero_handling=mape_zero_handling,
        mape_replace_value=mape_replace_value,
        mape_epsilon=mape_epsilon,
    )

    # Error split at the 5%/95% quantiles drives normality, underperformance
    # and per-feature bias.
    err_quantiles = error_with_quantiles(dataset, prediction_column, target_column, quantile=0.05)
    metrics["error_normality"] = _calculate_error_normality(err_quantiles)
    metrics["underperformance"] = _calculate_underperformance(err_quantiles)

    feature_bias = error_bias_table(
        dataset, err_quantiles, columns.num_feature_names, columns.cat_feature_names
    )
    # Flatten FeatureBias objects into the legacy per-feature dict format.
    metrics["error_bias"] = {
        feature: dict(feature_type=bias.feature_type, **bias.as_dict(error_bias_prefix))
        for feature, bias in feature_bias.items()
    }
    return RegressionPerformanceMetrics(**metrics)