verifier.py
"""
Verifier Module

Spawns AI agents to verify code matches specifications.
Collects and aggregates findings.
"""

from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import re
import subprocess  # noqa: F401  (no longer used here; kept so existing importers do not break)

try:
    from . import config
    from .providers.base import AIProvider, Finding
    from .providers import get_provider
except ImportError:
    # Fallback for running the module outside of its package.
    import config
    from providers.base import AIProvider, Finding
    from providers import get_provider


# Literal attribute spellings that are counted as one test each.
_RUST_TEST_MARKERS = ("#[test]", "#[tokio::test]")


@dataclass
class VerificationResult:
    """Result from verifying a repo/component."""
    repo: str
    component: str
    # Name of the provider that produced the findings (``provider.name``).
    verified_by: str
    # Naive datetime expressed in UTC.
    timestamp: datetime
    findings: list[Finding] = field(default_factory=list)
    # "; "-joined messages of every verification type that raised, or None.
    error: Optional[str] = None


def verify_repo(repo: str, context: str, components: list[str]) -> list[VerificationResult]:
    """
    Verify a repository against its specifications.

    A single provider is obtained from ``get_provider()`` and reused for
    every component.

    Args:
        repo: Repository name
        context: Pre-built context string
        components: List of component IDs to verify

    Returns:
        List of verification results, one per component, in input order
    """
    provider = get_provider()
    return [
        verify_component(repo, component, context, provider)
        for component in components
    ]


def verify_component(repo: str, component: str, context: str, provider: AIProvider) -> VerificationResult:
    """
    Verify a single component.

    Every type in ``config.VERIFICATION_TYPES`` is attempted. A failure in
    one type does not stop the others; its message is recorded on the result.

    Args:
        repo: Repository name
        component: Component ID
        context: Context string
        provider: AI provider instance

    Returns:
        VerificationResult with the findings of all successful verification
        types and, if any type raised, a combined ``error`` message
    """
    result = VerificationResult(
        repo=repo,
        component=component,
        verified_by=provider.name,
        # Same naive-UTC value as datetime.utcnow(), without the
        # deprecation warning emitted on Python 3.12+.
        timestamp=datetime.now(timezone.utc).replace(tzinfo=None),
    )

    errors = []
    for verification_type in config.VERIFICATION_TYPES:
        try:
            findings = run_verification(
                repo=repo,
                component=component,
                context=context,
                verification_type=verification_type,
                provider=provider,
            )
        except Exception as e:
            # Keep every failure (previously only the last one survived)
            # and label it so the failing verification type is identifiable.
            errors.append(f"{verification_type}: {e}")
        else:
            result.findings.extend(findings)

    if errors:
        result.error = "; ".join(errors)

    return result


def run_verification(
    repo: str,
    component: str,
    context: str,
    verification_type: str,
    provider: AIProvider
) -> list[Finding]:
    """
    Run a specific verification type.

    Args:
        repo: Repository name
        component: Component ID
        context: Context string
        verification_type: Type of verification (threshold, modules, etc.)
        provider: AI provider instance

    Returns:
        List of findings; empty when no (non-empty) prompt exists for the type
    """
    # NOTE: the prompt is checked before the local dispatch below, so a
    # missing or empty prompt file also disables the "test_count" and
    # "paths" checks even though they never use the prompt text.
    prompt = load_prompt(verification_type)
    if not prompt:
        return []

    # For test_count, use local verification instead of AI
    if verification_type == "test_count":
        return verify_test_count_local(repo, component, context)

    # For paths, use local verification
    if verification_type == "paths":
        return verify_paths_local(repo, component, context)

    # Everything else is delegated to the AI provider
    return provider.verify(context, prompt, repo, component)


def load_prompt(verification_type: str) -> Optional[str]:
    """
    Load a verification prompt template.

    The template is read from ``prompts/<verification_type>.txt`` next to
    this module.

    Args:
        verification_type: Type of verification, used as the file stem

    Returns:
        The file contents, or None when the file does not exist
    """
    prompt_path = Path(__file__).parent / "prompts" / f"{verification_type}.txt"

    try:
        # Explicit encoding: the platform default is not UTF-8 everywhere.
        return prompt_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return None


def verify_test_count_local(repo: str, component: str, context: str) -> list[Finding]:
    """
    Locally verify test counts match documentation.

    Counts #[test] attributes in Rust files and compares to documented count.

    Args:
        repo: Repository name, resolved under ``config.REPOS_BASE_PATH``
        component: Component ID
        context: Context string containing the documented count

    Returns:
        A single-element list with a ``test_count_mismatch`` finding when the
        counts differ; an empty list when they match, when no documented
        count is found, or when the repository directory does not exist
    """
    # Extract documented test count from context
    documented_count = extract_test_count_from_context(context, component)
    if documented_count is None:
        return []

    # Count actual tests in repo
    repo_path = Path(config.REPOS_BASE_PATH) / repo
    if not repo_path.exists():
        return []

    actual_count = count_tests_in_repo(repo_path, component)
    if actual_count == documented_count:
        return []

    return [Finding(
        component=component,
        severity=config.SEVERITY_MEDIUM,
        finding_type="test_count_mismatch",
        spec_file="status-components.cspec",
        code_file=f"{repo}/",
        spec_value=f"tests: {documented_count}",
        code_value=f"actual tests: {actual_count}",
    )]


