# data_drift_utils.py
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline

from evidently.legacy.base_metric import Metric
from evidently.legacy.calculations.stattests import PossibleStatTestType


def resolve_stattest_threshold(
    feature_name: str,
    feature_type: str,
    stattest: Optional[PossibleStatTestType],
    cat_stattest: Optional[PossibleStatTestType],
    num_stattest: Optional[PossibleStatTestType],
    text_stattest: Optional[PossibleStatTestType],
    per_column_stattest: Optional[Dict[str, PossibleStatTestType]],
    stattest_threshold: Optional[float],
    cat_stattest_threshold: Optional[float],
    num_stattest_threshold: Optional[float],
    text_stattest_threshold: Optional[float],
    per_column_stattest_threshold: Optional[Dict[str, float]],
) -> Tuple[Optional[PossibleStatTestType], Optional[float]]:
    """Resolve the (stattest, threshold) pair to use for a single column.

    Precedence (highest first): per-column override, feature-type-specific
    setting ("cat"/"num"/"text"), global default, otherwise ``None``.

    Args:
        feature_name: column name, used for per-column lookups.
        feature_type: one of ``"cat"``, ``"num"`` or ``"text"``.
        stattest / *_stattest / per_column_stattest: stattest settings at
            the global, type and per-column levels.
        stattest_threshold / *_stattest_threshold /
            per_column_stattest_threshold: threshold settings at the same
            three levels.

    Returns:
        Tuple ``(stattest, threshold)``; either element may be ``None``
        when nothing was configured at any level.

    Raises:
        ValueError: if ``feature_type`` is not a recognized type.
    """
    return (
        _calculate_stattest(
            feature_name,
            feature_type,
            stattest,
            cat_stattest,
            num_stattest,
            text_stattest,
            per_column_stattest,
        ),
        _calculate_threshold(
            feature_name,
            feature_type,
            stattest_threshold,
            cat_stattest_threshold,
            num_stattest_threshold,
            text_stattest_threshold,
            per_column_stattest_threshold,
        ),
    )


def _calculate_stattest(
    feature_name: str,
    feature_type: str,
    stattest: Optional[PossibleStatTestType] = None,
    cat_stattest: Optional[PossibleStatTestType] = None,
    num_stattest: Optional[PossibleStatTestType] = None,
    text_stattest: Optional[PossibleStatTestType] = None,
    per_column_stattest: Optional[Dict[str, PossibleStatTestType]] = None,
) -> Optional[PossibleStatTestType]:
    """Pick the stattest for one column: per-column > type-specific > global.

    Raises:
        ValueError: if ``feature_type`` is not "cat", "num" or "text".
    """
    if feature_type == "cat":
        type_func = cat_stattest
    elif feature_type == "num":
        type_func = num_stattest
    elif feature_type == "text":
        type_func = text_stattest
    else:
        raise ValueError(f"Unexpected feature type {feature_type}.")
    # type-specific setting overrides the global default
    func = stattest if type_func is None else type_func
    if per_column_stattest is None:
        return func
    # per-column override wins; fall back to the resolved default
    return per_column_stattest.get(feature_name, func)


def _calculate_threshold(
    feature_name: str,
    feature_type: str,
    stattest_threshold: Optional[float] = None,
    cat_stattest_threshold: Optional[float] = None,
    num_stattest_threshold: Optional[float] = None,
    text_stattest_threshold: Optional[float] = None,
    per_column_stattest_threshold: Optional[Dict[str, float]] = None,
) -> Optional[float]:
    """Pick the drift threshold for one column: per-column > type-specific > global."""
    if per_column_stattest_threshold is not None and feature_name in per_column_stattest_threshold:
        return per_column_stattest_threshold.get(feature_name)

    if cat_stattest_threshold is not None and feature_type == "cat":
        return cat_stattest_threshold

    if num_stattest_threshold is not None and feature_type == "num":
        return num_stattest_threshold

    if text_stattest_threshold is not None and feature_type == "text":
        return text_stattest_threshold

    if stattest_threshold is not None:
        return stattest_threshold
    return None


