_context_relevance.py
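"""Context relevance descriptor.

Scores how relevant each retrieved context is to its question, either with
sentence-transformer embeddings ("semantic_similarity") or with an LLM judge
("llm"), and aggregates the per-context scores into a single value per row
("mean", "hit", or "hit_share").
"""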
import abc
from typing import Any
from typing import Dict
from typing import Generic
from typing import List
from typing import Optional
from typing import Protocol
from typing import Tuple
from typing import Type
from typing import TypeVar
from typing import Union

import numpy as np
import pandas as pd

from evidently.core.datasets import AnyDescriptorTest
from evidently.core.datasets import Dataset
from evidently.core.datasets import DatasetColumn
from evidently.core.datasets import Descriptor
from evidently.legacy.base_metric import DisplayName
from evidently.legacy.core import ColumnType
from evidently.legacy.features.llm_judge import BinaryClassificationPromptTemplate
from evidently.legacy.options.base import Options
from evidently.legacy.utils.llm.wrapper import LLMWrapper
from evidently.legacy.utils.llm.wrapper import OpenAIWrapper
from evidently.legacy.utils.llm.wrapper import get_llm_wrapper


def semantic_similarity_scoring(question: DatasetColumn, context: DatasetColumn, options: Options) -> DatasetColumn:
    """Compute semantic similarity scores between question and context using sentence transformers."""
    from sentence_transformers import SentenceTransformer

    model_id: str = "all-MiniLM-L6-v2"

    def normalized_cosine_distance(left, right):
        """Rescale cosine similarity from [-1, 1] to a [0, 1] score (1 = identical, 0 = opposite)."""
        return 1 - ((1 - np.dot(left, right) / (np.linalg.norm(left) * np.linalg.norm(right))) / 2)

    model = SentenceTransformer(model_id)
    context_column = context.data.name
    no_index_context = context.data.reset_index()

    # encode questions once; explode contexts so each (row, context) pair gets its own embedding
    first = model.encode(question.data.fillna(""))
    context_rows = no_index_context.explode([context_column]).reset_index()
    second = model.encode(context_rows[context_column].fillna(""))

    # regroup per-context embeddings back into one list per original row
    scores = pd.Series(data=[[x] for x in second], index=context_rows.index, dtype=object)
    scind = pd.DataFrame(data={"ind": context_rows["index"], "scores": scores})
    rsd = pd.Series([scind.iloc[x]["scores"] for x in scind.groupby("ind").groups.values()])
    pairs: List[Tuple[Any, Any]] = list(zip(first, rsd))
    series_data: List[List[float]] = [[normalized_cosine_distance(x, y1[0]) for y1 in y] for x, y in pairs]
    return DatasetColumn(
        data=pd.Series(data=series_data, index=question.data.index),
        type=ColumnType.List,
    )
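
# Note on the scoring scale: the inner helper above maps cosine similarity from
# [-1, 1] onto [0, 1] via (1 + cos) / 2, so identical embeddings score 1.0,
# orthogonal ones 0.5, and opposite ones 0.0. If the helper were lifted to
# module scope, a quick sanity check with plain numpy vectors would read:
#
#     normalized_cosine_distance(np.array([1.0, 0.0]), np.array([1.0, 0.0]))   # -> 1.0
#     normalized_cosine_distance(np.array([1.0, 0.0]), np.array([0.0, 1.0]))   # -> 0.5
#     normalized_cosine_distance(np.array([1.0, 0.0]), np.array([-1.0, 0.0]))  # -> 0.0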


def llm_scoring(
    question: DatasetColumn,
    context: DatasetColumn,
    options: Options,
    model: str = "gpt-4o-mini",
    provider: str = "openai",
) -> DatasetColumn:
    """Compute relevance scores between question and context using an LLM judge."""
    # unwrap data to rows: one (question, context) pair per row
    context_column = context.data.name
    no_index_context = context.data.reset_index()
    context_rows = no_index_context.explode([context_column]).reset_index()

    llm_wrapper: Optional[LLMWrapper]
    # do scoring
    if provider == "openai":
        llm_wrapper = OpenAIWrapper(model, options)
    else:
        llm_wrapper = get_llm_wrapper(provider, model, options)
    if llm_wrapper is None:
        raise ValueError(f"LLM wrapper not found for provider {provider}")
    template = BinaryClassificationPromptTemplate(
        criteria="""A “RELEVANT” label means that the CONTEXT provides useful, supportive, or related information to the QUESTION.

An “IRRELEVANT” label means that the CONTEXT is either contradictory or unrelated to the QUESTION.

Here is a QUESTION
-----question_starts-----
{input}
-----question_ends-----

Here is a CONTEXT
-----context_starts-----
{context}
-----context_ends-----

""",
        target_category="RELEVANT",
        non_target_category="IRRELEVANT",
        uncertainty="unknown",
        include_reasoning=True,
        include_score=True,
        pre_messages=[("system", "You are a judge which evaluates text.")],
    )
    df = pd.DataFrame({"input": question.data, "context": context.data}).explode("context").reset_index()
    questions = template.iterate_messages(df, {"input": "input", "context": "context"})
    results = llm_wrapper.run_batch_sync(questions)
    result_data = pd.DataFrame(results)
    # wrap per-pair scores back into one list per original row
    scind = pd.DataFrame(data={"ind": context_rows["index"], "scores": result_data["score"]})
    rsd = pd.Series(
        [list(scind.iloc[x]["scores"].astype(float)) for x in scind.groupby("ind").groups.values()],
        index=question.data.index,
    )

    return DatasetColumn(
        ColumnType.List,
        rsd,
    )


T = TypeVar("T")


class AggregationMethod(abc.ABC, Generic[T]):
    """Base class for aggregating a row's relevance scores into a single value."""

    column_type: ColumnType

    @abc.abstractmethod
    def do(self, scores: List[float]) -> T:
        """Aggregate a list of scores into a single value."""
        raise NotImplementedError


class MeanAggregation(AggregationMethod[float]):
    """Aggregate scores by computing the mean."""

    column_type: ColumnType
    """Output column type (Numerical)."""

    def __init__(self):
        self.column_type = ColumnType.Numerical

    def do(self, scores: List[float]) -> float:
        """Return the average of scores."""
        return float(np.average(scores))


class HitAggregation(AggregationMethod[int]):
    """Aggregate scores by checking whether any score reaches the threshold."""

    column_type: ColumnType
    """Output column type (Categorical)."""
    threshold: float
    """Score threshold for hit detection."""

    def __init__(self, threshold: float = 0.8):
        self.column_type = ColumnType.Categorical
        self.threshold = threshold

    def do(self, scores: List[float]) -> int:
        """Return 1 if any score >= threshold, else 0."""
        return 1 if any(x >= self.threshold for x in scores) else 0


class HitShareAggregation(AggregationMethod[float]):
    """Aggregate scores by computing the share of scores at or above the threshold."""

    column_type: ColumnType
    """Output column type (Categorical)."""
    threshold: float
    """Score threshold for hit detection."""

    def __init__(self, threshold: float = 0.8):
        self.column_type = ColumnType.Categorical
        self.threshold = threshold

    def do(self, scores: List[float]) -> float:
        """Return the fraction of scores >= threshold."""
        return sum(1 for x in scores if x >= self.threshold) / len(scores)


class ScoringMethod(Protocol):
    """Protocol for scoring methods that compute relevance between question and context."""

    def __call__(
        self,
        question: DatasetColumn,
        context: DatasetColumn,
        options: Options,
    ) -> DatasetColumn: ...
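
# Any callable with the ScoringMethod signature can be plugged into METHODS
# below. A minimal sketch of a custom scorer (hypothetical, for illustration:
# it scores each context by its word overlap with the question):
#
#     def keyword_overlap_scoring(question, context, options):
#         def row_scores(q, ctxs):
#             q_words = set(str(q).lower().split())
#             return [
#                 len(q_words & set(str(c).lower().split())) / max(len(q_words), 1)
#                 for c in ctxs
#             ]
#         data = pd.Series(
#             [row_scores(q, ctxs) for q, ctxs in zip(question.data, context.data)],
#             index=question.data.index,
#         )
#         return DatasetColumn(type=ColumnType.List, data=data)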


METHODS: Dict[str, Tuple[ScoringMethod, Type[AggregationMethod]]] = {
    "semantic_similarity": (semantic_similarity_scoring, MeanAggregation),
    "llm": (llm_scoring, MeanAggregation),
}


AGGREGATION_METHODS: Dict[str, Type[AggregationMethod]] = {
    "mean": MeanAggregation,
    "hit": HitAggregation,
    "hit_share": HitShareAggregation,
}


class ContextRelevance(Descriptor):
    """Evaluate relevance of context to input using semantic similarity or LLM scoring."""

    input: str
    """Column name containing input/question text."""
    contexts: str
    """Column name containing context text (list of strings per row)."""
    method: str = "semantic_similarity"
    """Scoring method: "semantic_similarity" or "llm"."""
    method_params: Optional[Dict[str, object]] = None
    """Additional parameters for the scoring method."""
    aggregation_method: Optional[str] = None
    """How to aggregate scores: "mean", "hit", or "hit_share"."""
    aggregation_method_params: Optional[Dict[str, object]] = None
    """Parameters for the aggregation method."""
    output_scores: bool = False
    """Whether to output individual scores in addition to the aggregated score."""

    def __init__(
        self,
        input: str,
        contexts: str,
        method: str = "semantic_similarity",
        method_params: Optional[Dict[str, object]] = None,
        aggregation_method: Optional[str] = None,
        aggregation_method_params: Optional[Dict[str, object]] = None,
        output_scores: bool = False,
        alias: Optional[str] = None,
        tests: Optional[List[AnyDescriptorTest]] = None,
    ):
        self.output_scores = output_scores
        self.aggregation_method = aggregation_method
        self.aggregation_method_params = aggregation_method_params
        self.method = method
        self.method_params = method_params
        self.input = input
        self.contexts = contexts
        super().__init__(alias=alias or f"Ranking for {input} with {contexts}", tests=tests)

    def generate_data(
        self,
        dataset: Dataset,
        options: Options,
    ) -> Union[DatasetColumn, Dict[DisplayName, DatasetColumn]]:
        """Generate relevance scores for input-context pairs."""
        data = dataset.column(self.contexts)

        # look up before unpacking so a missing method raises a clear error
        entry = METHODS.get(self.method)
        if entry is None:
            raise ValueError(f"Method {self.method} not found")
        method, aggregation_method = entry
        if self.aggregation_method is not None:
            aggregation_method = AGGREGATION_METHODS.get(self.aggregation_method)
            if aggregation_method is None:
                raise ValueError(f"Aggregation method {self.aggregation_method} not found")

        scored_contexts = method(dataset.column(self.input), data, options, **(self.method_params or {}))
        aggregation = aggregation_method(**(self.aggregation_method_params or {}))
        aggregated_scores = scored_contexts.data.apply(aggregation.do)
        result = {
            self.alias: DatasetColumn(aggregation.column_type, aggregated_scores),
        }
        if self.output_scores:
            result[f"{self.alias} scores"] = scored_contexts
        return result

    def list_input_columns(self) -> Optional[List[str]]:
        """Return the list of required input column names."""
        return [self.input, self.contexts]
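
# Minimal usage sketch (hedged: the column names "question" and "contexts" are
# hypothetical, and Dataset construction is shown schematically):
#
#     descriptor = ContextRelevance(
#         input="question",
#         contexts="contexts",
#         method="semantic_similarity",
#         aggregation_method="hit",
#         aggregation_method_params={"threshold": 0.9},
#         output_scores=True,
#     )
#
# When the descriptor runs against a Dataset whose "contexts" column holds a
# list of strings per row, generate_data() returns the aggregated hit column
# plus, because output_scores=True, a "<alias> scores" column with the raw
# per-context scores.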