/ evaluation / evaluator.py
evaluator.py
  1  """
  2  Refactored evaluator with improved structure and readability.
  3  This module evaluates AI model performance against test cases using multiple evaluation strategies.
  4  """
  5  
  6  import concurrent.futures
  7  import json
  8  import logging
  9  import re
 10  from abc import ABC, abstractmethod
 11  from collections import defaultdict
 12  from dataclasses import dataclass, field
 13  from pathlib import Path
 14  
 15  import litellm
 16  import numpy as np
 17  from rouge import Rouge
 18  
 19  from .shared import (
 20      EvaluationConfigLoader,
 21      EvaluationOptions,
 22      TestSuiteConfiguration,
 23      load_test_case,
 24  )
 25  
 26  log = logging.getLogger(__name__)
 27  
 28  
 29  @dataclass
 30  class EvaluationResult:
 31      """Represents the evaluation result for a single run."""
 32  
 33      run_number: int
 34      test_case_id: str
 35      test_case_path: str
 36      tool_match_score: float | None = None
 37      response_match_score: float | None = None
 38      llm_eval_score: float | None = None
 39      llm_eval_reasoning: str | None = None
 40      duration_seconds: float | None = None
 41      errors: list[str] = field(default_factory=list)
 42  
 43      def to_dict(self) -> dict[str, any]:
 44          """Convert to dictionary format for JSON serialization."""
 45          result = {
 46              "run": self.run_number,
 47              "test_case_id": self.test_case_id,
 48              "test_case_path": self.test_case_path,
 49              "duration_seconds": self.duration_seconds,
 50          }
 51  
 52          if self.tool_match_score is not None:
 53              result["tool_match"] = self.tool_match_score
 54  
 55          if self.response_match_score is not None:
 56              result["response_match"] = self.response_match_score
 57  
 58          if self.llm_eval_score is not None:
 59              result["llm_eval"] = {
 60                  "score": self.llm_eval_score,
 61                  "reasoning": self.llm_eval_reasoning,
 62              }
 63  
 64          if self.errors:
 65              result["errors"] = self.errors
 66  
 67          return result
 68  
 69  
 70  @dataclass
 71  class ScoreStatistics:
 72      """Statistical summary of evaluation scores."""
 73  
 74      average: float
 75      distribution: dict[str, float]
 76  
 77      @classmethod
 78      def from_scores(cls, scores: list[float]) -> "ScoreStatistics":
 79          """Create statistics from a list of scores."""
 80          if not scores:
 81              return cls(
 82                  average=0.0,
 83                  distribution={"min": 0.0, "q1": 0.0, "q2": 0.0, "q3": 0.0, "max": 0.0},
 84              )
 85  
 86          return cls(
 87              average=float(np.mean(scores)),
 88              distribution={
 89                  "min": float(np.min(scores)),
 90                  "q1": float(np.percentile(scores, 25)),
 91                  "q2": float(np.median(scores)),
 92                  "q3": float(np.percentile(scores, 75)),
 93                  "max": float(np.max(scores)),
 94              },
 95          )
 96  
 97  
 98  @dataclass
 99  class TestCaseResults:
100      """Aggregated results for a test case across multiple runs."""
101  
102      test_case_id: str
103      category: str
104      runs: list[EvaluationResult]
105      average_duration: float
106      tool_match_scores: ScoreStatistics
107      response_match_scores: ScoreStatistics
108      llm_eval_scores: ScoreStatistics
109  
110      def to_dict(self) -> dict[str, any]:
111          """Convert to dictionary format for JSON serialization."""
112          return {
113              "test_case_id": self.test_case_id,
114              "category": self.category,
115              "runs": [run.to_dict() for run in self.runs],
116              "average_duration": self.average_duration,
117              "tool_match_scores": {
118                  "average": self.tool_match_scores.average,
119                  "distribution": self.tool_match_scores.distribution,
120              },
121              "response_match_scores": {
122                  "average": self.response_match_scores.average,
123                  "distribution": self.response_match_scores.distribution,
124              },
125              "llm_eval_scores": {
126                  "average": self.llm_eval_scores.average,
127                  "distribution": self.llm_eval_scores.distribution,
128              },
129          }
130  
131  
132  @dataclass
133  class ModelResults:
134      """Complete evaluation results for a model."""
135  
136      model_name: str
137      total_execution_time: float | None
138      test_cases: list[TestCaseResults]
139  
140      def to_dict(self) -> dict[str, any]:
141          """Convert to dictionary format for JSON serialization."""
142          return {
143              "model_name": self.model_name,
144              "total_execution_time": self.total_execution_time,
145              "test_cases": [tc.to_dict() for tc in self.test_cases],
146          }
147  
148  
149  class ConfigurationService:
150      """Handles configuration loading and validation."""
151  
152      def __init__(self, config_path: str):
153          self.config_loader = EvaluationConfigLoader(config_path)
154          self._config_cache = None
155          self._evaluation_settings_cache = None
156  
157      def get_config(self) -> TestSuiteConfiguration:
158          """Get the main configuration."""
159          if self._config_cache is None:
160              self._config_cache = self.config_loader.load_configuration()
161          return self._config_cache
162  
163      def get_evaluation_settings(self) -> EvaluationOptions:
164          """Get evaluation settings."""
165          if self._evaluation_settings_cache is None:
166              self._evaluation_settings_cache = self.config_loader.get_evaluation_options()
167          return self._evaluation_settings_cache
168  
169  
170  class FileService:
171      """Handles file I/O operations."""
172  
173      @staticmethod
174      def load_json(filepath: Path) -> any:
175          """Load JSON data from file."""
176          try:
177              with filepath.open() as f:
178                  return json.load(f)
179          except (FileNotFoundError, json.JSONDecodeError) as e:
180              log.error(f"Failed to load JSON from {filepath}: {e}")
181              raise
182  
183      @staticmethod
184      def save_json(data: any, filepath: Path):
185          """Save data as JSON to file."""
186          try:
187              filepath.parent.mkdir(parents=True, exist_ok=True)
188              with filepath.open("w") as f:
189                  json.dump(data, f, indent=4)
190          except Exception as e:
191              log.error(f"Failed to save JSON to {filepath}: {e}")
192              raise
193  
194      @staticmethod
195      def file_exists(filepath: Path) -> bool:
196          """Check if file exists."""
197          return filepath.exists()
198  
199  
200  class StatisticsService:
201      """Handles statistical calculations and aggregations."""
202  
203      @staticmethod
204      def calculate_score_statistics(scores: list[float]) -> ScoreStatistics:
205          """Calculate statistical summary for a list of scores."""
206          return ScoreStatistics.from_scores(scores)
207  
208      @staticmethod
209      def calculate_average_duration(durations: list[float]) -> float:
210          """Calculate average duration from a list of durations."""
211          if not durations:
212              return 0.0
213          return float(np.mean(durations))
214  
215  
216  class EvaluationStrategy(ABC):
217      """Abstract base class for evaluation strategies."""
218  
219      @abstractmethod
220      def evaluate(
221          self, test_case: dict[str, any], summary_data: dict[str, any]
222      ) -> float | None:
223          """Evaluate a test case run and return a score."""
224          pass
225  
226  
227  class ToolMatchEvaluator(EvaluationStrategy):
228      """Evaluates tool usage against expected tools."""
229  
230      def evaluate(
231          self, test_case: dict[str, any], summary_data: dict[str, any]
232      ) -> float | None:
233          """Evaluate tool matching score."""
234          try:
235              expected_tools = test_case["evaluation"]["expected_tools"]
236              actual_tools = [
237                  tool["tool_name"] for tool in summary_data.get("tool_calls", [])
238              ]
239  
240              expected_set = set(expected_tools)
241              actual_set = set(actual_tools)
242  
243              if not expected_set:
244                  return 1.0
245  
246              found_tools = expected_set.intersection(actual_set)
247              return len(found_tools) / len(expected_set)
248  
249          except (KeyError, TypeError) as e:
250              log.warning(f"Error in tool match evaluation: {e}")
251              return None
252  
253  
254  class ResponseMatchEvaluator(EvaluationStrategy):
255      """Evaluates response quality using ROUGE metrics."""
256  
257      def __init__(self):
258          self.rouge = Rouge()
259  
260      def evaluate(
261          self, test_case: dict[str, any], summary_data: dict[str, any]
262      ) -> float | None:
263          """Evaluate response matching score using a weighted ROUGE average."""
264          try:
265              expected_response = test_case["evaluation"]["expected_response"]
266              actual_response = summary_data.get("final_message", "")
267  
268              if not actual_response or not expected_response:
269                  return 0.0
270  
271              scores = self.rouge.get_scores(actual_response, expected_response)[0]
272  
273              # Weighted average of ROUGE-1, ROUGE-2, and ROUGE-L f-scores
274              rouge_1_f = scores.get("rouge-1", {}).get("f", 0.0)
275              rouge_2_f = scores.get("rouge-2", {}).get("f", 0.0)
276              rouge_l_f = scores.get("rouge-l", {}).get("f", 0.0)
277  
278              weighted_score = (0.2 * rouge_1_f) + (0.3 * rouge_2_f) + (0.5 * rouge_l_f)
279  
280              return weighted_score
281  
282          except (ValueError, KeyError, TypeError) as e:
283              log.warning(f"Error in response match evaluation: {e}")
284              return 0.0
285  
286  
287  class LLMEvaluator(EvaluationStrategy):
288      """Evaluates responses using an LLM judge."""
289  
290      def __init__(self, llm_config: dict[str, any]):
291          self.model = llm_config.get("LLM_SERVICE_PLANNING_MODEL_NAME")
292          self.api_key = llm_config.get("LLM_SERVICE_API_KEY")
293          self.api_base = llm_config.get("LLM_SERVICE_ENDPOINT")
294  
295          if not all([self.model, self.api_key, self.api_base]):
296              raise ValueError(
297                  "LLM evaluator requires model, api_key, and api_base configuration"
298              )
299  
300      def evaluate(
301          self, test_case: dict[str, any], summary_data: dict[str, any]
302      ) -> dict[str, any] | None:
303          """Evaluate response using LLM and return score with reasoning."""
304          try:
305              query = test_case["query"]
306              expected_response = test_case["evaluation"]["expected_response"]
307              actual_response = summary_data.get("final_message", "")
308              criterion = test_case["evaluation"]["criterion"]
309              input_artifacts = summary_data.get("input_artifacts", [])
310              output_artifacts = summary_data.get("output_artifacts", [])
311  
312              prompt = self._build_evaluation_prompt(
313                  query,
314                  expected_response,
315                  actual_response,
316                  criterion,
317                  input_artifacts,
318                  output_artifacts,
319              )
320  
321              response = litellm.completion(
322                  model=self.model,
323                  messages=[{"role": "user", "content": prompt}],
324                  api_key=self.api_key,
325                  base_url=self.api_base,
326              )
327  
328              response_content = response.choices[0].message.content.strip()
329              score = self._extract_score(response_content)
330              reasoning = self._extract_reasoning(response_content)
331  
332              return {"score": score, "reasoning": reasoning}
333  
334          except Exception as e:
335              log.error(f"Error in LLM evaluation: {e}")
336              return None
337  
338      def _build_evaluation_prompt(
339          self,
340          query: str,
341          expected_response: str,
342          actual_response: str,
343          criterion: str,
344          input_artifacts: list[dict],
345          output_artifacts: list[dict],
346      ) -> str:
347          """Build the evaluation prompt for the LLM."""
348          return f"""
349          Original Query: {query}
350          Expected Response: {expected_response}
351          Actual Response: {actual_response}
352          Criterion: {criterion}
353          Input Artifacts: {input_artifacts}
354          Output Artifacts: {output_artifacts}
355  
356          Based on the criterion, please evaluate the actual response.
357          Format your response exactly as:
358          Score: [0.0-1.0]
359          Reasoning: [Your detailed explanation of why you gave this score, considering both the response and any artifacts created]
360  
361          Provide a score from 0.0 to 1.0 where:
362          - 1.0 = Excellent: Fully meets the criterion and expectations
363          - 0.8-0.9 = Good: Mostly meets the criterion with minor issues
364          - 0.6-0.7 = Adequate: Partially meets the criterion but has notable gaps
365          - 0.4-0.5 = Poor: Minimally meets the criterion with significant issues
366          - 0.0-0.3 = Very Poor: Fails to meet the criterion
367          """
368  
369      def _extract_score(self, llm_response: str) -> float:
370          """Extract numerical score from LLM response."""
371          # Try to find "Score: X.X" pattern first
372          score_match = re.search(
373              r"Score:\s*([0-9]*\.?[0-9]+)", llm_response, re.IGNORECASE
374          )
375          if score_match:
376              try:
377                  score = float(score_match.group(1))
378                  return max(0.0, min(1.0, score))
379              except ValueError:
380                  pass
381  
382          # Fallback: look for any number between 0 and 1
383          number_match = re.search(r"\b([0-1]\.?[0-9]*)\b", llm_response)
384          if number_match:
385              try:
386                  score = float(number_match.group(1))
387                  if 0.0 <= score <= 1.0:
388                      return score
389              except ValueError:
390                  pass
391  
392          return 0.0
393  
394      def _extract_reasoning(self, llm_response: str) -> str:
395          """Extract reasoning from LLM response."""
396          reasoning_match = re.search(
397              r"Reasoning:\s*(.+)", llm_response, re.IGNORECASE | re.DOTALL
398          )
399          if reasoning_match:
400              return reasoning_match.group(1).strip()
401  
402          return llm_response.strip()
403  
404  
405  class RunEvaluator:
406      """Evaluates individual test runs."""
407  
408      def __init__(self, evaluation_settings: dict[str, any]):
409          self.evaluation_settings = evaluation_settings
410          self.file_service = FileService()
411  
412          # Initialize evaluators based on settings
413          self.tool_evaluator = (
414              ToolMatchEvaluator()
415              if evaluation_settings["tool_match"]["enabled"]
416              else None
417          )
418          self.response_evaluator = (
419              ResponseMatchEvaluator()
420              if evaluation_settings["response_match"]["enabled"]
421              else None
422          )
423  
424          self.llm_evaluator = None
425          if evaluation_settings["llm_evaluator"]["enabled"]:
426              try:
427                  llm_config = evaluation_settings["llm_evaluator"]["env"]
428                  self.llm_evaluator = LLMEvaluator(llm_config)
429              except Exception as e:
430                  log.error(f"Failed to initialize LLM evaluator: {e}")
431  
432      def evaluate_run(
433          self,
434          run_number: int,
435          run_path: Path,
436          test_case: dict[str, any],
437          test_case_path: str,
438      ) -> EvaluationResult | None:
439          """Evaluate a single test run."""
440          log.info(
441              f"    - Evaluating run {run_number} for test case {test_case['test_case_id']}"
442          )
443  
444          # Load summary data
445          summary_path = run_path / "summary.json"
446          log.info(f"Summary file path: {summary_path}")
447          if not self.file_service.file_exists(summary_path):
448              log.warning(
449                  f"      Summary file not found for run {run_number}, skipping."
450              )
451              return None
452  
453          try:
454              summary_data = self.file_service.load_json(summary_path)
455          except Exception as e:
456              log.error(f"      Error loading summary.json for run {run_number}: {e}")
457              return None
458  
459          # Create evaluation result
460          result = EvaluationResult(
461              run_number=run_number,
462              test_case_id=test_case["test_case_id"],
463              test_case_path=test_case_path,
464              duration_seconds=summary_data.get("duration_seconds"),
465          )
466  
467          # Run evaluations
468          if self.tool_evaluator:
469              result.tool_match_score = self.tool_evaluator.evaluate(
470                  test_case, summary_data
471              )
472  
473          if self.response_evaluator:
474              result.response_match_score = self.response_evaluator.evaluate(
475                  test_case, summary_data
476              )
477  
478          if self.llm_evaluator:
479              llm_result = self.llm_evaluator.evaluate(test_case, summary_data)
480              if llm_result:
481                  result.llm_eval_score = llm_result["score"]
482                  result.llm_eval_reasoning = llm_result["reasoning"]
483  
484          return result
485  
486  
487  class ModelEvaluator:
488      """Evaluates all runs for a single model."""
489  
490      def __init__(self, config: dict[str, any], evaluation_settings: dict[str, any]):
491          self.config = config
492          self.evaluation_settings = evaluation_settings
493          self.run_evaluator = RunEvaluator(evaluation_settings)
494          self.statistics_service = StatisticsService()
495  
496      def evaluate_model(self, model_name: str, base_results_path: str) -> ModelResults:
497          """Evaluate all test cases for a model."""
498          log.info(f"Evaluating model: {model_name}")
499  
500          model_results_path = Path(base_results_path) / model_name
501  
502          # Collect all evaluation tasks
503          tasks = self._collect_evaluation_tasks(model_results_path)
504  
505          # Run evaluations in parallel
506          model_results_data = defaultdict(list)
507          with concurrent.futures.ThreadPoolExecutor() as executor:
508              future_to_run = {
509                  executor.submit(self.run_evaluator.evaluate_run, *task): task
510                  for task in tasks
511              }
512  
513              for future in concurrent.futures.as_completed(future_to_run):
514                  try:
515                      result = future.result()
516                      if result:
517                          model_results_data[result.test_case_id].append(result)
518                  except Exception as e:
519                      log.error(f"An error occurred during evaluation: {e}")
520  
521          # Aggregate results by test case
522          test_case_results = []
523          for test_case_id, runs in model_results_data.items():
524              if runs:
525                  test_case_result = self._aggregate_test_case_results(test_case_id, runs)
526                  test_case_results.append(test_case_result)
527  
528          return ModelResults(
529              model_name=model_name,
530              total_execution_time=None,  # Will be set by orchestrator
531              test_cases=test_case_results,
532          )
533  
534      def _collect_evaluation_tasks(
535          self, model_results_path: Path
536      ) -> list[tuple[int, Path, dict[str, any], str]]:
537          """Collect all evaluation tasks for the model."""
538          tasks = []
539  
540          for test_case_path in self.config["test_cases"]:
541              test_case = load_test_case(test_case_path)
542              test_case_name = Path(test_case_path).stem.replace(".test", "")
543              test_case_results_path = model_results_path / test_case_name
544  
545              for i in range(1, self.config["runs"] + 1):
546                  run_path = test_case_results_path / f"run_{i}"
547                  tasks.append((i, run_path, test_case, test_case_path))
548  
549          return tasks
550  
551      def _aggregate_test_case_results(
552          self, test_case_id: str, runs: list[EvaluationResult]
553      ) -> TestCaseResults:
554          """Aggregate results for a test case across multiple runs."""
555          # Load test case to get category
556          test_case_path = runs[0].test_case_path
557          test_case = load_test_case(test_case_path)
558  
559          # Extract scores for statistics
560          tool_scores = [
561              r.tool_match_score for r in runs if r.tool_match_score is not None
562          ]
563          response_scores = [
564              r.response_match_score for r in runs if r.response_match_score is not None
565          ]
566          llm_scores = [r.llm_eval_score for r in runs if r.llm_eval_score is not None]
567          duration_scores = [
568              r.duration_seconds for r in runs if r.duration_seconds is not None
569          ]
570  
571          return TestCaseResults(
572              test_case_id=test_case_id,
573              category=test_case["category"],
574              runs=runs,
575              average_duration=self.statistics_service.calculate_average_duration(
576                  duration_scores
577              ),
578              tool_match_scores=self.statistics_service.calculate_score_statistics(
579                  tool_scores
580              ),
581              response_match_scores=self.statistics_service.calculate_score_statistics(
582                  response_scores
583              ),
584              llm_eval_scores=self.statistics_service.calculate_score_statistics(
585                  llm_scores
586              ),
587          )
588  
589  
590  class ResultsWriter:
591      """Handles writing evaluation results to files."""
592  
593      def __init__(self):
594          self.file_service = FileService()
595  
596      def write_model_results(self, model_results: ModelResults, base_results_path: str):
597          """Write model results to file."""
598          results_path = (
599              Path(base_results_path) / model_results.model_name / "results.json"
600          )
601          self.file_service.save_json(model_results.to_dict(), results_path)
602          log.info(
603              f"Results for model {model_results.model_name} written to {results_path}"
604          )
605  
606  
607  class EvaluationOrchestrator:
608      """Main orchestrator that coordinates the entire evaluation process."""
609  
610      def __init__(self, config_path: str):
611          self.config_service = ConfigurationService(config_path)
612          self.results_writer = ResultsWriter()
613  
614      def run_evaluation(
615          self,
616          base_results_path: str,
617          model_execution_times: dict[str, float] | None = None,
618      ):
619          """Main entry point for the evaluation process."""
620          log.info("Starting evaluation")
621  
622          # Resolve to an absolute path to ensure consistency
623          base_results_path = str(Path(base_results_path).resolve())
624  
625          if model_execution_times is None:
626              model_execution_times = {}
627  
628          config = self.config_service.get_config()
629          evaluation_settings = self.config_service.get_evaluation_settings()
630  
631          # Convert evaluation settings to dict format for backwards compatibility
632          settings_dict = {
633              "tool_match": {"enabled": evaluation_settings.tool_matching_enabled},
634              "response_match": {"enabled": evaluation_settings.response_matching_enabled},
635              "llm_evaluator": {
636                  "enabled": evaluation_settings.llm_evaluation_enabled,
637                  "env": evaluation_settings.llm_evaluator_environment.variables if evaluation_settings.llm_evaluator_environment else {}
638              }
639          }
640  
641          # Convert config to dict format for backwards compatibility
642          config_dict = {
643              "test_cases": config.test_case_files,
644              "runs": config.run_count
645          }
646  
647          model_evaluator = ModelEvaluator(config_dict, settings_dict)
648  
649          if config.remote:
650              # Handle remote evaluation
651              model_name = "remote"
652              model_results = model_evaluator.evaluate_model(model_name, base_results_path)
653              execution_time = model_execution_times.get(model_name)
654              if execution_time is not None:
655                  model_results.total_execution_time = execution_time
656              self.results_writer.write_model_results(model_results, base_results_path)
657          else:
658              # Handle local evaluation
659              for model_config in config.model_configurations:
660                  model_name = model_config.name
661  
662                  # Evaluate the model
663                  model_results = model_evaluator.evaluate_model(
664                      model_name, base_results_path
665                  )
666  
667                  # Add execution time if available
668                  execution_time = model_execution_times.get(model_name)
669                  if execution_time is not None:
670                      model_results.total_execution_time = execution_time
671  
672                  # Write results to file
673                  self.results_writer.write_model_results(model_results, base_results_path)
674  
675          log.info("--- Evaluation finished ---")
676  
677  
678  def main(config_path: str):
679      """Main entry point for command-line usage."""
680      orchestrator = EvaluationOrchestrator(config_path)
681      # Results path should be based on the current working directory, not the package location.
682      # This main function is for standalone testing.
683      config = orchestrator.config_service.get_config()
684      results_path = Path.cwd() / "results" / config.results_directory
685      results_path.mkdir(parents=True, exist_ok=True)
686      orchestrator.run_evaluation(str(results_path))
687  
688  
689  if __name__ == "__main__":
690      # This will be updated later to parse CLI args.
691      main()