/ tooling / shadow-evaluator / evaluator.py
evaluator.py
  1  #!/usr/bin/env python3
  2  """
  3  Shadow Evaluator - DeepSeek Parallel Code Evaluation Engine
  4  
  5  Runs in parallel with Claude to evaluate code changes for:
  6  - MECE Test Coverage: Are tests mutually exclusive and collectively exhaustive?
  7  - Implementation Efficiency: Is the code efficient? Could it be simpler?
  8  - Hallucination Detection: Does the change reference non-existent APIs/modules?
  9  
 10  Architecture:
 11      Claude proposes Edit/Write → Shadow Evaluator intercepts → DeepSeek + Qdrant analyze
 12      → Returns verdict (OK/WARN/BLOCK) → Claude continues or fixes
 13  """
 14  
 15  import os
 16  import sys
 17  import json
 18  import re
 19  import logging
 20  from dataclasses import dataclass, asdict
 21  from typing import List, Dict, Optional, Any
 22  from pathlib import Path
 23  from urllib.request import Request, urlopen
 24  from urllib.error import URLError
 25  
 26  import yaml
 27  
 28  # Local imports
 29  from qdrant_client import get_client as get_qdrant_client
 30  from hallucination_detector import HallucinationDetector
 31  from mece_analyzer import MECEAnalyzer
 32  
# Module-level logger named after this module; main() sets the level via basicConfig.
logger = logging.getLogger(__name__)
 34  
 35  
@dataclass
class Finding:
    """A single issue reported by one of the evaluation passes."""
    # Category of the finding. Values produced in this module: "HALLUCINATION",
    # "MECE_GAP", "EFFICIENCY", "SECURITY", "CORRECTNESS".
    type: str
    # One of "critical", "warning", "info".
    severity: str
    # Human-readable description of the issue.
    message: str
    # Line number the finding refers to, when the producing pass supplies one.
    line: Optional[int] = None
    # Optional remediation hint shown under the finding in hook output.
    suggestion: Optional[str] = None
 44  
 45  
@dataclass
class EvaluationResult:
    """Outcome of evaluating one code change."""
    # Overall decision: "OK", "WARN", or "BLOCK".
    verdict: str
    # All findings collected from the evaluation passes, in the order they ran.
    findings: List[Finding]
    # Free-form metrics; for skipped files this holds {"skipped": True, "reason": ...}.
    metrics: Dict[str, Any]
    # Pretty-printed JSON of the DeepSeek response, or None when no response was obtained.
    deepseek_analysis: Optional[str] = None
 53  
 54  
# Prompt template sent to DeepSeek as the user message.
# It is filled in with str.format(), so {diff}, {file_path} and {repo} are
# placeholders and the doubled braces ({{ }}) render as literal JSON braces.
DEEPSEEK_PROMPT = """You are a code review assistant. Analyze this code change for:
1. EFFICIENCY: Could this be simpler or more performant?
2. SECURITY: Any potential security issues?
3. CORRECTNESS: Any logic errors or edge cases missed?

Code change (diff format):
```
{diff}
```

File: {file_path}
Repository: {repo}

Output ONLY a JSON object with this structure:
{{
  "efficiency": {{"score": 1-5, "issues": ["issue1", ...]}},
  "security": {{"score": 1-5, "issues": ["issue1", ...]}},
  "correctness": {{"score": 1-5, "issues": ["issue1", ...]}}
}}

Scores: 5=excellent, 4=good, 3=acceptable, 2=needs work, 1=critical issues
Be concise. If no issues, use empty array."""
 78  
 79  
 80  class ShadowEvaluator:
 81      """
 82      Main evaluation engine that orchestrates all analysis.
 83  
 84      Combines:
 85      - Qdrant semantic search for symbol verification
 86      - HallucinationDetector for API/module verification
 87      - MECEAnalyzer for test coverage gaps
 88      - DeepSeek for efficiency/security/correctness analysis
 89      """
 90  
 91      def __init__(self, config_path: Optional[str] = None):
 92          """Initialize evaluator with configuration."""
 93          # Load config
 94          if config_path is None:
 95              config_path = Path(__file__).parent / "config.yaml"
 96  
 97          if os.path.exists(config_path):
 98              with open(config_path) as f:
 99                  self.config = yaml.safe_load(f)
