# embedding_drift_methods.py
import abc
from typing import Callable
from typing import Optional
from typing import Tuple

import numpy as np
import pandas as pd
from scipy.spatial.distance import chebyshev
from scipy.spatial.distance import cityblock
from scipy.spatial.distance import cosine
from scipy.spatial.distance import euclidean
from sklearn.decomposition import PCA
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import pairwise_distances
from sklearn.metrics import pairwise_kernels
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split

from evidently.legacy.calculations.stattests import get_stattest
from evidently.legacy.core import ColumnType
from evidently.pydantic_utils import EvidentlyBaseModel
from evidently.pydantic_utils import autoregister

# Maps the user-facing ``dist`` name to the scipy distance function applied to mean embeddings.
DISTANCE_DICT = {
    "euclidean": euclidean,
    "cosine": cosine,
    "cityblock": cityblock,
    "chebyshev": chebyshev,
}
# Number of bootstrap / random repetitions used to build the null distribution.
N_BOOTSTRAP = 100


def get_pca_df(
    reference_emb: pd.DataFrame, current_emb: pd.DataFrame, n_comp: int
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Takes two dataframes and reduces their dimensionality using the PCA method

    The PCA is fitted on the reference data only and then applied to the current data.

    Args:
        reference_emb: reference embeddings data
        current_emb: current embeddings data
        n_comp: number of components to keep
    Returns:
        reference_emb_new: transformed reference_emb
        current_emb_new: transformed current_emb
    """
    pca = PCA(n_components=n_comp, random_state=0)
    return pd.DataFrame(pca.fit_transform(reference_emb)), pd.DataFrame(pca.transform(current_emb))


class DriftMethod(EvidentlyBaseModel):
    """Base type for embedding drift methods.

    Subclasses implement ``__call__`` and return a tuple of
    (drift score, drift detected flag, method name).
    """

    class Config:
        is_base_type = True

    @abc.abstractmethod
    def __call__(self, current_emb: pd.DataFrame, reference_emb: pd.DataFrame) -> Tuple[float, bool, str]:
        raise NotImplementedError


@autoregister
class DistanceDriftMethod(DriftMethod):
    """Drift method based on the distance between the mean reference and mean current embeddings."""

    class Config:
        type_alias = "evidently:drift_method:DistanceDriftMethod"

    dist: str = "euclidean"
    threshold: float = 0.2
    bootstrap: Optional[bool] = None
    quantile_probability: float = 0.95
    pca_components: Optional[int] = None

    def __call__(self, current_emb: pd.DataFrame, reference_emb: pd.DataFrame) -> Tuple[float, bool, str]:
        """Compute the distance between mean embeddings and decide whether it signals drift.

        Args:
            current_emb: current embeddings data
            reference_emb: reference embeddings data
        Returns:
            The distance, whether it exceeds the fixed threshold (or, with bootstrap,
            the bootstrap quantile), and the method name "distance".
        """
        if self.pca_components:
            reference_emb, current_emb = get_pca_df(reference_emb, current_emb, self.pca_components)
        dist_func = DISTANCE_DICT[self.dist]
        res = dist_func(reference_emb.mean(axis=0), current_emb.mean(axis=0))
        if not self.bootstrap:
            return res, res > self.threshold, "distance"

        n_ref = reference_emb.shape[0]
        n_cur = current_emb.shape[0]
        # Clamp to at least one row: a size that truncates to 0 would average an empty
        # frame and turn every bootstrap distance into NaN.
        b_ref_size = max(int(n_ref**2 / (n_ref + n_cur)), 1)
        b_curr_size = max(int(n_cur * n_ref / (n_ref + n_cur)), 1)
        bstrp_res = []
        for i in range(N_BOOTSTRAP):
            # A local RandomState(i) draws the same numbers as np.random.seed(i) followed by
            # np.random.choice, without reseeding the process-wide generator.
            rng = np.random.RandomState(i)
            b_ref_idx = rng.choice(n_ref, b_ref_size)
            b_curr_idx = rng.choice(n_ref, b_curr_size)
            # Both subsamples come from the reference data: this is the no-drift distribution.
            bstrp_res.append(
                dist_func(
                    reference_emb.iloc[b_ref_idx, :].mean(axis=0),
                    reference_emb.iloc[b_curr_idx, :].mean(axis=0),
                )
            )
        perc = np.percentile(bstrp_res, 100 * self.quantile_probability)
        return res, res > perc, "distance"


def distance(
    dist: str = "euclidean",
    threshold: float = 0.2,
    bootstrap: Optional[bool] = None,
    quantile_probability: float = 0.95,
    pca_components: Optional[int] = None,
) -> DriftMethod:
    """Returns a function for calculating drift on embeddings using the average distance method with specified parameters
    Args:
        dist: "euclidean", "cosine", "cityblock" or "chebyshev"
        threshold: all values above this threshold means data drift. Applies when bootstrap != True
        bootstrap: boolean parameter to determine whether to apply statistical hypothesis testing
        quantile_probability: applies when bootstrap == True
        pca_components: number of components to keep
    Returns:
        func: a function for calculating drift, which takes in reference and current embeddings data
        and returns a tuple: drift score, whether there is drift, and the name of the drift calculation method.
    """
    return DistanceDriftMethod(
        dist=dist,
        threshold=threshold,
        bootstrap=bootstrap,
        quantile_probability=quantile_probability,
        pca_components=pca_components,
    )


def calc_roc_auc_random(y_test, i):
    """Return the ROC AUC of uniformly random scores against ``y_test``.

    Args:
        y_test: true binary labels
        i: seed for the random scores
    Returns:
        ROC AUC obtained by the random scores.
    """
    # Local generator: same draws as np.random.seed(i) + np.random.rand, no global side effect.
    y_random_pred = np.random.RandomState(i).rand(len(y_test))
    return roc_auc_score(y_test, y_random_pred)


@autoregister
class ModelDriftMethod(DriftMethod):
    """Drift method that trains a classifier to tell reference embeddings from current ones."""

    class Config:
        type_alias = "evidently:drift_method:ModelDriftMethod"

    threshold: float = 0.55
    bootstrap: Optional[bool] = None
    quantile_probability: float = 0.95
    pca_components: Optional[int] = None

    def __call__(self, current_emb: pd.DataFrame, reference_emb: pd.DataFrame) -> Tuple[float, bool, str]:
        """Fit a reference-vs-current classifier and use its ROC AUC as the drift score.

        Args:
            current_emb: current embeddings data
            reference_emb: reference embeddings data
        Returns:
            The ROC AUC on the held-out half, whether it exceeds the fixed threshold (or, with
            bootstrap, the quantile of random-score ROC AUCs), and the method name "model".
        """
        if self.pca_components:
            reference_emb, current_emb = get_pca_df(reference_emb, current_emb, self.pca_components)
        # Keep the labels in a separate array instead of writing a "target" column into the
        # input frames, so the caller's DataFrames are left untouched.
        features = pd.concat((reference_emb, current_emb))
        labels = np.concatenate(
            (
                np.ones(reference_emb.shape[0], dtype=int),
                np.zeros(current_emb.shape[0], dtype=int),
            )
        )

        X_train, X_test, y_train, y_test = train_test_split(
            features, labels, test_size=0.5, random_state=42, shuffle=True
        )
        clf = SGDClassifier(loss="log_loss", random_state=42)
        clf.fit(X_train, y_train)
        y_pred_proba = clf.predict_proba(X_test)[:, 1]
        roc_auc = roc_auc_score(y_test, y_pred_proba)
        if self.bootstrap:
            roc_auc_values = [calc_roc_auc_random(y_test, i) for i in range(N_BOOTSTRAP)]
            rand_roc_auc = np.percentile(roc_auc_values, 100 * self.quantile_probability)
            return roc_auc, roc_auc > rand_roc_auc, "model"
        return roc_auc, roc_auc > self.threshold, "model"


def model(
    threshold: float = 0.55,
    bootstrap: Optional[bool] = None,
    quantile_probability: float = 0.95,
    pca_components: Optional[int] = None,
) -> DriftMethod:
    """Returns a function for calculating drift on embeddings using the classifier method with specified parameters
    Args:
        threshold: all values above this threshold means data drift. Applies when bootstrap != True
        bootstrap: boolean parameter to determine whether to apply statistical hypothesis testing
        quantile_probability: applies when bootstrap == True
        pca_components: number of components to keep
    Returns:
        func: a function for calculating drift, which takes in reference and current embeddings data
        and returns a tuple: drift score, whether there is drift, and the name of the drift calculation method.
    """
    return ModelDriftMethod(
        threshold=threshold,
        bootstrap=bootstrap,
        quantile_probability=quantile_probability,
        pca_components=pca_components,
    )


@autoregister
class RatioDriftMethod(DriftMethod):
    """Drift method based on the share of embedding components that drift individually."""

    class Config:
        type_alias = "evidently:drift_method:RatioDriftMethod"

    component_stattest: str = "wasserstein"
    component_stattest_threshold: float = 0.1
    threshold: float = 0.2
    pca_components: Optional[int] = None

    def __call__(self, current_emb: pd.DataFrame, reference_emb: pd.DataFrame) -> Tuple[float, bool, str]:
        """Run the component stattest on every column and report the share of drifted columns.

        Args:
            current_emb: current embeddings data
            reference_emb: reference embeddings data
        Returns:
            The share of drifted components, whether it exceeds ``threshold``,
            and the method name "ratio".
        """
        if self.pca_components:
            reference_emb, current_emb = get_pca_df(reference_emb, current_emb, self.pca_components)
        # The stattest is resolved once, from the first component, and reused for all columns.
        stattest_func = get_stattest(
            reference_emb.iloc[:, 0], current_emb.iloc[:, 0], ColumnType.Numerical, self.component_stattest
        )
        n_components = reference_emb.shape[1]
        n_drifted = 0
        for i in range(n_components):
            drift_result = stattest_func(
                reference_emb.iloc[:, i],
                current_emb.iloc[:, i],
                feature_type=ColumnType.Numerical,
                threshold=self.component_stattest_threshold,
            )
            if drift_result.drifted:
                n_drifted += 1
        drift_share = n_drifted / n_components
        return drift_share, drift_share > self.threshold, "ratio"


def ratio(
    component_stattest: str = "wasserstein",
    component_stattest_threshold: float = 0.1,
    threshold: float = 0.2,
    pca_components: Optional[int] = None,
) -> DriftMethod:
    """Returns a function for calculating drift on embeddings using the ratio of drifted embeddings method
    with specified parameters
    Args:
        component_stattest: method for testing drift in a single embedding. Any drift detection method
        for a numerical feature implemented in evidently
        component_stattest_threshold: threshold for testing drift in a single embedding
        threshold: all values above this threshold means data drift
        pca_components: number of components to keep
    Returns:
        func: a function for calculating drift, which takes in reference and current embeddings data
        and returns a tuple: drift score, whether there is drift, and the name of the drift calculation method.
    """
    return RatioDriftMethod(
        component_stattest=component_stattest,
        component_stattest_threshold=component_stattest_threshold,
        threshold=threshold,
        pca_components=pca_components,
    )


def _mmd2u_from_blocks(Kx, Ky, Kxy, m, n):
    """Combine the x-x, y-y and x-y kernel blocks into the MMD^2_u value for sample sizes m and n."""
    return (
        1.0 / (m * (m - 1.0)) * (Kx.sum() - Kx.diagonal().sum())
        + 1.0 / (n * (n - 1.0)) * (Ky.sum() - Ky.diagonal().sum())
        - 2.0 / (m * n) * Kxy.sum()
    )


def MMD2u(K, m, n):
    """The MMD^2_u unbiased statistic.

    Args:
        K: kernel matrix of the stacked samples; the first m rows/columns belong to x, the rest to y
        m: number of x samples
        n: number of y samples
    """
    return _mmd2u_from_blocks(K[:m, :m], K[m:, m:], K[:m, m:], m, n)


def MMD2u_bstrp(K, m, n, x_idx, y_idx):
    """The MMD^2_u unbiased statistic for bootstrap subsample.

    Args:
        K: kernel matrix the subsamples are drawn from
        m: size of the x subsample
        n: size of the y subsample
        x_idx: row/column indices of K forming the x subsample
        y_idx: row/column indices of K forming the y subsample
    """
    # np.ix_ builds the same open-mesh selection as indexing with a column of x_idx and a row of y_idx.
    Kx = K[np.ix_(x_idx, x_idx)]
    Ky = K[np.ix_(y_idx, y_idx)]
    Kxy = K[np.ix_(x_idx, y_idx)]
    return _mmd2u_from_blocks(Kx, Ky, Kxy, m, n)


@autoregister
class MMDDriftMethod(DriftMethod):
    """Drift method based on the MMD^2_u statistic with an RBF kernel."""

    class Config:
        type_alias = "evidently:drift_method:MMDDriftMethod"

    threshold: float = 0.015
    bootstrap: Optional[bool] = None
    # NOTE(review): with the 0.05 default, bootstrap mode flags drift when the statistic exceeds
    # the 5th percentile of the reference-only distribution; the other methods default to 0.95.
    # Confirm this is intended before changing it.
    quantile_probability: float = 0.05
    pca_components: Optional[int] = None

    def __call__(self, current_emb: pd.DataFrame, reference_emb: pd.DataFrame) -> Tuple[float, bool, str]:
        """Compute MMD^2_u between reference and current embeddings and decide whether it signals drift.

        Args:
            current_emb: current embeddings data
            reference_emb: reference embeddings data
        Returns:
            The statistic clipped at 0, whether the unclipped statistic exceeds the fixed threshold
            (or, with bootstrap, the bootstrap quantile), and the method name "mmd".
        """
        if self.pca_components:
            reference_emb, current_emb = get_pca_df(reference_emb, current_emb, self.pca_components)
        x = reference_emb
        y = current_emb
        m = len(x)
        n = len(y)

        # Kernel bandwidth: squared median pairwise distance, estimated on at most 1000 rows per side.
        pair_dists = pairwise_distances(
            x.sample(min(m, 1000), random_state=0),
            y.sample(min(n, 1000), random_state=0),
            metric="euclidean",
            n_jobs=-1,
        )
        sigma2 = np.median(pair_dists) ** 2
        xy = np.vstack([x, y])
        K = pairwise_kernels(xy, metric="rbf", gamma=1.0 / sigma2)
        mmd2u = MMD2u(K, m, n)
        if not self.bootstrap:
            return max(mmd2u, 0), mmd2u > self.threshold, "mmd"

        # The null distribution is built from the reference data only, with its own bandwidth.
        pair_dists_bstrp = pairwise_distances(x.sample(min(m, 1000), random_state=0), metric="euclidean", n_jobs=-1)
        sigma2_x = np.median(pair_dists_bstrp) ** 2
        gamma_x = 1.0 / sigma2_x
        K = pairwise_kernels(x, metric="rbf", gamma=gamma_x)
        # The unbiased statistic divides by size * (size - 1), so a subsample of one row
        # raises ZeroDivisionError; use at least two rows.
        x_size = max(int(m * m / (m + n)), 2)
        y_size = max(int(m * n / (m + n)), 2)
        bstrp_res = []
        for i in range(N_BOOTSTRAP):
            # Local generator: same draws as np.random.seed(i) + np.random.choice, no global side effect.
            rng = np.random.RandomState(i)
            x_idxs = rng.choice(m, x_size)
            y_idxs = rng.choice(m, y_size)
            bstrp_res.append(MMD2u_bstrp(K, x_size, y_size, x_idxs, y_idxs))
        perc = np.percentile(bstrp_res, 100 * self.quantile_probability)
        return max(mmd2u, 0), mmd2u > perc, "mmd"


def mmd(
    threshold: float = 0.015,
    bootstrap: Optional[bool] = None,
    quantile_probability: float = 0.05,
    pca_components: Optional[int] = None,
) -> DriftMethod:
    """Returns a function for calculating drift on embeddings using the mmd method with specified parameters
    Args:
        threshold: all values above this threshold means data drift. Applies when bootstrap != True
        bootstrap: boolean parameter to determine whether to apply statistical hypothesis testing
        quantile_probability: applies when bootstrap == True
        pca_components: number of components to keep
    Returns:
        func: a function for calculating drift, which takes in reference and current embeddings data
        and returns a tuple: drift score, whether there is drift, and the name of the drift calculation method.
    """
    return MMDDriftMethod(
        threshold=threshold,
        bootstrap=bootstrap,
        quantile_probability=quantile_probability,
        pca_components=pca_components,
    )