# src/evidently/legacy/utils/data_drift_utils.py
  1  from typing import Callable
  2  from typing import Dict
  3  from typing import List
  4  from typing import Optional
  5  from typing import Tuple
  6  
  7  import numpy as np
  8  import pandas as pd
  9  from sklearn.feature_extraction.text import TfidfVectorizer
 10  from sklearn.linear_model import SGDClassifier
 11  from sklearn.metrics import roc_auc_score
 12  from sklearn.model_selection import train_test_split
 13  from sklearn.pipeline import Pipeline
 14  
 15  from evidently.legacy.base_metric import Metric
 16  from evidently.legacy.calculations.stattests import PossibleStatTestType
 17  
 18  
 19  def resolve_stattest_threshold(
 20      feature_name: str,
 21      feature_type: str,
 22      stattest: Optional[PossibleStatTestType],
 23      cat_stattest: Optional[PossibleStatTestType],
 24      num_stattest: Optional[PossibleStatTestType],
 25      text_stattest: Optional[PossibleStatTestType],
 26      per_column_stattest: Optional[Dict[str, PossibleStatTestType]],
 27      stattest_threshold: Optional[float],
 28      cat_stattest_threshold: Optional[float],
 29      num_stattest_threshold: Optional[float],
 30      text_stattest_threshold: Optional[float],
 31      per_column_stattest_threshold: Optional[Dict[str, float]],
 32  ) -> Tuple[Optional[PossibleStatTestType], Optional[float]]:
 33      return (
 34          _calculate_stattest(
 35              feature_name,
 36              feature_type,
 37              stattest,
 38              cat_stattest,
 39              num_stattest,
 40              text_stattest,
 41              per_column_stattest,
 42          ),
 43          _calculate_threshold(
 44              feature_name,
 45              feature_type,
 46              stattest_threshold,
 47              cat_stattest_threshold,
 48              num_stattest_threshold,
 49              text_stattest_threshold,
 50              per_column_stattest_threshold,
 51          ),
 52      )
 53  
 54  
 55  def _calculate_stattest(
 56      feature_name: str,
 57      feature_type: str,
 58      stattest: Optional[PossibleStatTestType] = None,
 59      cat_stattest: Optional[PossibleStatTestType] = None,
 60      num_stattest: Optional[PossibleStatTestType] = None,
 61      text_stattest: Optional[PossibleStatTestType] = None,
 62      per_column_stattest: Optional[Dict[str, PossibleStatTestType]] = None,
 63  ) -> Optional[PossibleStatTestType]:
 64      func = None if stattest is None else stattest
 65      if feature_type == "cat":
 66          type_func = cat_stattest
 67      elif feature_type == "num":
 68          type_func = num_stattest
 69      elif feature_type == "text":
 70          type_func = text_stattest
 71      else:
 72          raise ValueError(f"Unexpected feature type {feature_type}.")
 73      func = func if type_func is None else type_func
 74      if per_column_stattest is None:
 75          return func
 76      return per_column_stattest.get(feature_name, func)
 77  
 78  
 79  def _calculate_threshold(
 80      feature_name: str,
 81      feature_type: str,
 82      stattest_threshold: Optional[float] = None,
 83      cat_stattest_threshold: Optional[float] = None,
 84      num_stattest_threshold: Optional[float] = None,
 85      text_stattest_threshold: Optional[float] = None,
 86      per_column_stattest_threshold: Optional[Dict[str, float]] = None,
 87  ) -> Optional[float]:
 88      if per_column_stattest_threshold is not None and feature_name in per_column_stattest_threshold.keys():
 89          return per_column_stattest_threshold.get(feature_name)
 90  
 91      if cat_stattest_threshold is not None and feature_type == "cat":
 92          return cat_stattest_threshold
 93  
 94      if num_stattest_threshold is not None and feature_type == "num":
 95          return num_stattest_threshold
 96  
 97      if text_stattest_threshold is not None and feature_type == "text":
 98          return text_stattest_threshold
 99  
