/ src / evidently / descriptors / _context_relevance.py
_context_relevance.py
  1  import abc
  2  from typing import Any
  3  from typing import Dict
  4  from typing import Generic
  5  from typing import List
  6  from typing import Optional
  7  from typing import Protocol
  8  from typing import Tuple
  9  from typing import Type
 10  from typing import TypeVar
 11  from typing import Union
 12  
 13  import numpy as np
 14  import pandas as pd
 15  
 16  from evidently.core.datasets import AnyDescriptorTest
 17  from evidently.core.datasets import Dataset
 18  from evidently.core.datasets import DatasetColumn
 19  from evidently.core.datasets import Descriptor
 20  from evidently.legacy.base_metric import DisplayName
 21  from evidently.legacy.core import ColumnType
 22  from evidently.legacy.features.llm_judge import BinaryClassificationPromptTemplate
 23  from evidently.legacy.options.base import Options
 24  from evidently.legacy.utils.llm.wrapper import LLMWrapper
 25  from evidently.legacy.utils.llm.wrapper import OpenAIWrapper
 26  from evidently.legacy.utils.llm.wrapper import get_llm_wrapper
 27  
 28  
 29  def semantic_similarity_scoring(question: DatasetColumn, context: DatasetColumn, options: Options) -> DatasetColumn:
 30      """Compute semantic similarity scores between question and context using sentence transformers."""
 31      from sentence_transformers import SentenceTransformer
 32  
 33      model_id: str = "all-MiniLM-L6-v2"
 34  
 35      def normalized_cosine_distance(left, right):
 36          """Calculate normalized cosine distance between two vectors."""
 37          return 1 - ((1 - np.dot(left, right) / (np.linalg.norm(left) * np.linalg.norm(right))) / 2)
 38  
 39      model = SentenceTransformer(model_id)
 40      context_column = context.data.name
 41      no_index_context = context.data.reset_index()
 42  
 43      first = model.encode(question.data.fillna(""))
 44      context_rows = no_index_context.explode([context_column]).reset_index()
 45      second = model.encode(context_rows[context_column].fillna(""))
 46  
 47      scores = pd.Series(data=[[x] for x in second], index=context_rows.index, dtype=object)
 48      scind = pd.DataFrame(data={"ind": context_rows["index"], "scores": scores})
 49      rsd = pd.Series([scind.iloc[x]["scores"] for x in scind.groupby("ind").groups.values()])
 50      pairs: list[tuple[Any, Any]] = list(zip(first, rsd))
 51      series_data: list[list[float]] = [[normalized_cosine_distance(x, y1[0]) for y1 in y] for x, y in pairs]
 52      return DatasetColumn(
 53          data=pd.Series(data=series_data, index=question.data.index),
 54          type=ColumnType.List,
 55      )
 56  
 57  
 58  def llm_scoring(
 59      question: DatasetColumn,
 60      context: DatasetColumn,
 61      options: Options,
 62      model: str = "gpt-4o-mini",
 63      provider: str = "openai",
 64  ) -> DatasetColumn:
 65      """Compute relevance scores between question and context using LLM."""
 66      # unwrap data to rows
 67      context_column = context.data.name
 68      no_index_context = context.data.reset_index()
 69      context_rows = no_index_context.explode([context_column]).reset_index()  #
 70  
 71      llm_wrapper: Optional[LLMWrapper]
 72      # do scoring
 73      if provider == "openai":
 74          llm_wrapper = OpenAIWrapper(model, options)
 75      else:
 76          llm_wrapper = get_llm_wrapper(provider, model, options)
 77      if llm_wrapper is None:
 78          raise ValueError(f"LLM Wrapper for found for {provider}")
 79      template = BinaryClassificationPromptTemplate(
 80          criteria="""A “RELEVANT” label means that the CONTEXT provides useful, supportive, or related information to the QUESTION.
 81  
 82          An “IRRELEVANT” label means that the CONTEXT is either contradictory or unrelated to the QUESTION.
 83  
 84                  Here is a QUESTION
 85                  -----question_starts-----
 86                  {input}
 87                  -----question_ends-----
 88  
 89                  Here is a CONTEXT
 90                  -----context_starts-----
 91                  {context}
 92                  -----context_ends-----
 93  
 94          """,
 95          target_category="RELEVANT",
 96          non_target_category="IRRELEVANT",
 97          uncertainty="unknown",
 98          include_reasoning=True,
 99          include_score=True,
100          pre_messages=[("system", "You are a judge which evaluates text.")],
101      )
102      df = pd.DataFrame({"input": question.data, "context": context.data}).explode("context").reset_index()
103      questions = template.iterate_messages(df, {"input": "input", "context": "context"})
104      results = llm_wrapper.run_batch_sync(questions)
105      result_data = pd.DataFrame(results)
106      # wrap scoring to lists back
107      scind = pd.DataFrame(data={"ind": context_rows["index"], "scores": result_data["score"]})
108      rsd = pd.Series(
109          [list(scind.iloc[x]["scores"].astype(float)) for x in scind.groupby("ind").groups.values()],
110          index=question.data.index,
111      )
112  
113      return DatasetColumn(
114          ColumnType.List,
115          rsd,
116      )
117  
118  
119  T = TypeVar("T")
120  
121  
class AggregationMethod(abc.ABC, Generic[T]):
    """Abstract base class for aggregating per-chunk relevance scores.

    Subclasses set ``column_type`` (the ColumnType of the aggregated output
    column) and implement :meth:`do`. Inherits ``abc.ABC`` so that
    ``@abc.abstractmethod`` is actually enforced at instantiation time.
    """

    # Output column type of the aggregated value; assigned by subclasses.
    column_type: ColumnType

    @abc.abstractmethod
    def do(self, scores: List[float]) -> T:
        """Aggregate a list of scores into a single value."""
        raise NotImplementedError