def roc_auc_domain_classifier(X_train, X_test, y_train, y_test) -> Tuple:
    """Train a TF-IDF + SGD domain classifier and score it on the test split.

    The classifier tries to tell reference texts (label 0) from current
    texts (label 1); a high ROC AUC means the two text distributions are
    distinguishable, i.e. drifted.

    Returns:
        ``(roc_auc, y_pred_proba, pipeline)`` — the test ROC AUC, the
        predicted probabilities for class 1, and the fitted pipeline.
    """
    pipeline = Pipeline(
        [
            ("vectorization", TfidfVectorizer(sublinear_tf=True, max_df=0.5, stop_words="english")),
            (
                "classification",
                # modified_huber loss is required so predict_proba is available
                SGDClassifier(alpha=0.0001, max_iter=50, penalty="l1", loss="modified_huber", random_state=42),
            ),
        ]
    )

    pipeline.fit(X_train, y_train)
    y_pred_proba = pipeline.predict_proba(X_test)[:, 1]
    roc_auc = roc_auc_score(y_test, y_pred_proba)
    return roc_auc, y_pred_proba, pipeline


def roc_auc_random_classifier_percentile(y_test: np.ndarray, p_value=0.05, iter_num=1000, seed=42) -> float:
    """Bootstrap the ROC AUC distribution of a random classifier on ``y_test``.

    Returns the ``(1 - p_value)`` percentile of that distribution; a real
    classifier scoring above it is significantly better than chance.
    """

    def calc_roc_auc_random(y_test, seed=None):
        # one bootstrap iteration: score uniformly random predictions
        np.random.seed(seed)
        y_random_pred = np.random.rand(len(y_test))
        roc_auc_random = roc_auc_score(y_test, y_random_pred)
        return roc_auc_random

    # derive per-iteration seeds from the master seed for reproducibility
    np.random.seed(seed)
    seeds = np.random.randint(0, iter_num * 10, size=iter_num)
    roc_auc_values = [calc_roc_auc_random(y_test, seed=seed) for seed in seeds]
    return np.percentile(roc_auc_values, 100 * (1 - p_value))


def calculate_text_drift_score(
    reference_data: pd.Series,
    current_data: pd.Series,
    bootstrap: bool,
    p_value=0.05,
    threshold=0.55,
) -> Tuple[float, bool]:
    """Score text drift with a domain classifier.

    Labels reference rows 0 and current rows 1, trains a classifier to
    separate them, and flags drift when its ROC AUC exceeds either a fixed
    ``threshold`` or (when ``bootstrap`` is True) the random-classifier
    percentile at the given ``p_value``.

    Returns:
        ``(roc_auc, drift_detected)``.
    """
    domain_data = pd.concat(
        [
            pd.DataFrame({"text": reference_data, "target": 0}),
            pd.DataFrame({"text": current_data, "target": 1}),
        ]
    )

    X_train, X_test, y_train, y_test = train_test_split(
        domain_data["text"],
        domain_data["target"],
        test_size=0.5,
        random_state=42,
        shuffle=True,
    )

    domain_classifier_roc_auc, _, _ = roc_auc_domain_classifier(
        X_train,
        X_test,
        y_train,
        y_test,
    )
    if not bootstrap:
        return domain_classifier_roc_auc, domain_classifier_roc_auc > threshold
    random_classifier_percentile = roc_auc_random_classifier_percentile(y_test, p_value=p_value)
    return domain_classifier_roc_auc, domain_classifier_roc_auc > random_classifier_percentile


def get_typical_examples(X_test, y_test, y_pred_proba, examples_num=10) -> Tuple[List[str], List[str]]:
    """Return the texts the classifier is most confident about per dataset.

    Returns:
        ``(current_examples, reference_examples)`` — up to ``examples_num``
        texts most characteristic of the current (label 1, highest
        probability) and reference (label 0, lowest probability) datasets.
    """
    typical_examples = pd.DataFrame({"text": X_test, "label": y_test, "predict_proba": y_pred_proba})

    # non-inplace sorts: sort_values(inplace=True) on a boolean-mask slice
    # triggers pandas SettingWithCopyWarning and may silently not stick
    typical_current = (
        typical_examples[typical_examples["label"] == 1]
        .sort_values("predict_proba", ascending=False)
        .head(examples_num)
    )

    typical_reference = (
        typical_examples[typical_examples["label"] == 0]
        .sort_values("predict_proba", ascending=True)
        .head(examples_num)
    )
    return list(typical_current["text"]), list(typical_reference["text"])


