BERTScore_feature.py
from collections import defaultdict
from typing import ClassVar
from typing import Dict
from typing import List

import numpy as np
import pandas as pd

from evidently.legacy.base_metric import ColumnName
from evidently.legacy.core import ColumnType
from evidently.legacy.features.generated_features import GeneratedFeature
from evidently.legacy.utils.data_preprocessing import DataDefinition


class BERTScoreFeature(GeneratedFeature):
    class Config:
        type_alias = "evidently:feature:BERTScoreFeature"

    __feature_type__: ClassVar = ColumnType.Numerical
    columns: List[str]
    model: str = "bert-base-uncased"  # Pretrained BERT model
    tfidf_weighted: bool = False  # Whether to weight embeddings with IDF

    def generate_feature(self, data: pd.DataFrame, data_definition: DataDefinition) -> pd.DataFrame:
        # Load BERT model and tokenizer
        from transformers import BertModel
        from transformers import BertTokenizer

        tokenizer = BertTokenizer.from_pretrained(self.model)
        model = BertModel.from_pretrained(self.model)

        # Tokenize sentences
        tokens_first = tokenizer(
            data[self.columns[0]].fillna("").tolist(), return_tensors="pt", padding=True, truncation=True
        )
        tokens_second = tokenizer(
            data[self.columns[1]].fillna("").tolist(), return_tensors="pt", padding=True, truncation=True
        )

        # Get embeddings
        embeddings_first = model(**tokens_first).last_hidden_state.detach().numpy()
        embeddings_second = model(**tokens_second).last_hidden_state.detach().numpy()
        # Obtain IDF scores
        idf_scores = self.compute_idf_scores(data[self.columns[0]], data[self.columns[1]], tokenizer)

        scores = []
        for i, (emb1, emb2) in enumerate(zip(embeddings_first, embeddings_second)):
            recall, precision = self.calculate_scores(emb1, emb2, idf_scores, i)
            f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
            scores.append(f1_score)

        # Return as a DataFrame
        return pd.DataFrame(
            {
                self._feature_name(): pd.Series(scores, index=data.index),
            }
        )

    def compute_idf_scores(self, col1: pd.Series, col2: pd.Series, tokenizer) -> tuple:
        # Combine reference sentences
        reference_sentences = pd.concat([col1, col2]).dropna().tolist()
        M = len(reference_sentences)

        # Compute IDF for each unique token
        token_counts: Dict[str, int] = defaultdict(int)
        for sentence in reference_sentences:
            tokens = [tokenizer.cls_token] + tokenizer.tokenize(sentence) + [tokenizer.sep_token]
            unique_tokens = set(tokens)
            for token in unique_tokens:
                token_counts[token] += 1

        idf_scores = {token: -np.log(count / M) for token, count in token_counts.items()}

        # Convert IDF scores to numpy arrays
        def convert_to_idf_arrays(sentences):
            idf_arrays = []
            for sentence in sentences:
                tokens = tokenizer.tokenize(sentence)

                # Add special tokens
                tokens = [tokenizer.cls_token] + tokens + [tokenizer.sep_token]
                # Compute IDF scores for each token including plus one smoothing
                idf_array = np.array([idf_scores.get(token, 0) + 1 for token in tokens])
                idf_arrays.append(idf_array)
            # Pad sequences to the same length
            max_len = max(len(arr) for arr in idf_arrays)
            idf_arrays = np.array([np.pad(arr, (0, max_len - len(arr)), "constant") for arr in idf_arrays])
            return idf_arrays

        idf_arrays1 = convert_to_idf_arrays(col1.fillna("").tolist())
        idf_arrays2 = convert_to_idf_arrays(col2.fillna("").tolist())
        return idf_arrays1, idf_arrays2

    def max_similarity(self, embeddings1, embeddings2):
        # Compute max cosine similarity for each token in embeddings1 with respect to embeddings2
        similarity_matrix = np.dot(embeddings1, embeddings2.T) / (
            np.linalg.norm(embeddings1, axis=1, keepdims=True) * np.linalg.norm(embeddings2, axis=1, keepdims=True).T
        )
        return similarity_matrix.max(axis=1)

    def calculate_scores(self, emb1, emb2, idf_scores, index):
        if self.tfidf_weighted:
            weighted_scores = np.multiply(self.max_similarity(emb1, emb2), idf_scores[0][index])
            recall = weighted_scores.sum() / idf_scores[0][index].sum()

            weighted_scores = np.multiply(self.max_similarity(emb2, emb1), idf_scores[1][index])
            precision = weighted_scores.sum() / idf_scores[1][index].sum()
        else:
            recall = self.max_similarity(emb1, emb2).mean()
            precision = self.max_similarity(emb2, emb1).mean()
        return recall, precision

    def _feature_name(self):
        return "|".join(self.columns)

    def _as_column(self) -> "ColumnName":
        return self._create_column(
            self._feature_name(),
            default_display_name=f"BERTScore for {' '.join(self.columns)}.",
        )
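

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module): a minimal
# example of computing the feature on a toy DataFrame. The column names
# "response" and "reference" are assumptions for the demo, and
# `data_definition` is passed as None because generate_feature above only
# reads `data`; a real evidently pipeline would supply a proper
# DataDefinition built from its own helpers.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    demo = pd.DataFrame(
        {
            "response": ["The cat sat on the mat.", "It is raining today."],
            "reference": ["A cat was sitting on the mat.", "Rain fell all day."],
        }
    )
    feature = BERTScoreFeature(columns=["response", "reference"], tfidf_weighted=True)
    # Returns a single numeric column named "response|reference" holding the
    # per-row BERTScore F1 of token-level recall and precision.
    scores = feature.generate_feature(demo, data_definition=None)  # type: ignore[arg-type]
    print(scores)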