131  
132  
class MeanAggregation(AggregationMethod[float]):
    """Aggregate scores by averaging them."""

    column_type: ColumnType
    """Output column type (Numerical)."""

    def __init__(self):
        self.column_type = ColumnType.Numerical

    def do(self, scores: List[float]) -> float:
        """Return the arithmetic mean of the scores."""
        return float(np.mean(scores))
145  
146  
class HitAggregation(AggregationMethod[int]):
    """Aggregate scores by checking whether any single score reaches the threshold."""

    column_type: ColumnType
    """Output column type (Categorical)."""
    threshold: float
    """Score threshold for hit detection."""

    def __init__(self, threshold: float = 0.8):
        self.threshold = threshold
        self.column_type = ColumnType.Categorical

    def do(self, scores: List[float]) -> int:
        """Return 1 if at least one score is >= threshold, otherwise 0."""
        return int(any(score >= self.threshold for score in scores))
162  
163  
class HitShareAggregation(AggregationMethod[float]):
    """Aggregate scores by computing the share of scores above threshold."""

    column_type: ColumnType
    """Output column type (Categorical)."""
    threshold: float
    """Score threshold for hit detection."""

    def __init__(self, threshold: float = 0.8):
        self.column_type = ColumnType.Categorical
        self.threshold = threshold

    def do(self, scores: List[float]) -> float:
        """Return the fraction of scores >= threshold (0.0 for an empty list)."""
        if len(scores) == 0:
            # Guard against ZeroDivisionError: no scores means no hits.
            return 0.0
        hits = sum(1 for x in scores if x >= self.threshold)
        return float(hits) / len(scores)
179  
180  
class ScoringMethod(Protocol):
    """Protocol for scoring methods that compute relevance between question and context.

    Implementations (see ``semantic_similarity_scoring`` and ``llm_scoring``)
    return a DatasetColumn of type ``ColumnType.List``, aligned with the
    question column's index, holding one score per context chunk per row.
    Extra keyword parameters beyond this signature are allowed (passed via
    ``method_params``).
    """

    def __call__(
        self,
        question: DatasetColumn,
        context: DatasetColumn,
        options: Options,
    ) -> DatasetColumn: ...