100          else:
101              self.config = {}
102  
103          self.deepseek_config = self.config.get("deepseek", {})
104          self.eval_config = self.config.get("evaluation", {})
105  
106          # Initialize components
107          self.qdrant = get_qdrant_client()
108          self.hallucination_detector = HallucinationDetector(self.qdrant)
109          self.mece_analyzer = MECEAnalyzer(self.qdrant)
110  
111          # Verdict rules
112          self.block_types = set(self.eval_config.get("verdicts", {}).get("block", []))
113          self.warn_types = set(self.eval_config.get("verdicts", {}).get("warn", []))
114  
115      def _call_deepseek(self, diff: str, file_path: str, repo: str) -> Optional[Dict]:
116          """Call DeepSeek API for code analysis."""
117          api_key = os.environ.get('DEEPSEEK_API_KEY')
118          if not api_key:
119              logger.warning("DEEPSEEK_API_KEY not set, skipping DeepSeek analysis")
120              return None
121  
122          prompt = DEEPSEEK_PROMPT.format(
123              diff=diff[:4000],  # Limit diff size
124              file_path=file_path,
125              repo=repo
126          )
127  
128          url = self.deepseek_config.get("api_url", "https://api.deepseek.com/chat/completions")
129          headers = {
130              "Content-Type": "application/json",
131              "Authorization": f"Bearer {api_key}"
132          }
133          payload = {
134              "model": self.deepseek_config.get("model", "deepseek-coder"),
135              "messages": [
136                  {"role": "system", "content": "You are a code review assistant. Output only JSON."},
137                  {"role": "user", "content": prompt}
138              ],
139              "temperature": self.deepseek_config.get("temperature", 0.2),
140              "max_tokens": self.deepseek_config.get("max_tokens", 1500)
141          }
142  
143          try:
144              req = Request(
145                  url,
146                  data=json.dumps(payload).encode(),
147                  headers=headers,
148                  method='POST'
149              )
150              timeout = self.deepseek_config.get("timeout", 30)
151  
152              with urlopen(req, timeout=timeout) as resp:
153                  data = json.loads(resp.read().decode())
154                  content = data['choices'][0]['message']['content']
155  
156                  # Extract JSON from response
157                  json_match = re.search(r'\{[\s\S]*\}', content)
158                  if json_match:
159                      return json.loads(json_match.group())
160                  return None
161  
162          except Exception as e:
163              logger.error(f"DeepSeek API error: {e}")
164              return None
165  
166      def _should_skip(self, file_path: str) -> bool:
167          """Check if file should be skipped based on config."""
168          skip_patterns = self.eval_config.get("skip_paths", [])
169          for pattern in skip_patterns:
170              if Path(file_path).match(pattern):
171                  return True
172          return False
173  
174      def evaluate(
175          self,
176          diff: str,
177          file_path: str,
178          repo: Optional[str] = None,
179          tool_name: Optional[str] = None
180      ) -> EvaluationResult:
181          """
182          Evaluate a code change.
183  
184          Args:
185              diff: The diff or new code content
186              file_path: Path to the file being changed
187              repo: Repository name (optional, for scoped search)
188              tool_name: The tool that made the change (Edit/Write)
189  
190          Returns:
191              EvaluationResult with verdict and findings
192          """
193          findings: List[Finding] = []
194          metrics: Dict[str, Any] = {
195              "file_path": file_path,
196              "repo": repo,
197              "tool": tool_name,
198          }
199  
200          # Skip certain files
201          if self._should_skip(file_path):
202              logger.info(f"Skipping evaluation for {file_path}")
203              return EvaluationResult(
204                  verdict="OK",
205                  findings=[],
206                  metrics={"skipped": True, "reason": "matches skip pattern"}
207              )
208  
209          # 1. Hallucination Detection
210          logger.info("Running hallucination detection...")
211          hallucination_report = self.hallucination_detector.analyze_diff(diff, repo, file_path)
212          metrics["hallucination"] = {
213              "symbols_checked": hallucination_report.symbols_checked,
214              "symbols_verified": hallucination_report.symbols_verified,
215          }
216  
217          for unverified in hallucination_report.unverified:
218              severity = "critical" if unverified.confidence < 0.3 else "warning"
219              findings.append(Finding(
220                  type="HALLUCINATION",
221                  severity=severity,
222                  message=f"Unverified {unverified.symbol_type}: '{unverified.symbol}'",
223                  line=unverified.line_number,
224                  suggestion=f"Verify that '{unverified.symbol}' exists in the codebase"
225              ))
226  
227          # 2. MECE Test Coverage Analysis
228          logger.info("Running MECE analysis...")
229          mece_report = self.mece_analyzer.analyze_diff(diff, file_path, repo)
230          metrics["mece"] = {
231              "functions_analyzed": mece_report.functions_analyzed,
232              "coverage_score": mece_report.coverage_score,
233          }
234  
235          for gap in mece_report.gaps:
236              findings.append(Finding(
237                  type="MECE_GAP",
238                  severity="warning" if gap.severity == "high" else "info",
239                  message=gap.description,
240                  line=gap.line_number,
241                  suggestion=f"Consider adding a test for '{gap.function_name}'"
242              ))
243  
244          # 3. DeepSeek Efficiency/Security Analysis
245          logger.info("Running DeepSeek analysis...")
246          deepseek_result = self._call_deepseek(diff, file_path, repo or "unknown")
247          deepseek_analysis = None
248  
249          if deepseek_result:
250              deepseek_analysis = json.dumps(deepseek_result, indent=2)
251              metrics["deepseek"] = deepseek_result
252  
253              # Process efficiency issues
254              efficiency = deepseek_result.get("efficiency", {})
255              if efficiency.get("score", 5) < 3:
256                  for issue in efficiency.get("issues", []):
257                      findings.append(Finding(
258                          type="EFFICIENCY",
259                          severity="warning",
260                          message=issue
261                      ))
262  
263              # Process security issues
264              security = deepseek_result.get("security", {})
265              if security.get("score", 5) < 3:
266                  for issue in security.get("issues", []):
267                      findings.append(Finding(
268                          type="SECURITY",
269                          severity="critical" if security["score"] == 1 else "warning",
270                          message=issue
271                      ))
272  
273              # Process correctness issues
274              correctness = deepseek_result.get("correctness", {})
275              if correctness.get("score", 5) < 3:
276                  for issue in correctness.get("issues", []):
277                      findings.append(Finding(
278                          type="CORRECTNESS",
279                          severity="warning",
280                          message=issue
281                      ))
282  
283          # Determine final verdict
284          verdict = "OK"
285          for finding in findings:
286              if finding.type in self.block_types and finding.severity == "critical":
287                  verdict = "BLOCK"
288                  break
289              elif finding.type in self.warn_types or finding.severity == "warning":
290                  verdict = "WARN"
291  
292          return EvaluationResult(
293              verdict=verdict,
294              findings=findings,
295              metrics=metrics,
296              deepseek_analysis=deepseek_analysis
297          )
298  
299      def evaluate_tool_result(
300          self,
301          tool_name: str,
302          tool_input: Dict[str, Any],
303          tool_output: str
304      ) -> EvaluationResult:
305          """
306          Evaluate a tool call result (called from PostToolUse hook).
307  
308          Args:
309              tool_name: Name of the tool (Edit/Write)
310              tool_input: Tool input parameters
311              tool_output: Tool output/result
312  
313          Returns:
314              EvaluationResult
315          """
316          file_path = tool_input.get("file_path", "")
317  
318          # Only evaluate Rust files
319          if not file_path.endswith(".rs"):
320              return EvaluationResult(
321                  verdict="OK",
322                  findings=[],
323                  metrics={"skipped": True, "reason": "not a Rust file"}
324              )
325  
326          # For Edit tool, construct diff from old_string -> new_string
327          if tool_name == "Edit":
328              old_str = tool_input.get("old_string", "")
329              new_str = tool_input.get("new_string", "")
330              diff = f"-{old_str}\n+{new_str}"
331          elif tool_name == "Write":
332              # For Write, treat entire content as new
333              content = tool_input.get("content", "")
334              diff = '\n'.join(f'+{line}' for line in content.split('\n'))
335          else:
336              return EvaluationResult(
337                  verdict="OK",
338                  findings=[],
339                  metrics={"skipped": True, "reason": f"unsupported tool: {tool_name}"}
340              )
341  
342          # Extract repo from path
343          repo = None
344          path_parts = Path(file_path).parts
345          if "working-repos" in path_parts:
346              idx = path_parts.index("working-repos")
347              if idx + 1 < len(path_parts):
348                  repo = path_parts[idx + 1]
349  
350          return self.evaluate(diff, file_path, repo, tool_name)
351  
352  
# Icons shown next to each finding, keyed by severity. Written as escapes so
# the output does not depend on the encoding this source file is read with.
_SEVERITY_ICONS = {
    "critical": "\U0001F534",  # red circle
    "warning": "\U0001F7E1",   # yellow circle
}
_DEFAULT_SEVERITY_ICON = "\u2139\ufe0f"  # information sign, used for "info" and anything else
_MAX_HOOK_FINDINGS = 10  # keep hook output short


