# src/evidently/legacy/features/BERTScore_feature.py
  1  from collections import defaultdict
  2  from typing import ClassVar
  3  from typing import Dict
  4  from typing import List
  5  
  6  import numpy as np
  7  import pandas as pd
  8  
  9  from evidently.legacy.base_metric import ColumnName
 10  from evidently.legacy.core import ColumnType
 11  from evidently.legacy.features.generated_features import GeneratedFeature
 12  from evidently.legacy.utils.data_preprocessing import DataDefinition
 13  
 14  
 15  class BERTScoreFeature(GeneratedFeature):
 16      class Config:
 17          type_alias = "evidently:feature:BERTScoreFeature"
 18  
 19      __feature_type__: ClassVar = ColumnType.Numerical
 20      columns: List[str]
 21      model: str = "bert-base-uncased"  # Pretrained BERT model
 22      tfidf_weighted: bool = False  # Whether to weight embeddings with IDF
 23  
 24      def generate_feature(self, data: pd.DataFrame, data_definition: DataDefinition) -> pd.DataFrame:
 25          # Load BERT model and tokenizer
 26          from transformers import BertModel
 27          from transformers import BertTokenizer
 28  
 29          tokenizer = BertTokenizer.from_pretrained(self.model)
 30          model = BertModel.from_pretrained(self.model)
 31  
 32          # Tokenize sentences
 33          tokens_first = tokenizer(
 34              data[self.columns[0]].fillna("").tolist(), return_tensors="pt", padding=True, truncation=True
 35          )
 36          tokens_second = tokenizer(
 37              data[self.columns[1]].fillna("").tolist(), return_tensors="pt", padding=True, truncation=True
 38          )
 39  
 40          # Get embeddings
 41          embeddings_first = model(**tokens_first).last_hidden_state.detach().numpy()
 42          embeddings_second = model(**tokens_second).last_hidden_state.detach().numpy()
 43          # Obtain IDF scores
 44          idf_scores = self.compute_idf_scores(data[self.columns[0]], data[self.columns[1]], tokenizer)
 45  
 46          scores = []
 47          for i, (emb1, emb2) in enumerate(zip(embeddings_first, embeddings_second)):
 48              recall, precision = self.calculate_scores(emb1, emb2, idf_scores, i)
 49              f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
 50              scores.append(f1_score)
 51  
 52          # Return as a DataFrame
 53          return pd.DataFrame(
 54              {
 55                  self._feature_name(): pd.Series(scores, index=data.index),
 56              }
 57          )
 58  
 59      def compute_idf_scores(self, col1: pd.Series, col2: pd.Series, tokenizer) -> tuple:
 60          # Combine reference sentences
 61          reference_sentences = pd.concat([col1, col2]).dropna().tolist()
 62          M = len(reference_sentences)
 63  
 64          # Compute IDF for each unique token
 65          token_counts: Dict[str, int] = defaultdict(int)
 66          for sentence in reference_sentences:
 67              tokens = [tokenizer.cls_token] + tokenizer.tokenize(sentence) + [tokenizer.sep_token]
 68              unique_tokens = set(tokens)
 69              for token in unique_tokens:
 70                  token_counts[token] += 1
 71  
 72          idf_scores = {token: -np.log(count / M) for token, count in token_counts.items()}
 73  
 74          # Convert IDF scores to numpy arrays
 75          def convert_to_idf_arrays(sentences):
 76              idf_arrays = []
 77              for sentence in sentences:
 78                  tokens = tokenizer.tokenize(sentence)
 79  
 80                  # Add special tokens
 81                  tokens = [tokenizer.cls_token] + tokens + [tokenizer.sep_token]
 82                  # Compute IDF scores for each token including plus one smoothing
 83                  idf_array = np.array([idf_scores.get(token, 0) + 1 for token in tokens])
 84                  idf_arrays.append(idf_array)
 85              # Pad sequences to the same length
 86              max_len = max(len(arr) for arr in idf_arrays)
 87              idf_arrays = np.array([np.pad(arr, (0, max_len - len(arr)), "constant") for arr in idf_arrays])
 88              return idf_arrays
 89  
 90          idf_arrays1 = convert_to_idf_arrays(col1.fillna("").tolist())
 91          idf_arrays2 = convert_to_idf_arrays(col2.fillna("").tolist())
 92          return idf_arrays1, idf_arrays2
 93  
 94      def max_similarity(self, embeddings1, embeddings2):
 95          # Compute max cosine similarity for each token in embeddings1 with respect to embeddings2
 96          similarity_matrix = np.dot(embeddings1, embeddings2.T) / (
 97              np.linalg.norm(embeddings1, axis=1, keepdims=True) * np.linalg.norm(embeddings2, axis=1, keepdims=True).T
 98          )
 99          return similarity_matrix.max(axis=1)
100  
101      def calculate_scores(self, emb1, emb2, idf_scores, index):
102          if self.tfidf_weighted:
103              weighted_scores = np.multiply(self.max_similarity(emb1, emb2), idf_scores[0][index])
104              recall = weighted_scores.sum() / idf_scores[0][index].sum()
105  
106              weighted_scores = np.multiply(self.max_similarity(emb2, emb1), idf_scores[1][index])
107              precision = weighted_scores.sum() / idf_scores[1][index].sum()
108          else:
109              recall = self.max_similarity(emb1, emb2).mean()
110              precision = self.max_similarity(emb2, emb1).mean()
111          return recall, precision
112  
113      def _feature_name(self):
114          return "|".join(self.columns)
115  
116      def _as_column(self) -> "ColumnName":
117          return self._create_column(
118              self._feature_name(),
119              default_display_name=f"BERTScore for {' '.join(self.columns)}.",
120          )