190  
191  
# Registry of scoring methods: name -> (scoring function, default aggregation class).
# Annotation widened from Type[MeanAggregation] to the accurate Type[AggregationMethod].
METHODS: Dict[str, Tuple[ScoringMethod, Type[AggregationMethod]]] = {
    "semantic_similarity": (semantic_similarity_scoring, MeanAggregation),
    "llm": (llm_scoring, MeanAggregation),
}
196  
197  
# Registry of aggregation strategies, selectable by name via ContextRelevance.aggregation_method.
AGGREGATION_METHODS: Dict[str, Type[AggregationMethod]] = {
    "mean": MeanAggregation,
    "hit": HitAggregation,
    "hit_share": HitShareAggregation,
}
203  
204  
class ContextRelevance(Descriptor):
    """Evaluate relevance of context to input using semantic similarity or LLM scoring.

    Produces one aggregated relevance score per row and, optionally, the raw
    per-chunk scores as an additional List column.
    """

    input: str
    """Column name containing input/question text."""
    contexts: str
    """Column name containing context text (list of strings per row)."""
    method: str = "semantic_similarity"
    """Scoring method: "semantic_similarity" or "llm"."""
    method_params: Optional[Dict[str, object]] = None
    """Additional parameters for scoring method."""
    aggregation_method: Optional[str] = None
    """How to aggregate scores: "mean", "hit", or "hit_share"."""
    aggregation_method_params: Optional[Dict[str, object]] = None
    """Parameters for aggregation method."""
    output_scores: bool = False
    """Whether to output individual scores in addition to aggregated score."""

    def __init__(
        self,
        input: str,
        contexts: str,
        method: str = "semantic_similarity",
        method_params: Optional[Dict[str, object]] = None,
        aggregation_method: Optional[str] = None,
        aggregation_method_params: Optional[Dict[str, object]] = None,
        output_scores: bool = False,
        alias: Optional[str] = None,
        tests: Optional[List[AnyDescriptorTest]] = None,
    ):
        self.output_scores = output_scores
        self.aggregation_method = aggregation_method
        self.aggregation_method_params = aggregation_method_params
        self.method = method
        self.method_params = method_params
        self.input = input
        self.contexts = contexts
        super().__init__(alias=alias or f"Ranking for {input} with {contexts}", tests=tests)

    def generate_data(
        self,
        dataset: Dataset,
        options: Options,
    ) -> Union[DatasetColumn, Dict[DisplayName, DatasetColumn]]:
        """Generate relevance scores for input-context pairs.

        Raises:
            ValueError: if ``self.method`` or ``self.aggregation_method`` is unknown.
        """
        data = dataset.column(self.contexts)

        # Check membership first: unpacking METHODS.get(...) when the method is
        # unknown would raise an opaque TypeError ("cannot unpack NoneType")
        # before the intended ValueError could ever fire.
        if self.method not in METHODS:
            raise ValueError(f"Method {self.method} not found")
        (method, aggregation_method) = METHODS[self.method]
        if self.aggregation_method is not None:
            aggregation_method = AGGREGATION_METHODS.get(self.aggregation_method)
        if aggregation_method is None:
            raise ValueError(f"Aggregation method {self.aggregation_method} not found")

        scored_contexts = method(dataset.column(self.input), data, options, **(self.method_params or {}))
        aggregation = aggregation_method(**(self.aggregation_method_params or {}))
        aggregated_scores = scored_contexts.data.apply(aggregation.do)
        # Use the aggregation's declared output type (e.g. Categorical for "hit")
        # instead of unconditionally labelling the column Numerical.
        result = {
            f"{self.alias}": DatasetColumn(aggregation.column_type, aggregated_scores),
        }
        if self.output_scores:
            result[f"{self.alias} scores"] = scored_contexts
        return result

    def list_input_columns(self) -> Optional[List[str]]:
        """Return list of required input column names."""
        return [self.input, self.contexts]