def format_for_hook(result: "EvaluationResult") -> str:
    """Format evaluation result for hook output.

    Args:
        result: Object exposing ``verdict`` and ``findings``; each finding
            exposes ``type``, ``severity``, ``message``, ``line`` and
            ``suggestion``.

    Returns:
        A newline-joined summary: the verdict line, then up to 10 findings
        (each optionally followed by its suggestion), then a note with the
        number of findings left out, if any.
    """
    lines = [f"[SHADOW-EVAL] Verdict: {result.verdict}"]

    if result.findings:
        total = len(result.findings)
        lines.append(f"Findings ({total}):")
        for f in result.findings[:_MAX_HOOK_FINDINGS]:
            severity_icon = _SEVERITY_ICONS.get(f.severity, _DEFAULT_SEVERITY_ICON)
            line_info = f" (line {f.line})" if f.line else ""
            lines.append(f"  {severity_icon} [{f.type}]{line_info}: {f.message}")
            if f.suggestion:
                # "\u2514\u2500" is the box-drawing corner used as a tree connector.
                lines.append(f"     \u2514\u2500 {f.suggestion}")
        if total > _MAX_HOOK_FINDINGS:
            # The header reports the full count, so say what was left out.
            lines.append(f"  ... {total - _MAX_HOOK_FINDINGS} more finding(s) not shown")

    return '\n'.join(lines)