100      if stattest_threshold is not None:
101          return stattest_threshold
102      return None
103  
104  
def roc_auc_domain_classifier(X_train, X_test, y_train, y_test) -> Tuple:
    """Fit a TF-IDF + SGD domain classifier and score it on the test split.

    Returns a tuple of (ROC AUC on the test split, predicted probabilities
    of the positive class for the test split, the fitted pipeline).
    """
    vectorizer = TfidfVectorizer(sublinear_tf=True, max_df=0.5, stop_words="english")
    classifier = SGDClassifier(alpha=0.0001, max_iter=50, penalty="l1", loss="modified_huber", random_state=42)
    pipeline = Pipeline(
        [
            ("vectorization", vectorizer),
            ("classification", classifier),
        ]
    )

    pipeline.fit(X_train, y_train)
    predicted_proba = pipeline.predict_proba(X_test)[:, 1]
    return roc_auc_score(y_test, predicted_proba), predicted_proba, pipeline
120  
121  
def roc_auc_random_classifier_percentile(y_test: np.ndarray, p_value=0.05, iter_num=1000, seed=42) -> float:
    """Estimate the (1 - p_value) ROC AUC percentile of a random classifier.

    Draws ``iter_num`` sub-seeds from the given master seed, scores random
    predictions against ``y_test`` for each, and returns the percentile of
    the resulting AUC distribution used as a significance cutoff.
    """
    # NOTE: keeps the exact legacy np.random call sequence for reproducibility
    np.random.seed(seed)
    sub_seeds = np.random.randint(0, iter_num * 10, size=iter_num)

    scores = []
    for sub_seed in sub_seeds:
        np.random.seed(sub_seed)
        random_predictions = np.random.rand(len(y_test))
        scores.append(roc_auc_score(y_test, random_predictions))

    return np.percentile(scores, 100 * (1 - p_value))
133  
134  
def calculate_text_drift_score(
    reference_data: pd.Series,
    current_data: pd.Series,
    bootstrap: bool,
    p_value=0.05,
    threshold=0.55,
) -> Tuple[float, bool]:
    """Score text drift via a domain classifier.

    Labels reference texts 0 and current texts 1, trains a classifier to
    tell them apart, and reports its ROC AUC. Drift is detected when the
    AUC exceeds either a fixed ``threshold`` or, with ``bootstrap=True``,
    the (1 - p_value) percentile of a random classifier's AUC.
    """
    reference_part = pd.DataFrame({"text": reference_data, "target": 0})
    current_part = pd.DataFrame({"text": current_data, "target": 1})
    domain_data = pd.concat([reference_part, current_part])

    X_train, X_test, y_train, y_test = train_test_split(
        domain_data["text"],
        domain_data["target"],
        test_size=0.5,
        random_state=42,
        shuffle=True,
    )

    roc_auc, _, _ = roc_auc_domain_classifier(X_train, X_test, y_train, y_test)

    if bootstrap:
        cutoff = roc_auc_random_classifier_percentile(y_test, p_value=p_value)
        return roc_auc, roc_auc > cutoff
    return roc_auc, roc_auc > threshold
167  
168  
def get_typical_examples(X_test, y_test, y_pred_proba, examples_num=10) -> Tuple[List[str], List[str]]:
    """Return the texts the domain classifier is most confident about.

    Builds a frame of (text, label, predict_proba) and returns up to
    ``examples_num`` texts most characteristic of the current dataset
    (label == 1, highest predicted probability) and of the reference
    dataset (label == 0, lowest predicted probability).
    """
    examples = pd.DataFrame({"text": X_test, "label": y_test, "predict_proba": y_pred_proba})

    # Sort without `inplace=True`: in-place sorting of a boolean-filtered
    # slice triggers pandas' SettingWithCopyWarning and relies on mutating
    # what may be a copy.
    typical_current = (
        examples[examples["label"] == 1]
        .sort_values("predict_proba", ascending=False)
        .head(examples_num)
    )
    typical_reference = (
        examples[examples["label"] == 0]
        .sort_values("predict_proba", ascending=True)
        .head(examples_num)
    )
    return list(typical_current["text"]), list(typical_reference["text"])
