ci-report-generator.py
1 #!/usr/bin/env python3 2 """ 3 CI Report Generator 4 Generates structured ci-report.yaml with actionable data for AI remediation 5 6 Output: ci-report.yaml with: 7 - Coverage gaps (file:line) 8 - Mutation survivors (with context) 9 - Security advisories 10 - Build/test errors 11 - Dead code findings 12 """ 13 14 import json 15 import os 16 import subprocess 17 import sys 18 from datetime import datetime 19 from pathlib import Path 20 from typing import Any 21 22 import yaml 23 24 25 def run_cmd(cmd: list[str], cwd: str = None, capture: bool = True) -> tuple[int, str, str]: 26 """Run command and return exit code, stdout, stderr""" 27 try: 28 result = subprocess.run( 29 cmd, 30 cwd=cwd, 31 capture_output=capture, 32 text=True, 33 timeout=300 34 ) 35 return result.returncode, result.stdout, result.stderr 36 except subprocess.TimeoutExpired: 37 return 124, "", "Command timed out" 38 except Exception as e: 39 return 1, "", str(e) 40 41 42 def detect_repo_type(repo_path: str) -> str: 43 """Detect if repo is Rust or TypeScript""" 44 if Path(repo_path, "Cargo.toml").exists(): 45 return "rust" 46 elif Path(repo_path, "package.json").exists(): 47 return "typescript" 48 return "unknown" 49 50 51 def get_coverage_gaps_rust(repo_path: str) -> list[dict]: 52 """Extract coverage gaps from llvm-cov output""" 53 gaps = [] 54 55 # Try to read existing coverage report 56 lcov_path = Path(repo_path, "coverage", "lcov.info") 57 if not lcov_path.exists(): 58 return [{"error": "No coverage report found. Run: just coverage"}] 59 60 # Parse LCOV for uncovered lines 61 current_file = None 62 with open(lcov_path, 'r') as f: 63 for line in f: 64 line = line.strip() 65 if line.startswith("SF:"): 66 current_file = line[3:] 67 elif line.startswith("DA:") and current_file: 68 parts = line[3:].split(",") 69 if len(parts) >= 2 and parts[1] == "0": 70 line_num = int(parts[0]) 71 gaps.append({ 72 "file": current_file, 73 "line": line_num, 74 "type": "uncovered" 75 }) 76 77 # Consolidate into ranges 78 return consolidate_line_ranges(gaps) 79 80 81 def get_coverage_gaps_ts(repo_path: str) -> list[dict]: 82 """Extract coverage gaps from TypeScript coverage""" 83 gaps = [] 84 85 # Try coverage-summary.json (vitest/jest) 86 summary_path = Path(repo_path, "coverage", "coverage-summary.json") 87 if not summary_path.exists(): 88 return [{"error": "No coverage report found. Run: just coverage"}] 89 90 try: 91 with open(summary_path, 'r') as f: 92 data = json.load(f) 93 94 for file_path, metrics in data.items(): 95 if file_path == "total": 96 continue 97 if isinstance(metrics, dict) and "lines" in metrics: 98 if metrics["lines"].get("pct", 100) < 100: 99 gaps.append({ 100 "file": file_path, 101 "lines_covered": metrics["lines"].get("covered", 0), 102 "lines_total": metrics["lines"].get("total", 0), 103 "pct": metrics["lines"].get("pct", 0) 104 }) 105 except Exception as e: 106 return [{"error": f"Failed to parse coverage: {e}"}] 107 108 return gaps 109 110 111 def consolidate_line_ranges(gaps: list[dict]) -> list[dict]: 112 """Convert individual line gaps to ranges for compactness""" 113 if not gaps or "error" in gaps[0]: 114 return gaps 115 116 # Group by file 117 by_file = {} 118 for gap in gaps: 119 file = gap.get("file", "unknown") 120 if file not in by_file: 121 by_file[file] = [] 122 by_file[file].append(gap.get("line", 0)) 123 124 # Convert to ranges 125 result = [] 126 for file, lines in by_file.items(): 127 lines = sorted(set(lines)) 128 ranges = [] 129 start = lines[0] if lines else 0 130 end = start 131 132 for line in lines[1:]: 133 if line == end + 1: 134 end = line 135 else: 136 ranges.append(f"{start}-{end}" if start != end else str(start)) 137 start = end = line 138 139 if lines: 140 ranges.append(f"{start}-{end}" if start != end else str(start)) 141 142 result.append({ 143 "file": file, 144 "uncovered_lines": ", ".join(ranges), 145 "count": len(lines) 146 }) 147 148 # Sort by count descending (most gaps first) 149 return sorted(result, key=lambda x: x.get("count", 0), reverse=True) 150 151 152 def get_mutation_results(repo_path: str) -> dict: 153 """Extract mutation testing results""" 154 mutants_path = Path(repo_path, "mutants.out", "results.json") 155 156 if not mutants_path.exists(): 157 return {"status": "not_run", "message": "No mutation results. Run: just mutants"} 158 159 try: 160 with open(mutants_path, 'r') as f: 161 data = json.load(f) 162 163 outcomes = data.get("outcomes", {}) 164 return { 165 "status": "completed", 166 "killed": outcomes.get("killed", 0), 167 "survived": outcomes.get("survived", 0), 168 "timeout": outcomes.get("timeout", 0), 169 "score": round( 170 outcomes.get("killed", 0) / 171 max(outcomes.get("killed", 0) + outcomes.get("survived", 0), 1) * 100, 172 1 173 ), 174 "survivors": data.get("survivors", [])[:10] # Top 10 survivors 175 } 176 except Exception as e: 177 return {"status": "error", "message": str(e)} 178 179 180 def get_security_advisories(repo_path: str, repo_type: str) -> list[dict]: 181 """Get security advisories from audit""" 182 advisories = [] 183 184 if repo_type == "rust": 185 # Run cargo audit 186 code, stdout, stderr = run_cmd( 187 ["cargo", "audit", "--json"], 188 cwd=repo_path 189 ) 190 if code == 0: 191 try: 192 data = json.loads(stdout) 193 for vuln in data.get("vulnerabilities", {}).get("list", []): 194 advisories.append({ 195 "id": vuln.get("advisory", {}).get("id", "unknown"), 196 "package": vuln.get("package", {}).get("name", "unknown"), 197 "severity": vuln.get("advisory", {}).get("severity", "unknown"), 198 "title": vuln.get("advisory", {}).get("title", "") 199 }) 200 except json.JSONDecodeError: 201 pass 202 else: 203 # Run npm audit 204 code, stdout, stderr = run_cmd( 205 ["npm", "audit", "--json"], 206 cwd=repo_path 207 ) 208 try: 209 data = json.loads(stdout) 210 for vuln_id, vuln in data.get("vulnerabilities", {}).items(): 211 advisories.append({ 212 "id": vuln_id, 213 "package": vuln.get("name", "unknown"), 214 "severity": vuln.get("severity", "unknown"), 215 "title": vuln.get("title", "") 216 }) 217 except json.JSONDecodeError: 218 pass 219 220 return advisories 221 222 223 def get_sbom_vulnerabilities(repo_path: str) -> list[dict]: 224 """Scan SBOM with grype for vulnerabilities""" 225 sbom_path = Path(repo_path, "sbom.json") 226 227 if not sbom_path.exists(): 228 return [{"status": "no_sbom", "message": "No SBOM found. Run: just sbom"}] 229 230 # Run grype scan 231 code, stdout, stderr = run_cmd( 232 ["grype", "sbom:sbom.json", "-o", "json"], 233 cwd=repo_path 234 ) 235 236 if code != 0 and "command not found" in stderr: 237 return [{"status": "grype_not_installed", "message": "Install grype: curl -sSfL https://raw.githubusercontent.com/anchore/grype/main/install.sh | sh"}] 238 239 try: 240 data = json.loads(stdout) 241 vulns = [] 242 for match in data.get("matches", []): 243 vuln = match.get("vulnerability", {}) 244 vulns.append({ 245 "id": vuln.get("id", "unknown"), 246 "severity": vuln.get("severity", "unknown"), 247 "package": match.get("artifact", {}).get("name", "unknown"), 248 "version": match.get("artifact", {}).get("version", "unknown"), 249 "fixed_in": vuln.get("fix", {}).get("versions", []) 250 }) 251 return vulns 252 except json.JSONDecodeError: 253 return [{"status": "parse_error", "message": stderr}] 254 255 256 def get_dead_code(repo_path: str, repo_type: str) -> list[dict]: 257 """Find dead code / unused dependencies""" 258 dead = [] 259 260 if repo_type == "rust": 261 # Run cargo machete 262 code, stdout, stderr = run_cmd( 263 ["cargo", "machete"], 264 cwd=repo_path 265 ) 266 for line in stdout.split("\n"): 267 if "unused" in line.lower(): 268 dead.append({"type": "unused_dep", "detail": line.strip()}) 269 else: 270 # Run knip 271 code, stdout, stderr = run_cmd( 272 ["npx", "knip", "--reporter", "compact"], 273 cwd=repo_path 274 ) 275 for line in stdout.split("\n"): 276 if line.strip(): 277 dead.append({"type": "dead_code", "detail": line.strip()}) 278 279 return dead[:20] # Limit to 20 items 280 281 282 def get_license_violations(repo_path: str) -> list[dict]: 283 """Check license compliance with cargo deny""" 284 deny_toml = Path(repo_path, "deny.toml") 285 286 if not deny_toml.exists(): 287 return [{"status": "not_configured", "message": "No deny.toml found"}] 288 289 code, stdout, stderr = run_cmd( 290 ["cargo", "deny", "check", "licenses", "--format", "json"], 291 cwd=repo_path 292 ) 293 294 violations = [] 295 try: 296 # cargo deny outputs multiple JSON objects, one per line 297 for line in stdout.split("\n"): 298 if line.strip(): 299 data = json.loads(line) 300 if data.get("type") == "error": 301 violations.append({ 302 "package": data.get("fields", {}).get("graphs", [{}])[0].get("name", "unknown"), 303 "license": data.get("fields", {}).get("license", "unknown"), 304 "message": data.get("message", "") 305 }) 306 except json.JSONDecodeError: 307 pass 308 309 return violations 310 311 312 def generate_report(repo_path: str) -> dict: 313 """Generate full CI report for a repository""" 314 repo_type = detect_repo_type(repo_path) 315 repo_name = Path(repo_path).name 316 317 report = { 318 "meta": { 319 "repo": repo_name, 320 "type": repo_type, 321 "generated_at": datetime.utcnow().isoformat() + "Z", 322 "generator": "ci-report-generator v1.0" 323 }, 324 "coverage": { 325 "gaps": get_coverage_gaps_rust(repo_path) if repo_type == "rust" else get_coverage_gaps_ts(repo_path) 326 }, 327 "mutations": get_mutation_results(repo_path), 328 "security": { 329 "advisories": get_security_advisories(repo_path, repo_type), 330 "sbom_vulns": get_sbom_vulnerabilities(repo_path) 331 }, 332 "licenses": get_license_violations(repo_path) if repo_type == "rust" else [], 333 "dead_code": get_dead_code(repo_path, repo_type), 334 "summary": {} 335 } 336 337 # Generate summary 338 coverage_gaps = report["coverage"]["gaps"] 339 total_uncovered = sum(g.get("count", 0) for g in coverage_gaps if isinstance(g, dict) and "count" in g) 340 341 mutations = report["mutations"] 342 mutation_score = mutations.get("score", 0) if mutations.get("status") == "completed" else None 343 344 advisories = report["security"]["advisories"] 345 high_critical = sum(1 for a in advisories if a.get("severity", "").lower() in ["high", "critical"]) 346 347 report["summary"] = { 348 "coverage_gaps_count": total_uncovered, 349 "mutation_score": mutation_score, 350 "security_advisories": len(advisories), 351 "high_critical_vulns": high_critical, 352 "dead_code_items": len(report["dead_code"]), 353 "license_violations": len([l for l in report["licenses"] if isinstance(l, dict) and "package" in l]), 354 "health": "green" if ( 355 total_uncovered < 100 and 356 (mutation_score is None or mutation_score >= 80) and 357 high_critical == 0 358 ) else "yellow" if ( 359 total_uncovered < 500 and 360 (mutation_score is None or mutation_score >= 60) and 361 high_critical < 3 362 ) else "red" 363 } 364 365 return report 366 367 368 def main(): 369 if len(sys.argv) < 2: 370 print("Usage: ci-report-generator.py <repo_path> [output_path]") 371 print("Example: ci-report-generator.py /home/devops/working-repos/alphavm") 372 sys.exit(1) 373 374 repo_path = sys.argv[1] 375 output_path = sys.argv[2] if len(sys.argv) > 2 else os.path.join(repo_path, "ci-report.yaml") 376 377 if not Path(repo_path).exists(): 378 print(f"Error: Repository path does not exist: {repo_path}") 379 sys.exit(1) 380 381 report = generate_report(repo_path) 382 383 with open(output_path, 'w') as f: 384 yaml.dump(report, f, default_flow_style=False, sort_keys=False) 385 386 print(f"Generated: {output_path}") 387 print(f"Health: {report['summary']['health']}") 388 print(f"Coverage gaps: {report['summary']['coverage_gaps_count']}") 389 print(f"Mutation score: {report['summary']['mutation_score']}") 390 print(f"Security advisories: {report['summary']['security_advisories']}") 391 392 393 if __name__ == "__main__": 394 main()