evaluator.py
#!/usr/bin/env python3
"""
Shadow Evaluator - DeepSeek Parallel Code Evaluation Engine

Runs in parallel with Claude to evaluate code changes for:
- MECE Test Coverage: Are tests mutually exclusive and collectively exhaustive?
- Implementation Efficiency: Is the code efficient? Could it be simpler?
- Hallucination Detection: Does the change reference non-existent APIs/modules?

Architecture:
    Claude proposes Edit/Write → Shadow Evaluator intercepts → DeepSeek + Qdrant analyze
    → Returns verdict (OK/WARN/BLOCK) → Claude continues or fixes
"""

import os
import sys
import json
import re
import logging
from dataclasses import dataclass, asdict
from typing import List, Dict, Optional, Any
from pathlib import Path
from urllib.request import Request, urlopen
from urllib.error import URLError

import yaml

# Local imports
from qdrant_client import get_client as get_qdrant_client
from hallucination_detector import HallucinationDetector
from mece_analyzer import MECEAnalyzer

logger = logging.getLogger(__name__)


@dataclass
class Finding:
    """A single evaluation finding."""
    type: str  # "HALLUCINATION", "SECURITY", "MECE_GAP", "EFFICIENCY", "CORRECTNESS"
    severity: str  # "critical", "warning", "info"
    message: str
    line: Optional[int] = None
    suggestion: Optional[str] = None


@dataclass
class EvaluationResult:
    """Result of a code evaluation."""
    verdict: str  # "OK", "WARN", "BLOCK"
    findings: List[Finding]
    metrics: Dict[str, Any]
    deepseek_analysis: Optional[str] = None


# DeepSeek API configuration
DEEPSEEK_PROMPT = """You are a code review assistant. Analyze this code change for:
1. EFFICIENCY: Could this be simpler or more performant?
2. SECURITY: Any potential security issues?
3. CORRECTNESS: Any logic errors or edge cases missed?

Code change (diff format):
```
{diff}
```

File: {file_path}
Repository: {repo}

Output ONLY a JSON object with this structure:
{{
  "efficiency": {{"score": 1-5, "issues": ["issue1", ...]}},
  "security": {{"score": 1-5, "issues": ["issue1", ...]}},
  "correctness": {{"score": 1-5, "issues": ["issue1", ...]}}
}}

Scores: 5=excellent, 4=good, 3=acceptable, 2=needs work, 1=critical issues
Be concise. If no issues, use an empty array."""
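
# For reference, a response matching the structure the prompt above requests
# might look like this (values are illustrative only; actual scores and issues
# depend on the diff under review):
#
#   {
#     "efficiency": {"score": 4, "issues": []},
#     "security": {"score": 2, "issues": ["user input interpolated into a shell command"]},
#     "correctness": {"score": 3, "issues": ["empty diff is not handled"]}
#   }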


class ShadowEvaluator:
    """
    Main evaluation engine that orchestrates all analysis.

    Combines:
    - Qdrant semantic search for symbol verification
    - HallucinationDetector for API/module verification
    - MECEAnalyzer for test coverage gaps
    - DeepSeek for efficiency/security/correctness analysis
    """

    def __init__(self, config_path: Optional[str] = None):
        """Initialize evaluator with configuration."""
        # Load config
        if config_path is None:
            config_path = Path(__file__).parent / "config.yaml"

        if os.path.exists(config_path):
            with open(config_path) as f:
                self.config = yaml.safe_load(f)
        else:
            self.config = {}

        self.deepseek_config = self.config.get("deepseek", {})
        self.eval_config = self.config.get("evaluation", {})

        # Initialize components
        self.qdrant = get_qdrant_client()
        self.hallucination_detector = HallucinationDetector(self.qdrant)
        self.mece_analyzer = MECEAnalyzer(self.qdrant)

        # Verdict rules
        self.block_types = set(self.eval_config.get("verdicts", {}).get("block", []))
        self.warn_types = set(self.eval_config.get("verdicts", {}).get("warn", []))

    def _call_deepseek(self, diff: str, file_path: str, repo: str) -> Optional[Dict]:
        """Call DeepSeek API for code analysis."""
        api_key = os.environ.get('DEEPSEEK_API_KEY')
        if not api_key:
            logger.warning("DEEPSEEK_API_KEY not set, skipping DeepSeek analysis")
            return None

        prompt = DEEPSEEK_PROMPT.format(
            diff=diff[:4000],  # Limit diff size
            file_path=file_path,
            repo=repo
        )

        url = self.deepseek_config.get("api_url", "https://api.deepseek.com/chat/completions")
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}"
        }
        payload = {
            "model": self.deepseek_config.get("model", "deepseek-coder"),
            "messages": [
                {"role": "system", "content": "You are a code review assistant. Output only JSON."},
                {"role": "user", "content": prompt}
            ],
            "temperature": self.deepseek_config.get("temperature", 0.2),
            "max_tokens": self.deepseek_config.get("max_tokens", 1500)
        }

        try:
            req = Request(
                url,
                data=json.dumps(payload).encode(),
                headers=headers,
                method='POST'
            )
            timeout = self.deepseek_config.get("timeout", 30)

            with urlopen(req, timeout=timeout) as resp:
                data = json.loads(resp.read().decode())

            content = data['choices'][0]['message']['content']

            # Extract JSON from response
            json_match = re.search(r'\{[\s\S]*\}', content)
            if json_match:
                return json.loads(json_match.group())
            return None

        except Exception as e:
            logger.error(f"DeepSeek API error: {e}")
            return None

    def _should_skip(self, file_path: str) -> bool:
        """Check if file should be skipped based on config."""
        skip_patterns = self.eval_config.get("skip_paths", [])
        for pattern in skip_patterns:
            if Path(file_path).match(pattern):
                return True
        return False
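
    # A config.yaml consumed by __init__ and _should_skip above might look like
    # the sketch below. Only keys this module actually reads are shown; the
    # skip patterns and verdict lists are illustrative, not prescribed:
    #
    #   deepseek:
    #     api_url: https://api.deepseek.com/chat/completions
    #     model: deepseek-coder
    #     temperature: 0.2
    #     max_tokens: 1500
    #     timeout: 30
    #   evaluation:
    #     skip_paths: ["*_test.rs", "*.md"]
    #     verdicts:
    #       block: ["HALLUCINATION", "SECURITY"]
    #       warn: ["MECE_GAP", "EFFICIENCY", "CORRECTNESS"]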

    def evaluate(
        self,
        diff: str,
        file_path: str,
        repo: Optional[str] = None,
        tool_name: Optional[str] = None
    ) -> EvaluationResult:
        """
        Evaluate a code change.

        Args:
            diff: The diff or new code content
            file_path: Path to the file being changed
            repo: Repository name (optional, for scoped search)
            tool_name: The tool that made the change (Edit/Write)

        Returns:
            EvaluationResult with verdict and findings
        """
        findings: List[Finding] = []
        metrics: Dict[str, Any] = {
            "file_path": file_path,
            "repo": repo,
            "tool": tool_name,
        }

        # Skip certain files
        if self._should_skip(file_path):
            logger.info(f"Skipping evaluation for {file_path}")
            return EvaluationResult(
                verdict="OK",
                findings=[],
                metrics={"skipped": True, "reason": "matches skip pattern"}
            )

        # 1. Hallucination Detection
        logger.info("Running hallucination detection...")
        hallucination_report = self.hallucination_detector.analyze_diff(diff, repo, file_path)
        metrics["hallucination"] = {
            "symbols_checked": hallucination_report.symbols_checked,
            "symbols_verified": hallucination_report.symbols_verified,
        }

        for unverified in hallucination_report.unverified:
            severity = "critical" if unverified.confidence < 0.3 else "warning"
            findings.append(Finding(
                type="HALLUCINATION",
                severity=severity,
                message=f"Unverified {unverified.symbol_type}: '{unverified.symbol}'",
                line=unverified.line_number,
                suggestion=f"Verify that '{unverified.symbol}' exists in the codebase"
            ))

        # 2. MECE Test Coverage Analysis
        logger.info("Running MECE analysis...")
        mece_report = self.mece_analyzer.analyze_diff(diff, file_path, repo)
        metrics["mece"] = {
            "functions_analyzed": mece_report.functions_analyzed,
            "coverage_score": mece_report.coverage_score,
        }

        for gap in mece_report.gaps:
            findings.append(Finding(
                type="MECE_GAP",
                severity="warning" if gap.severity == "high" else "info",
                message=gap.description,
                line=gap.line_number,
                suggestion=f"Consider adding a test for '{gap.function_name}'"
            ))
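
        # Note: the report shapes consumed above are inferred from their usage in
        # this module, not from the analyzers' own definitions:
        #   hallucination_report: .symbols_checked, .symbols_verified, and
        #       .unverified items with .symbol, .symbol_type, .confidence, .line_number
        #   mece_report: .functions_analyzed, .coverage_score, and
        #       .gaps items with .description, .severity, .line_number, .function_name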

        # 3. DeepSeek Efficiency/Security Analysis
        logger.info("Running DeepSeek analysis...")
        deepseek_result = self._call_deepseek(diff, file_path, repo or "unknown")
        deepseek_analysis = None

        if deepseek_result:
            deepseek_analysis = json.dumps(deepseek_result, indent=2)
            metrics["deepseek"] = deepseek_result

            # Process efficiency issues
            efficiency = deepseek_result.get("efficiency", {})
            if efficiency.get("score", 5) < 3:
                for issue in efficiency.get("issues", []):
                    findings.append(Finding(
                        type="EFFICIENCY",
                        severity="warning",
                        message=issue
                    ))

            # Process security issues
            security = deepseek_result.get("security", {})
            if security.get("score", 5) < 3:
                for issue in security.get("issues", []):
                    findings.append(Finding(
                        type="SECURITY",
                        severity="critical" if security["score"] == 1 else "warning",
                        message=issue
                    ))

            # Process correctness issues
            correctness = deepseek_result.get("correctness", {})
            if correctness.get("score", 5) < 3:
                for issue in correctness.get("issues", []):
                    findings.append(Finding(
                        type="CORRECTNESS",
                        severity="warning",
                        message=issue
                    ))

        # Determine final verdict
        verdict = "OK"
        for finding in findings:
            if finding.type in self.block_types and finding.severity == "critical":
                verdict = "BLOCK"
                break
            elif finding.type in self.warn_types or finding.severity == "warning":
                verdict = "WARN"

        return EvaluationResult(
            verdict=verdict,
            findings=findings,
            metrics=metrics,
            deepseek_analysis=deepseek_analysis
        )
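
    # Minimal usage sketch for evaluate() (file path, repo name, and diff are
    # illustrative):
    #
    #   evaluator = ShadowEvaluator()
    #   result = evaluator.evaluate(
    #       diff="-fn process()\n+fn process_transaction()",
    #       file_path="src/vm/executor.rs",
    #       repo="alphavm",
    #   )
    #   print(result.verdict)  # one of "OK", "WARN", "BLOCK"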

    def evaluate_tool_result(
        self,
        tool_name: str,
        tool_input: Dict[str, Any],
        tool_output: str
    ) -> EvaluationResult:
        """
        Evaluate a tool call result (called from PostToolUse hook).

        Args:
            tool_name: Name of the tool (Edit/Write)
            tool_input: Tool input parameters
            tool_output: Tool output/result

        Returns:
            EvaluationResult
        """
        file_path = tool_input.get("file_path", "")

        # Only evaluate Rust files
        if not file_path.endswith(".rs"):
            return EvaluationResult(
                verdict="OK",
                findings=[],
                metrics={"skipped": True, "reason": "not a Rust file"}
            )

        # For Edit tool, construct diff from old_string -> new_string
        if tool_name == "Edit":
            old_str = tool_input.get("old_string", "")
            new_str = tool_input.get("new_string", "")
            diff = f"-{old_str}\n+{new_str}"
        elif tool_name == "Write":
            # For Write, treat entire content as new
            content = tool_input.get("content", "")
            diff = '\n'.join(f'+{line}' for line in content.split('\n'))
        else:
            return EvaluationResult(
                verdict="OK",
                findings=[],
                metrics={"skipped": True, "reason": f"unsupported tool: {tool_name}"}
            )

        # Extract repo from path (working-repos/<repo>/...)
        repo = None
        path_parts = Path(file_path).parts
        if "working-repos" in path_parts:
            idx = path_parts.index("working-repos")
            if idx + 1 < len(path_parts):
                repo = path_parts[idx + 1]

        return self.evaluate(diff, file_path, repo, tool_name)


def format_for_hook(result: EvaluationResult) -> str:
    """Format evaluation result for hook output."""
    lines = [f"[SHADOW-EVAL] Verdict: {result.verdict}"]

    if result.findings:
        lines.append(f"Findings ({len(result.findings)}):")
        for f in result.findings[:10]:  # Limit to 10 findings
            severity_icon = "🔴" if f.severity == "critical" else "🟡" if f.severity == "warning" else "ℹ️"
            line_info = f" (line {f.line})" if f.line else ""
            lines.append(f"  {severity_icon} [{f.type}]{line_info}: {f.message}")
            if f.suggestion:
                lines.append(f"     └─ {f.suggestion}")

    return '\n'.join(lines)


def main():
    """CLI for shadow evaluation."""
    import argparse

    parser = argparse.ArgumentParser(description="Shadow Evaluator")
    parser.add_argument("command", choices=["evaluate", "hook-test"])
    parser.add_argument("--diff", help="Diff file to evaluate")
    parser.add_argument("--file", help="File path")
    parser.add_argument("--repo", help="Repository name")
    parser.add_argument("--stdin", action="store_true", help="Read from stdin")
    parser.add_argument("--json", action="store_true", help="Output JSON")
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO)
    evaluator = ShadowEvaluator()

    if args.command == "evaluate":
        if args.stdin:
            diff = sys.stdin.read()
        elif args.diff:
            with open(args.diff) as f:
                diff = f.read()
        else:
            print("Error: --diff or --stdin required", file=sys.stderr)
            return 1

        file_path = args.file or "unknown"
        result = evaluator.evaluate(diff, file_path, args.repo)

        if args.json:
            output = {
                "verdict": result.verdict,
                "findings": [asdict(f) for f in result.findings],
                "metrics": result.metrics,
            }
            print(json.dumps(output, indent=2))
        else:
            print(format_for_hook(result))

    elif args.command == "hook-test":
        # Test hook integration
        test_input = {
            "file_path": "/home/devops/working-repos/alphavm/src/vm/executor.rs",
            "old_string": "fn process()",
            "new_string": "fn process_transaction()"
        }
        result = evaluator.evaluate_tool_result("Edit", test_input, "Success")
        print(format_for_hook(result))

    return 0
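
# Example invocations of the CLI defined above (paths are illustrative):
#
#   git diff | python evaluator.py evaluate --stdin --file src/vm/executor.rs --repo alphavm
#   python evaluator.py evaluate --diff change.patch --file src/lib.rs --json
#   python evaluator.py hook-test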

if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code, so the
    # "--diff or --stdin required" error path actually exits non-zero.
    sys.exit(main())
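
# Sample format_for_hook output for the hook-test command (the finding shown is
# hypothetical; the layout follows the format strings in format_for_hook):
#
#   [SHADOW-EVAL] Verdict: WARN
#   Findings (1):
#     🟡 [MECE_GAP] (line 3): No test exercises 'process_transaction'
#        └─ Consider adding a test for 'process_transaction'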