/ evaluation / evaluator.py
evaluator.py
1 """ 2 Refactored evaluator with improved structure and readability. 3 This module evaluates AI model performance against test cases using multiple evaluation strategies. 4 """ 5 6 import concurrent.futures 7 import json 8 import logging 9 import re 10 from abc import ABC, abstractmethod 11 from collections import defaultdict 12 from dataclasses import dataclass, field 13 from pathlib import Path 14 15 import litellm 16 import numpy as np 17 from rouge import Rouge 18 19 from .shared import ( 20 EvaluationConfigLoader, 21 EvaluationOptions, 22 TestSuiteConfiguration, 23 load_test_case, 24 ) 25 26 log = logging.getLogger(__name__) 27 28 29 @dataclass 30 class EvaluationResult: 31 """Represents the evaluation result for a single run.""" 32 33 run_number: int 34 test_case_id: str 35 test_case_path: str 36 tool_match_score: float | None = None 37 response_match_score: float | None = None 38 llm_eval_score: float | None = None 39 llm_eval_reasoning: str | None = None 40 duration_seconds: float | None = None 41 errors: list[str] = field(default_factory=list) 42 43 def to_dict(self) -> dict[str, any]: 44 """Convert to dictionary format for JSON serialization.""" 45 result = { 46 "run": self.run_number, 47 "test_case_id": self.test_case_id, 48 "test_case_path": self.test_case_path, 49 "duration_seconds": self.duration_seconds, 50 } 51 52 if self.tool_match_score is not None: 53 result["tool_match"] = self.tool_match_score 54 55 if self.response_match_score is not None: 56 result["response_match"] = self.response_match_score 57 58 if self.llm_eval_score is not None: 59 result["llm_eval"] = { 60 "score": self.llm_eval_score, 61 "reasoning": self.llm_eval_reasoning, 62 } 63 64 if self.errors: 65 result["errors"] = self.errors 66 67 return result 68 69 70 @dataclass 71 class ScoreStatistics: 72 """Statistical summary of evaluation scores.""" 73 74 average: float 75 distribution: dict[str, float] 76 77 @classmethod 78 def from_scores(cls, scores: list[float]) -> "ScoreStatistics": 79 """Create statistics from a list of scores.""" 80 if not scores: 81 return cls( 82 average=0.0, 83 distribution={"min": 0.0, "q1": 0.0, "q2": 0.0, "q3": 0.0, "max": 0.0}, 84 ) 85 86 return cls( 87 average=float(np.mean(scores)), 88 distribution={ 89 "min": float(np.min(scores)), 90 "q1": float(np.percentile(scores, 25)), 91 "q2": float(np.median(scores)), 92 "q3": float(np.percentile(scores, 75)), 93 "max": float(np.max(scores)), 94 }, 95 ) 96 97 98 @dataclass 99 class TestCaseResults: 100 """Aggregated results for a test case across multiple runs.""" 101 102 test_case_id: str 103 category: str 104 runs: list[EvaluationResult] 105 average_duration: float 106 tool_match_scores: ScoreStatistics 107 response_match_scores: ScoreStatistics 108 llm_eval_scores: ScoreStatistics 109 110 def to_dict(self) -> dict[str, any]: 111 """Convert to dictionary format for JSON serialization.""" 112 return { 113 "test_case_id": self.test_case_id, 114 "category": self.category, 115 "runs": [run.to_dict() for run in self.runs], 116 "average_duration": self.average_duration, 117 "tool_match_scores": { 118 "average": self.tool_match_scores.average, 119 "distribution": self.tool_match_scores.distribution, 120 }, 121 "response_match_scores": { 122 "average": self.response_match_scores.average, 123 "distribution": self.response_match_scores.distribution, 124 }, 125 "llm_eval_scores": { 126 "average": self.llm_eval_scores.average, 127 "distribution": self.llm_eval_scores.distribution, 128 }, 129 } 130 131 132 @dataclass 133 class ModelResults: 134 """Complete evaluation results for a model.""" 135 136 model_name: str 137 total_execution_time: float | None 138 test_cases: list[TestCaseResults] 139 140 def to_dict(self) -> dict[str, any]: 141 """Convert to dictionary format for JSON serialization.""" 142 return { 143 "model_name": self.model_name, 144 "total_execution_time": self.total_execution_time, 145 "test_cases": [tc.to_dict() for tc in self.test_cases], 146 } 147 148 149 class ConfigurationService: 150 """Handles configuration loading and validation.""" 151 152 def __init__(self, config_path: str): 153 self.config_loader = EvaluationConfigLoader(config_path) 154 self._config_cache = None 155 self._evaluation_settings_cache = None 156 157 def get_config(self) -> TestSuiteConfiguration: 158 """Get the main configuration.""" 159 if self._config_cache is None: 160 self._config_cache = self.config_loader.load_configuration() 161 return self._config_cache 162 163 def get_evaluation_settings(self) -> EvaluationOptions: 164 """Get evaluation settings.""" 165 if self._evaluation_settings_cache is None: 166 self._evaluation_settings_cache = self.config_loader.get_evaluation_options() 167 return self._evaluation_settings_cache 168 169 170 class FileService: 171 """Handles file I/O operations.""" 172 173 @staticmethod 174 def load_json(filepath: Path) -> any: 175 """Load JSON data from file.""" 176 try: 177 with filepath.open() as f: 178 return json.load(f) 179 except (FileNotFoundError, json.JSONDecodeError) as e: 180 log.error(f"Failed to load JSON from {filepath}: {e}") 181 raise 182 183 @staticmethod 184 def save_json(data: any, filepath: Path): 185 """Save data as JSON to file.""" 186 try: 187 filepath.parent.mkdir(parents=True, exist_ok=True) 188 with filepath.open("w") as f: 189 json.dump(data, f, indent=4) 190 except Exception as e: 191 log.error(f"Failed to save JSON to {filepath}: {e}") 192 raise 193 194 @staticmethod 195 def file_exists(filepath: Path) -> bool: 196 """Check if file exists.""" 197 return filepath.exists() 198 199 200 class StatisticsService: 201 """Handles statistical calculations and aggregations.""" 202 203 @staticmethod 204 def calculate_score_statistics(scores: list[float]) -> ScoreStatistics: 205 """Calculate statistical summary for a list of scores.""" 206 return ScoreStatistics.from_scores(scores) 207 208 @staticmethod 209 def calculate_average_duration(durations: list[float]) -> float: 210 """Calculate average duration from a list of durations.""" 211 if not durations: 212 return 0.0 213 return float(np.mean(durations)) 214 215 216 class EvaluationStrategy(ABC): 217 """Abstract base class for evaluation strategies.""" 218 219 @abstractmethod 220 def evaluate( 221 self, test_case: dict[str, any], summary_data: dict[str, any] 222 ) -> float | None: 223 """Evaluate a test case run and return a score.""" 224 pass 225 226 227 class ToolMatchEvaluator(EvaluationStrategy): 228 """Evaluates tool usage against expected tools.""" 229 230 def evaluate( 231 self, test_case: dict[str, any], summary_data: dict[str, any] 232 ) -> float | None: 233 """Evaluate tool matching score.""" 234 try: 235 expected_tools = test_case["evaluation"]["expected_tools"] 236 actual_tools = [ 237 tool["tool_name"] for tool in summary_data.get("tool_calls", []) 238 ] 239 240 expected_set = set(expected_tools) 241 actual_set = set(actual_tools) 242 243 if not expected_set: 244 return 1.0 245 246 found_tools = expected_set.intersection(actual_set) 247 return len(found_tools) / len(expected_set) 248 249 except (KeyError, TypeError) as e: 250 log.warning(f"Error in tool match evaluation: {e}") 251 return None 252 253 254 class ResponseMatchEvaluator(EvaluationStrategy): 255 """Evaluates response quality using ROUGE metrics.""" 256 257 def __init__(self): 258 self.rouge = Rouge() 259 260 def evaluate( 261 self, test_case: dict[str, any], summary_data: dict[str, any] 262 ) -> float | None: 263 """Evaluate response matching score using a weighted ROUGE average.""" 264 try: 265 expected_response = test_case["evaluation"]["expected_response"] 266 actual_response = summary_data.get("final_message", "") 267 268 if not actual_response or not expected_response: 269 return 0.0 270 271 scores = self.rouge.get_scores(actual_response, expected_response)[0] 272 273 # Weighted average of ROUGE-1, ROUGE-2, and ROUGE-L f-scores 274 rouge_1_f = scores.get("rouge-1", {}).get("f", 0.0) 275 rouge_2_f = scores.get("rouge-2", {}).get("f", 0.0) 276 rouge_l_f = scores.get("rouge-l", {}).get("f", 0.0) 277 278 weighted_score = (0.2 * rouge_1_f) + (0.3 * rouge_2_f) + (0.5 * rouge_l_f) 279 280 return weighted_score 281 282 except (ValueError, KeyError, TypeError) as e: 283 log.warning(f"Error in response match evaluation: {e}") 284 return 0.0 285 286 287 class LLMEvaluator(EvaluationStrategy): 288 """Evaluates responses using an LLM judge.""" 289 290 def __init__(self, llm_config: dict[str, any]): 291 self.model = llm_config.get("LLM_SERVICE_PLANNING_MODEL_NAME") 292 self.api_key = llm_config.get("LLM_SERVICE_API_KEY") 293 self.api_base = llm_config.get("LLM_SERVICE_ENDPOINT") 294 295 if not all([self.model, self.api_key, self.api_base]): 296 raise ValueError( 297 "LLM evaluator requires model, api_key, and api_base configuration" 298 ) 299 300 def evaluate( 301 self, test_case: dict[str, any], summary_data: dict[str, any] 302 ) -> dict[str, any] | None: 303 """Evaluate response using LLM and return score with reasoning.""" 304 try: 305 query = test_case["query"] 306 expected_response = test_case["evaluation"]["expected_response"] 307 actual_response = summary_data.get("final_message", "") 308 criterion = test_case["evaluation"]["criterion"] 309 input_artifacts = summary_data.get("input_artifacts", []) 310 output_artifacts = summary_data.get("output_artifacts", []) 311 312 prompt = self._build_evaluation_prompt( 313 query, 314 expected_response, 315 actual_response, 316 criterion, 317 input_artifacts, 318 output_artifacts, 319 ) 320 321 response = litellm.completion( 322 model=self.model, 323 messages=[{"role": "user", "content": prompt}], 324 api_key=self.api_key, 325 base_url=self.api_base, 326 ) 327 328 response_content = response.choices[0].message.content.strip() 329 score = self._extract_score(response_content) 330 reasoning = self._extract_reasoning(response_content) 331 332 return {"score": score, "reasoning": reasoning} 333 334 except Exception as e: 335 log.error(f"Error in LLM evaluation: {e}") 336 return None 337 338 def _build_evaluation_prompt( 339 self, 340 query: str, 341 expected_response: str, 342 actual_response: str, 343 criterion: str, 344 input_artifacts: list[dict], 345 output_artifacts: list[dict], 346 ) -> str: 347 """Build the evaluation prompt for the LLM.""" 348 return f""" 349 Original Query: {query} 350 Expected Response: {expected_response} 351 Actual Response: {actual_response} 352 Criterion: {criterion} 353 Input Artifacts: {input_artifacts} 354 Output Artifacts: {output_artifacts} 355 356 Based on the criterion, please evaluate the actual response. 357 Format your response exactly as: 358 Score: [0.0-1.0] 359 Reasoning: [Your detailed explanation of why you gave this score, considering both the response and any artifacts created] 360 361 Provide a score from 0.0 to 1.0 where: 362 - 1.0 = Excellent: Fully meets the criterion and expectations 363 - 0.8-0.9 = Good: Mostly meets the criterion with minor issues 364 - 0.6-0.7 = Adequate: Partially meets the criterion but has notable gaps 365 - 0.4-0.5 = Poor: Minimally meets the criterion with significant issues 366 - 0.0-0.3 = Very Poor: Fails to meet the criterion 367 """ 368 369 def _extract_score(self, llm_response: str) -> float: 370 """Extract numerical score from LLM response.""" 371 # Try to find "Score: X.X" pattern first 372 score_match = re.search( 373 r"Score:\s*([0-9]*\.?[0-9]+)", llm_response, re.IGNORECASE 374 ) 375 if score_match: 376 try: 377 score = float(score_match.group(1)) 378 return max(0.0, min(1.0, score)) 379 except ValueError: 380 pass 381 382 # Fallback: look for any number between 0 and 1 383 number_match = re.search(r"\b([0-1]\.?[0-9]*)\b", llm_response) 384 if number_match: 385 try: 386 score = float(number_match.group(1)) 387 if 0.0 <= score <= 1.0: 388 return score 389 except ValueError: 390 pass 391 392 return 0.0 393 394 def _extract_reasoning(self, llm_response: str) -> str: 395 """Extract reasoning from LLM response.""" 396 reasoning_match = re.search( 397 r"Reasoning:\s*(.+)", llm_response, re.IGNORECASE | re.DOTALL 398 ) 399 if reasoning_match: 400 return reasoning_match.group(1).strip() 401 402 return llm_response.strip() 403 404 405 class RunEvaluator: 406 """Evaluates individual test runs.""" 407 408 def __init__(self, evaluation_settings: dict[str, any]): 409 self.evaluation_settings = evaluation_settings 410 self.file_service = FileService() 411 412 # Initialize evaluators based on settings 413 self.tool_evaluator = ( 414 ToolMatchEvaluator() 415 if evaluation_settings["tool_match"]["enabled"] 416 else None 417 ) 418 self.response_evaluator = ( 419 ResponseMatchEvaluator() 420 if evaluation_settings["response_match"]["enabled"] 421 else None 422 ) 423 424 self.llm_evaluator = None 425 if evaluation_settings["llm_evaluator"]["enabled"]: 426 try: 427 llm_config = evaluation_settings["llm_evaluator"]["env"] 428 self.llm_evaluator = LLMEvaluator(llm_config) 429 except Exception as e: 430 log.error(f"Failed to initialize LLM evaluator: {e}") 431 432 def evaluate_run( 433 self, 434 run_number: int, 435 run_path: Path, 436 test_case: dict[str, any], 437 test_case_path: str, 438 ) -> EvaluationResult | None: 439 """Evaluate a single test run.""" 440 log.info( 441 f" - Evaluating run {run_number} for test case {test_case['test_case_id']}" 442 ) 443 444 # Load summary data 445 summary_path = run_path / "summary.json" 446 log.info(f"Summary file path: {summary_path}") 447 if not self.file_service.file_exists(summary_path): 448 log.warning( 449 f" Summary file not found for run {run_number}, skipping." 450 ) 451 return None 452 453 try: 454 summary_data = self.file_service.load_json(summary_path) 455 except Exception as e: 456 log.error(f" Error loading summary.json for run {run_number}: {e}") 457 return None 458 459 # Create evaluation result 460 result = EvaluationResult( 461 run_number=run_number, 462 test_case_id=test_case["test_case_id"], 463 test_case_path=test_case_path, 464 duration_seconds=summary_data.get("duration_seconds"), 465 ) 466 467 # Run evaluations 468 if self.tool_evaluator: 469 result.tool_match_score = self.tool_evaluator.evaluate( 470 test_case, summary_data 471 ) 472 473 if self.response_evaluator: 474 result.response_match_score = self.response_evaluator.evaluate( 475 test_case, summary_data 476 ) 477 478 if self.llm_evaluator: 479 llm_result = self.llm_evaluator.evaluate(test_case, summary_data) 480 if llm_result: 481 result.llm_eval_score = llm_result["score"] 482 result.llm_eval_reasoning = llm_result["reasoning"] 483 484 return result 485 486 487 class ModelEvaluator: 488 """Evaluates all runs for a single model.""" 489 490 def __init__(self, config: dict[str, any], evaluation_settings: dict[str, any]): 491 self.config = config 492 self.evaluation_settings = evaluation_settings 493 self.run_evaluator = RunEvaluator(evaluation_settings) 494 self.statistics_service = StatisticsService() 495 496 def evaluate_model(self, model_name: str, base_results_path: str) -> ModelResults: 497 """Evaluate all test cases for a model.""" 498 log.info(f"Evaluating model: {model_name}") 499 500 model_results_path = Path(base_results_path) / model_name 501 502 # Collect all evaluation tasks 503 tasks = self._collect_evaluation_tasks(model_results_path) 504 505 # Run evaluations in parallel 506 model_results_data = defaultdict(list) 507 with concurrent.futures.ThreadPoolExecutor() as executor: 508 future_to_run = { 509 executor.submit(self.run_evaluator.evaluate_run, *task): task 510 for task in tasks 511 } 512 513 for future in concurrent.futures.as_completed(future_to_run): 514 try: 515 result = future.result() 516 if result: 517 model_results_data[result.test_case_id].append(result) 518 except Exception as e: 519 log.error(f"An error occurred during evaluation: {e}") 520 521 # Aggregate results by test case 522 test_case_results = [] 523 for test_case_id, runs in model_results_data.items(): 524 if runs: 525 test_case_result = self._aggregate_test_case_results(test_case_id, runs) 526 test_case_results.append(test_case_result) 527 528 return ModelResults( 529 model_name=model_name, 530 total_execution_time=None, # Will be set by orchestrator 531 test_cases=test_case_results, 532 ) 533 534 def _collect_evaluation_tasks( 535 self, model_results_path: Path 536 ) -> list[tuple[int, Path, dict[str, any], str]]: 537 """Collect all evaluation tasks for the model.""" 538 tasks = [] 539 540 for test_case_path in self.config["test_cases"]: 541 test_case = load_test_case(test_case_path) 542 test_case_name = Path(test_case_path).stem.replace(".test", "") 543 test_case_results_path = model_results_path / test_case_name 544 545 for i in range(1, self.config["runs"] + 1): 546 run_path = test_case_results_path / f"run_{i}" 547 tasks.append((i, run_path, test_case, test_case_path)) 548 549 return tasks 550 551 def _aggregate_test_case_results( 552 self, test_case_id: str, runs: list[EvaluationResult] 553 ) -> TestCaseResults: 554 """Aggregate results for a test case across multiple runs.""" 555 # Load test case to get category 556 test_case_path = runs[0].test_case_path 557 test_case = load_test_case(test_case_path) 558 559 # Extract scores for statistics 560 tool_scores = [ 561 r.tool_match_score for r in runs if r.tool_match_score is not None 562 ] 563 response_scores = [ 564 r.response_match_score for r in runs if r.response_match_score is not None 565 ] 566 llm_scores = [r.llm_eval_score for r in runs if r.llm_eval_score is not None] 567 duration_scores = [ 568 r.duration_seconds for r in runs if r.duration_seconds is not None 569 ] 570 571 return TestCaseResults( 572 test_case_id=test_case_id, 573 category=test_case["category"], 574 runs=runs, 575 average_duration=self.statistics_service.calculate_average_duration( 576 duration_scores 577 ), 578 tool_match_scores=self.statistics_service.calculate_score_statistics( 579 tool_scores 580 ), 581 response_match_scores=self.statistics_service.calculate_score_statistics( 582 response_scores 583 ), 584 llm_eval_scores=self.statistics_service.calculate_score_statistics( 585 llm_scores 586 ), 587 ) 588 589 590 class ResultsWriter: 591 """Handles writing evaluation results to files.""" 592 593 def __init__(self): 594 self.file_service = FileService() 595 596 def write_model_results(self, model_results: ModelResults, base_results_path: str): 597 """Write model results to file.""" 598 results_path = ( 599 Path(base_results_path) / model_results.model_name / "results.json" 600 ) 601 self.file_service.save_json(model_results.to_dict(), results_path) 602 log.info( 603 f"Results for model {model_results.model_name} written to {results_path}" 604 ) 605 606 607 class EvaluationOrchestrator: 608 """Main orchestrator that coordinates the entire evaluation process.""" 609 610 def __init__(self, config_path: str): 611 self.config_service = ConfigurationService(config_path) 612 self.results_writer = ResultsWriter() 613 614 def run_evaluation( 615 self, 616 base_results_path: str, 617 model_execution_times: dict[str, float] | None = None, 618 ): 619 """Main entry point for the evaluation process.""" 620 log.info("Starting evaluation") 621 622 # Resolve to an absolute path to ensure consistency 623 base_results_path = str(Path(base_results_path).resolve()) 624 625 if model_execution_times is None: 626 model_execution_times = {} 627 628 config = self.config_service.get_config() 629 evaluation_settings = self.config_service.get_evaluation_settings() 630 631 # Convert evaluation settings to dict format for backwards compatibility 632 settings_dict = { 633 "tool_match": {"enabled": evaluation_settings.tool_matching_enabled}, 634 "response_match": {"enabled": evaluation_settings.response_matching_enabled}, 635 "llm_evaluator": { 636 "enabled": evaluation_settings.llm_evaluation_enabled, 637 "env": evaluation_settings.llm_evaluator_environment.variables if evaluation_settings.llm_evaluator_environment else {} 638 } 639 } 640 641 # Convert config to dict format for backwards compatibility 642 config_dict = { 643 "test_cases": config.test_case_files, 644 "runs": config.run_count 645 } 646 647 model_evaluator = ModelEvaluator(config_dict, settings_dict) 648 649 if config.remote: 650 # Handle remote evaluation 651 model_name = "remote" 652 model_results = model_evaluator.evaluate_model(model_name, base_results_path) 653 execution_time = model_execution_times.get(model_name) 654 if execution_time is not None: 655 model_results.total_execution_time = execution_time 656 self.results_writer.write_model_results(model_results, base_results_path) 657 else: 658 # Handle local evaluation 659 for model_config in config.model_configurations: 660 model_name = model_config.name 661 662 # Evaluate the model 663 model_results = model_evaluator.evaluate_model( 664 model_name, base_results_path 665 ) 666 667 # Add execution time if available 668 execution_time = model_execution_times.get(model_name) 669 if execution_time is not None: 670 model_results.total_execution_time = execution_time 671 672 # Write results to file 673 self.results_writer.write_model_results(model_results, base_results_path) 674 675 log.info("--- Evaluation finished ---") 676 677 678 def main(config_path: str): 679 """Main entry point for command-line usage.""" 680 orchestrator = EvaluationOrchestrator(config_path) 681 # Results path should be based on the current working directory, not the package location. 682 # This main function is for standalone testing. 683 config = orchestrator.config_service.get_config() 684 results_path = Path.cwd() / "results" / config.results_directory 685 results_path.mkdir(parents=True, exist_ok=True) 686 orchestrator.run_evaluation(str(results_path)) 687 688 689 if __name__ == "__main__": 690 # This will be updated later to parse CLI args. 691 main()