def get_typical_words(pipeline, words_num=25) -> Tuple[List[str], List[str]]:
    """Return the words with the strongest classifier weights per dataset.

    Positive coefficients push toward the current dataset (label 1),
    negative ones toward reference (label 0).

    Returns:
        ``(current_words, reference_words)``, each up to ``words_num`` long.
    """
    weights_df = pd.DataFrame({"weight": pipeline["classification"].coef_[0]})
    weights_df.reset_index(inplace=True)
    weights_df.rename(columns={"index": "feature_ind"}, inplace=True)
    # l1 penalty zeroes out most coefficients; keep only informative features
    weights_df = weights_df[weights_df["weight"] != 0]

    # build inverted index for vocabulary: feature index -> word
    inverted_vocabulary = {value: key for key, value in pipeline["vectorization"].vocabulary_.items()}

    # .copy() before adding the "word" column: assigning on a slice is
    # chained assignment (SettingWithCopyWarning); sorts are non-inplace
    words_typical_current = (
        weights_df[weights_df["weight"] > 0].sort_values("weight", ascending=False).head(words_num).copy()
    )
    words_typical_current["word"] = words_typical_current["feature_ind"].map(inverted_vocabulary)

    words_typical_reference = (
        weights_df[weights_df["weight"] < 0].sort_values("weight", ascending=True).head(words_num).copy()
    )
    words_typical_reference["word"] = words_typical_reference["feature_ind"].map(inverted_vocabulary)

    return list(words_typical_current["word"]), list(words_typical_reference["word"])


def get_text_data_for_plots(
    reference_data: pd.Series, current_data: pd.Series
) -> Tuple[List[str], List[str], List[str], List[str]]:
    """Fit a domain classifier and extract plot data for text drift reports.

    Returns:
        ``(typical_examples_cur, typical_examples_ref, typical_words_cur,
        typical_words_ref)``.
    """
    domain_data = pd.concat(
        [
            pd.DataFrame({"text": reference_data, "target": 0}),
            pd.DataFrame({"text": current_data, "target": 1}),
        ]
    )

    X_train, X_test, y_train, y_test = train_test_split(
        domain_data["text"],
        domain_data["target"],
        test_size=0.5,
        random_state=42,
        shuffle=True,
    )

    _, y_pred_proba, classifier_pipeline = roc_auc_domain_classifier(
        X_train,
        X_test,
        y_train,
        y_test,
    )
    # get examples more characteristic of current or reference dataset
    typical_examples_cur, typical_examples_ref = get_typical_examples(X_test, y_test, y_pred_proba)

    # get words more characteristic of current or reference dataset
    typical_words_cur, typical_words_ref = get_typical_words(classifier_pipeline)

    return typical_examples_cur, typical_examples_ref, typical_words_cur, typical_words_ref


def add_emb_drift_to_reports(
    embeddings_data,
    embeddings,
    embeddings_drift_method,
    entity,
) -> List[Metric]:
    """Build one embedding-drift metric per embedding set.

    Args:
        embeddings_data: mapping whose keys are the available embedding
            set names.
        embeddings: optional list of set names to restrict to; ``None``
            means use all available sets.
        embeddings_drift_method: optional mapping from set name to drift
            method; sets without an entry get ``drift_method=None``.
        entity: metric factory called as
            ``entity(embeddings_name=..., drift_method=...)``.

    Returns:
        List of constructed metrics (empty if no sets are selected).
    """
    result: List[Metric] = []
    sets = list(embeddings_data.keys())
    if embeddings is not None:
        # keep only the requested sets that actually exist
        sets = np.intersect1d(sets, embeddings)
    if len(sets) == 0:
        return result
    f: Optional[Callable]
    for emb_set in sets:
        if embeddings_drift_method is not None:
            f = embeddings_drift_method.get(emb_set)
        else:
            f = None
        result.append(entity(embeddings_name=emb_set, drift_method=f))
    return result