180  
181  
def get_typical_words(pipeline, words_num=25) -> Tuple[List[str], List[str]]:
    """Return the vocabulary words with the strongest classifier weights.

    Maps non-zero SGD coefficients back to vocabulary words and returns up
    to ``words_num`` words most characteristic of the current dataset
    (largest positive weights) and of the reference dataset (most negative
    weights).
    """
    weights_df = pd.DataFrame({"weight": pipeline["classification"].coef_[0]})
    weights_df.reset_index(inplace=True)
    weights_df.rename(columns={"index": "feature_ind"}, inplace=True)
    # features with zero weight carry no signal; drop them
    weights_df = weights_df[weights_df["weight"] != 0]

    # build inverted index for vocabulary: feature index -> word
    inverted_vocabulary = {value: key for key, value in pipeline["vectorization"].vocabulary_.items()}

    # Avoid in-place sorting and column assignment on filtered slices: that
    # triggers pandas' SettingWithCopyWarning and may silently operate on a
    # copy. Chain the sort and take an explicit copy before adding "word".
    words_typical_current = (
        weights_df[weights_df["weight"] > 0]
        .sort_values("weight", ascending=False)
        .head(words_num)
        .copy()
    )
    words_typical_current["word"] = words_typical_current["feature_ind"].apply(lambda x: inverted_vocabulary[x])

    words_typical_reference = (
        weights_df[weights_df["weight"] < 0]
        .sort_values("weight", ascending=True)
        .head(words_num)
        .copy()
    )
    words_typical_reference["word"] = words_typical_reference["feature_ind"].apply(lambda x: inverted_vocabulary[x])

    return list(words_typical_current["word"]), list(words_typical_reference["word"])
201  
202  
def get_text_data_for_plots(reference_data: pd.Series, current_data: pd.Series):
    """Fit a domain classifier and gather its most telling examples/words.

    Labels reference texts 0 and current texts 1, trains the classifier on
    half the data, and returns four lists: typical current examples,
    typical reference examples, typical current words, typical reference
    words.
    """
    domain_data = pd.concat(
        [
            pd.DataFrame({"text": reference_data, "target": 0}),
            pd.DataFrame({"text": current_data, "target": 1}),
        ]
    )

    X_train, X_test, y_train, y_test = train_test_split(
        domain_data["text"],
        domain_data["target"],
        test_size=0.5,
        random_state=42,
        shuffle=True,
    )

    _, y_pred_proba, classifier_pipeline = roc_auc_domain_classifier(X_train, X_test, y_train, y_test)

    # examples more characteristic of the current or the reference dataset
    examples_cur, examples_ref = get_typical_examples(X_test, y_test, y_pred_proba)
    # words more characteristic of the current or the reference dataset
    words_cur, words_ref = get_typical_words(classifier_pipeline)

    return examples_cur, examples_ref, words_cur, words_ref
232  
233  
def add_emb_drift_to_reports(
    embeddings_data,
    embeddings,
    embeddings_drift_method,
    entity,
) -> List[Metric]:
    """Build one embedding-drift entity per selected embedding set.

    Uses all keys of ``embeddings_data`` unless ``embeddings`` restricts
    them (intersection). For each selected set, looks up an optional drift
    method and calls ``entity(embeddings_name=..., drift_method=...)``.
    Returns the list of constructed entities (empty when nothing matches).
    """
    selected = list(embeddings_data.keys())
    if embeddings is not None:
        selected = np.intersect1d(selected, embeddings)

    metrics: List[Metric] = []
    for emb_set in selected:
        drift_method: Optional[Callable] = None
        if embeddings_drift_method is not None:
            drift_method = embeddings_drift_method.get(emb_set)
        metrics.append(entity(embeddings_name=emb_set, drift_method=drift_method))
    return metrics