def extract_test_count_from_context(context: str, component: str) -> Optional[int]:
    """
    Extract documented test count from context string.

    Finds the first "tests: <number>" that follows the first occurrence of
    the component ID (case-insensitive, may span lines).

    Args:
        context: Context string to search
        component: Component ID, matched literally

    Returns:
        The documented count, or None when no match is found
    """
    # Escape the ID so characters such as "." or "+" are matched literally
    # instead of being interpreted as regex syntax.
    pattern = rf'{re.escape(component)}.*?tests:\s*(\d+)'
    match = re.search(pattern, context, re.DOTALL | re.IGNORECASE)
    if match:
        return int(match.group(1))
    return None


def _count_test_markers(source_file: Path) -> int:
    """Count marker occurrences per line in one file; 0 if it cannot be read."""
    try:
        if not source_file.is_file():
            return 0
        with open(source_file, encoding="utf-8", errors="replace") as handle:
            # A line counts once per marker it contains, mirroring what two
            # separate `grep -c` passes (one per marker) would report.
            return sum(
                marker in line
                for line in handle
                for marker in _RUST_TEST_MARKERS
            )
    except OSError:
        return 0


def count_tests_in_repo(repo_path: Path, component: str) -> int:
    """
    Count #[test] and #[tokio::test] attributes in Rust files.

    Only ``*.rs`` files below ``repo_path`` are scanned. The scan is done in
    Python, so it does not depend on an external ``grep`` binary.

    Args:
        repo_path: Path to repository
        component: Component ID (currently unused; the whole repo is scanned)

    Returns:
        Count of lines carrying a test attribute; unreadable files count as 0
    """
    total = 0
    try:
        for source_file in repo_path.rglob("*.rs"):
            total += _count_test_markers(source_file)
    except OSError:
        # Best effort: a directory that cannot be traversed ends the scan
        # and whatever was counted so far is returned.
        pass
    return total


def verify_paths_local(repo: str, component: str, context: str) -> list[Finding]:
    """
    Locally verify documented paths exist.

    Checks that paths mentioned in specs actually exist in the repo.

    Args:
        repo: Repository name, resolved under ``config.REPOS_BASE_PATH``
        component: Component ID
        context: Context string containing the documented paths

    Returns:
        One ``missing_path`` finding per documented path that does not
        exist; an empty list when the repository directory does not exist
    """
    findings = []

    # Extract paths from context
    paths = extract_paths_from_context(context, component)

    repo_path = Path(config.REPOS_BASE_PATH) / repo
    if not repo_path.exists():
        return findings

    for documented_path in paths:
        if (repo_path / documented_path).exists():
            continue
        findings.append(Finding(
            component=component,
            severity=config.SEVERITY_HIGH,
            finding_type="missing_path",
            spec_file="repo-mapping.cspec",
            code_file=f"{repo}/{documented_path}",
            spec_value=f"path: {documented_path}",
            code_value="does not exist",
        ))

    return findings


def extract_paths_from_context(context: str, component: str) -> list[str]:
    """
    Extract documented paths from context string.

    A section starts on any line containing "paths:" (case-insensitive);
    the rest of that line is ignored. Inside a section:

    * "- value" lines contribute ``value``;
    * other non-comment lines without a colon contribute their content,
      split on commas;
    * a blank line, or an unindented line that contains a colon or starts
      with "#", ends the section.

    Args:
        context: Context string to scan
        component: Component ID (currently unused; all sections are read)

    Returns:
        Paths in the order they appear, never containing empty strings
    """
    paths = []
    in_paths_section = False

    for line in context.split('\n'):
        stripped = line.strip()

        if 'paths:' in line.lower():
            in_paths_section = True
            continue

        if not in_paths_section:
            continue

        if stripped.startswith('-'):
            # List item: drop the bullet and surrounding blanks.
            path = stripped.lstrip('- ').strip()
            if path and not path.startswith('#'):
                paths.append(path)
        elif stripped and not stripped.startswith('#') and ':' not in line:
            # One or more comma-separated paths on a single line. Empty
            # pieces (e.g. from a trailing comma) are dropped: an empty
            # path would resolve to the repo root and always "exist".
            paths.extend(
                piece.strip() for piece in stripped.split(',') if piece.strip()
            )
        elif not stripped or not line.startswith(' '):
            in_paths_section = False

    return paths