/ src / evidently / legacy / metrics / data_drift / embedding_drift_methods.py
embedding_drift_methods.py
  1  import abc
  2  from typing import Callable
  3  from typing import Optional
  4  from typing import Tuple
  5  
  6  import numpy as np
  7  import pandas as pd
  8  from scipy.spatial.distance import chebyshev
  9  from scipy.spatial.distance import cityblock
 10  from scipy.spatial.distance import cosine
 11  from scipy.spatial.distance import euclidean
 12  from sklearn.decomposition import PCA
 13  from sklearn.linear_model import SGDClassifier
 14  from sklearn.metrics import pairwise_distances
 15  from sklearn.metrics import pairwise_kernels
 16  from sklearn.metrics import roc_auc_score
 17  from sklearn.model_selection import train_test_split
 18  
 19  from evidently.legacy.calculations.stattests import get_stattest
 20  from evidently.legacy.core import ColumnType
 21  from evidently.pydantic_utils import EvidentlyBaseModel
 22  from evidently.pydantic_utils import autoregister
 23  
 24  DISTANCE_DICT = {
 25      "euclidean": euclidean,
 26      "cosine": cosine,
 27      "cityblock": cityblock,
 28      "chebyshev": chebyshev,
 29  }
 30  N_BOOTSTRAP = 100
 31  
 32  
 33  def get_pca_df(
 34      reference_emb: pd.DataFrame, current_emb: pd.DataFrame, n_comp: int
 35  ) -> Tuple[pd.DataFrame, pd.DataFrame]:
 36      """Takes two dataframes and reduces their dimensionality using the PCA method
 37      Args:
 38          reference_emb: reference embeddings data
 39          current_emb: current embeddings data
 40          n_comp: number of components to keep
 41      Returns:
 42          reference_emb_new: transformed reference_emb
 43          current_emb_new: transformed current_emb
 44      """
 45      pca = PCA(n_components=n_comp, random_state=0)
 46      return pd.DataFrame(pca.fit_transform(reference_emb)), pd.DataFrame(pca.transform(current_emb))
 47  
 48  
 49  class DriftMethod(EvidentlyBaseModel):
 50      class Config:
 51          is_base_type = True
 52  
 53      @abc.abstractmethod
 54      def __call__(self, current_emb: pd.DataFrame, reference_emb: pd.DataFrame) -> Tuple[float, bool, str]:
 55          raise NotImplementedError
 56  
 57  
 58  @autoregister
 59  class DistanceDriftMethod(DriftMethod):
 60      class Config:
 61          type_alias = "evidently:drift_method:DistanceDriftMethod"
 62  
 63      dist: str = "euclidean"
 64      threshold: float = 0.2
 65      bootstrap: Optional[bool] = None
 66      quantile_probability: float = 0.95
 67      pca_components: Optional[int] = None
 68  
 69      def __call__(self, current_emb: pd.DataFrame, reference_emb: pd.DataFrame) -> Tuple[float, bool, str]:
 70          if self.pca_components:
 71              reference_emb, current_emb = get_pca_df(reference_emb, current_emb, self.pca_components)
 72          res = DISTANCE_DICT[self.dist](reference_emb.mean(axis=0), current_emb.mean(axis=0))
 73          if self.bootstrap:
 74              bstrp_res = []
 75              b_ref_size = int(reference_emb.shape[0] ** 2 / (reference_emb.shape[0] + current_emb.shape[0]))
 76              b_curr_size = int(
 77                  current_emb.shape[0] * reference_emb.shape[0] / (reference_emb.shape[0] + current_emb.shape[0])
 78              )
 79              for i in range(N_BOOTSTRAP):
 80                  np.random.seed(i)
 81                  b_ref_idx = np.random.choice(reference_emb.shape[0], b_ref_size)
 82                  b_curr_idx = np.random.choice(reference_emb.shape[0], b_curr_size)
 83                  bstrp_res.append(
 84                      DISTANCE_DICT[self.dist](
 85                          reference_emb.iloc[b_ref_idx, :].mean(axis=0), reference_emb.iloc[b_curr_idx, :].mean(axis=0)
 86                      )
 87                  )
 88              perc = np.percentile(bstrp_res, 100 * self.quantile_probability)
 89              return res, res > perc, "distance"
 90          return res, res > self.threshold, "distance"
 91  
 92  
 93  def distance(
 94      dist: str = "euclidean",
 95      threshold: float = 0.2,
 96      bootstrap: Optional[bool] = None,
 97      quantile_probability: float = 0.95,
 98      pca_components: Optional[int] = None,
 99  ) -> Callable:
100      """Returns a function for calculating drift on embeddings using the average distance method with specified parameters
101      Args:
102          dist: "euclidean", "cosine", "cityblock" or "chebyshev"
103          threshold: all values above this threshold means data drift. Applies when bootstrap != True
104          bootstrap: boolean parameter to determine whether to apply statistical hypothesis testing
105          quantile_probability: applies when bootstrap == True
106          pca_components: number of components to keep
107      Returns:
108          func: a function for calculating drift, which takes in reference and current embeddings data
109          and returns a tuple: drift score, whether there is drift, and the name of the drift calculation method.
110      """
111  
112      return DistanceDriftMethod(
113          dist=dist,
114          threshold=threshold,
115          bootstrap=bootstrap,
116          quantile_probability=quantile_probability,
117          pca_components=pca_components,
118      )
119  
120  
121  def calc_roc_auc_random(y_test, i):
122      np.random.seed(i)
123      y_random_pred = np.random.rand(
124          len(y_test),
125      )
126      roc_auc_random = roc_auc_score(y_test, y_random_pred)
127      return roc_auc_random
128  
129  
130  @autoregister
131  class ModelDriftMethod(DriftMethod):
132      class Config:
133          type_alias = "evidently:drift_method:ModelDriftMethod"
134  
135      threshold: float = 0.55
136      bootstrap: Optional[bool] = None
137      quantile_probability: float = 0.95
138      pca_components: Optional[int] = None
139  
140      def __call__(self, current_emb: pd.DataFrame, reference_emb: pd.DataFrame) -> Tuple[float, bool, str]:
141          if self.pca_components:
142              reference_emb, current_emb = get_pca_df(reference_emb, current_emb, self.pca_components)
143          reference_emb["target"] = [1] * reference_emb.shape[0]
144          current_emb["target"] = [0] * current_emb.shape[0]
145          data = pd.concat((reference_emb, current_emb))
146  
147          X_train, X_test, y_train, y_test = train_test_split(
148              data.drop("target", axis=1), data["target"], test_size=0.5, random_state=42, shuffle=True
149          )
150          clf = SGDClassifier(loss="log_loss", random_state=42)
151          clf.fit(X_train, y_train)
152          y_pred_proba = clf.predict_proba(X_test)[:, 1]
153          roc_auc = roc_auc_score(y_test, y_pred_proba)
154          if self.bootstrap:
155              roc_auc_values = [calc_roc_auc_random(y_test, i) for i in range(100)]
156              rand_roc_auc = np.percentile(roc_auc_values, 100 * self.quantile_probability)
157              return roc_auc, roc_auc > rand_roc_auc, "model"
158          return roc_auc, roc_auc > self.threshold, "model"
159  
160  
161  def model(
162      threshold: float = 0.55,
163      bootstrap: Optional[bool] = None,
164      quantile_probability: float = 0.95,
165      pca_components: Optional[int] = None,
166  ) -> DriftMethod:
167      """Returns a function for calculating drift on embeddings using the classifier method with specified parameters
168      Args:
169          threshold: all values above this threshold means data drift. Applies when bootstrap != True
170          bootstrap: boolean parameter to determine whether to apply statistical hypothesis testing
171          quantile_probability: applies when bootstrap == True
172          pca_components: number of components to keep
173      Returns:
174          func: a function for calculating drift, which takes in reference and current embeddings data
175          and returns a tuple: drift score, whether there is drift, and the name of the drift calculation method.
176      """
177      return ModelDriftMethod(
178          threshold=threshold,
179          bootstrap=bootstrap,
180          quantile_probability=quantile_probability,
181          pca_components=pca_components,
182      )
183  
184  
185  @autoregister
186  class RatioDriftMethod(DriftMethod):
187      class Config:
188          type_alias = "evidently:drift_method:RatioDriftMethod"
189  
190      component_stattest: str = "wasserstein"
191      component_stattest_threshold: float = 0.1
192      threshold: float = 0.2
193      pca_components: Optional[int] = None
194  
195      def __call__(self, current_emb: pd.DataFrame, reference_emb: pd.DataFrame) -> Tuple[float, bool, str]:
196          if self.pca_components:
197              reference_emb, current_emb = get_pca_df(reference_emb, current_emb, self.pca_components)
198          stattest_func = get_stattest(
199              reference_emb.iloc[:, 0], current_emb.iloc[:, 0], ColumnType.Numerical, self.component_stattest
200          )
201          n_drifted = 0
202          for i in range(reference_emb.shape[1]):
203              drift_result = stattest_func(
204                  reference_emb.iloc[:, i],
205                  current_emb.iloc[:, i],
206                  feature_type=ColumnType.Numerical,
207                  threshold=self.component_stattest_threshold,
208              )
209              if drift_result.drifted:
210                  n_drifted += 1
211          return n_drifted / reference_emb.shape[1], n_drifted / reference_emb.shape[1] > self.threshold, "ratio"
212  
213  
214  def ratio(
215      component_stattest: str = "wasserstein",
216      component_stattest_threshold: float = 0.1,
217      threshold: float = 0.2,
218      pca_components: Optional[int] = None,
219  ) -> DriftMethod:
220      """Returns a function for calculating drift on embeddings using the ratio of drifted embeddings method
221      with specified parameters
222      Args:
223          component_stattest: method for testing drift in a single embedding. Any drift detection method
224          for a numerical feature implemented in evidently
225          component_stattest_threshold: threshold for testing drift in a single embedding
226          threshold: all values above this threshold means data drift
227          pca_components: number of components to keep
228      Returns:
229          func: a function for calculating drift, which takes in reference and current embeddings data
230          and returns a tuple: drift score, whether there is drift, and the name of the drift calculation method.
231      """
232      return RatioDriftMethod(
233          component_stattest=component_stattest,
234          component_stattest_threshold=component_stattest_threshold,
235          threshold=threshold,
236          pca_components=pca_components,
237      )
238  
239  
240  def MMD2u(K, m, n):
241      """The MMD^2_u unbiased statistic."""
242      Kx = K[:m, :m]
243      Ky = K[m:, m:]
244      Kxy = K[:m, m:]
245      return (
246          1.0 / (m * (m - 1.0)) * (Kx.sum() - Kx.diagonal().sum())
247          + 1.0 / (n * (n - 1.0)) * (Ky.sum() - Ky.diagonal().sum())
248          - 2.0 / (m * n) * Kxy.sum()
249      )
250  
251  
252  def MMD2u_bstrp(K, m, n, x_idx, y_idx):
253      """The MMD^2_u unbiased statistic for bootstrap subsample."""
254      Kx = K[[[idx] for idx in x_idx], x_idx]
255      Ky = K[[[idx] for idx in y_idx], y_idx]
256      Kxy = K[[[idx] for idx in x_idx], y_idx]
257      return (
258          1.0 / (m * (m - 1.0)) * (Kx.sum() - Kx.diagonal().sum())
259          + 1.0 / (n * (n - 1.0)) * (Ky.sum() - Ky.diagonal().sum())
260          - 2.0 / (m * n) * Kxy.sum()
261      )
262  
263  
264  @autoregister
265  class MMDDriftMethod(DriftMethod):
266      class Config:
267          type_alias = "evidently:drift_method:MMDDriftMethod"
268  
269      threshold: float = 0.015
270      bootstrap: Optional[bool] = None
271      quantile_probability: float = 0.05
272      pca_components: Optional[int] = None
273  
274      def __call__(self, current_emb: pd.DataFrame, reference_emb: pd.DataFrame) -> Tuple[float, bool, str]:
275          if self.pca_components:
276              reference_emb, current_emb = get_pca_df(reference_emb, current_emb, self.pca_components)
277          x = reference_emb
278          y = current_emb
279          m = len(x)
280          n = len(y)
281  
282          pair_dists = pairwise_distances(
283              x.sample(min(m, 1000), random_state=0),
284              y.sample(min(n, 1000), random_state=0),
285              metric="euclidean",
286              n_jobs=-1,
287          )
288          sigma2 = np.median(pair_dists) ** 2
289          xy = np.vstack([x, y])
290          K = pairwise_kernels(xy, metric="rbf", gamma=1.0 / sigma2)
291          mmd2u = MMD2u(K, m, n)
292          if self.bootstrap:
293              pair_dists_bstrp = pairwise_distances(x.sample(min(m, 1000), random_state=0), metric="euclidean", n_jobs=-1)
294              sigma2_x = np.median(pair_dists_bstrp) ** 2
295              gamma_x = 1.0 / sigma2_x
296              K = pairwise_kernels(x, metric="rbf", gamma=gamma_x)
297              x_size = max(int(m * m / (m + n)), 1)
298              y_size = max(int(m * n / (m + n)), 1)
299              bstrp_res = []
300              for i in range(N_BOOTSTRAP):
301                  np.random.seed(i)
302                  x_idxs = np.random.choice(m, x_size)
303                  y_idxs = np.random.choice(m, y_size)
304                  bstrp_res.append(MMD2u_bstrp(K, x_size, y_size, x_idxs, y_idxs))
305                  perc = np.percentile(bstrp_res, 100 * self.quantile_probability)
306              return max(mmd2u, 0), mmd2u > perc, "mmd"
307          else:
308              return max(mmd2u, 0), mmd2u > self.threshold, "mmd"
309  
310  
311  def mmd(
312      threshold: float = 0.015,
313      bootstrap: Optional[bool] = None,
314      quantile_probability: float = 0.05,
315      pca_components: Optional[int] = None,
316  ) -> DriftMethod:
317      """Returns a function for calculating drift on embeddings using the mmd method with specified parameters
318      Args:
319          threshold: all values above this threshold means data drift. Applies when bootstrap != True
320          bootstrap: boolean parameter to determine whether to apply statistical hypothesis testing
321          quantile_probability: applies when bootstrap == True
322          pca_components: number of components to keep
323      Returns:
324          func: a function for calculating drift, which takes in reference and current embeddings data
325          and returns a tuple: drift score, whether there is drift, and the name of the drift calculation method.
326      """
327      return MMDDriftMethod(
328          threshold=threshold,
329          bootstrap=bootstrap,
330          quantile_probability=quantile_probability,
331          pca_components=pca_components,
332      )