367  
368  
def main() -> int:
    """CLI for shadow evaluation.

    Commands:
        evaluate:  evaluate a diff read from ``--diff FILE`` or ``--stdin``.
        hook-test: run a canned Edit tool call through the hook path.

    Returns:
        Process exit code: 0 on success, 1 when ``evaluate`` is given neither
        ``--diff`` nor ``--stdin``.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Shadow Evaluator")
    parser.add_argument("command", choices=["evaluate", "hook-test"])
    parser.add_argument("--diff", help="Diff file to evaluate")
    parser.add_argument("--file", help="File path")
    parser.add_argument("--repo", help="Repository name")
    parser.add_argument("--stdin", action="store_true", help="Read from stdin")
    parser.add_argument("--json", action="store_true", help="Output JSON")
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO)

    if args.command == "evaluate":
        # Resolve the input before building the evaluator so a usage error
        # fails fast without initializing the evaluator's components.
        if args.stdin:
            diff = sys.stdin.read()
        elif args.diff:
            with open(args.diff, encoding="utf-8") as f:
                diff = f.read()
        else:
            print("Error: --diff or --stdin required", file=sys.stderr)
            return 1

        evaluator = ShadowEvaluator()
        file_path = args.file or "unknown"
        result = evaluator.evaluate(diff, file_path, args.repo)

        if args.json:
            output = {
                "verdict": result.verdict,
                "findings": [asdict(f) for f in result.findings],
                "metrics": result.metrics,
            }
            print(json.dumps(output, indent=2))
        else:
            print(format_for_hook(result))

    elif args.command == "hook-test":
        # Test hook integration with a fixed sample Edit call.
        evaluator = ShadowEvaluator()
        test_input = {
            "file_path": "/home/devops/working-repos/alphavm/src/vm/executor.rs",
            "old_string": "fn process()",
            "new_string": "fn process_transaction()"
        }
        result = evaluator.evaluate_tool_result("Edit", test_input, "Success")
        print(format_for_hook(result))

    return 0


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())