/ scripts / ci-report-generator.py
ci-report-generator.py
  1  #!/usr/bin/env python3
  2  """
  3  CI Report Generator
  4  Generates structured ci-report.yaml with actionable data for AI remediation
  5  
  6  Output: ci-report.yaml with:
  7  - Coverage gaps (file:line)
  8  - Mutation survivors (with context)
  9  - Security advisories
 10  - Build/test errors
 11  - Dead code findings
 12  """
 13  
 14  import json
 15  import os
 16  import subprocess
 17  import sys
 18  from datetime import datetime
 19  from pathlib import Path
 20  from typing import Any
 21  
 22  import yaml
 23  
 24  
 25  def run_cmd(cmd: list[str], cwd: str = None, capture: bool = True) -> tuple[int, str, str]:
 26      """Run command and return exit code, stdout, stderr"""
 27      try:
 28          result = subprocess.run(
 29              cmd,
 30              cwd=cwd,
 31              capture_output=capture,
 32              text=True,
 33              timeout=300
 34          )
 35          return result.returncode, result.stdout, result.stderr
 36      except subprocess.TimeoutExpired:
 37          return 124, "", "Command timed out"
 38      except Exception as e:
 39          return 1, "", str(e)
 40  
 41  
 42  def detect_repo_type(repo_path: str) -> str:
 43      """Detect if repo is Rust or TypeScript"""
 44      if Path(repo_path, "Cargo.toml").exists():
 45          return "rust"
 46      elif Path(repo_path, "package.json").exists():
 47          return "typescript"
 48      return "unknown"
 49  
 50  
 51  def get_coverage_gaps_rust(repo_path: str) -> list[dict]:
 52      """Extract coverage gaps from llvm-cov output"""
 53      gaps = []
 54  
 55      # Try to read existing coverage report
 56      lcov_path = Path(repo_path, "coverage", "lcov.info")
 57      if not lcov_path.exists():
 58          return [{"error": "No coverage report found. Run: just coverage"}]
 59  
 60      # Parse LCOV for uncovered lines
 61      current_file = None
 62      with open(lcov_path, 'r') as f:
 63          for line in f:
 64              line = line.strip()
 65              if line.startswith("SF:"):
 66                  current_file = line[3:]
 67              elif line.startswith("DA:") and current_file:
 68                  parts = line[3:].split(",")
 69                  if len(parts) >= 2 and parts[1] == "0":
 70                      line_num = int(parts[0])
 71                      gaps.append({
 72                          "file": current_file,
 73                          "line": line_num,
 74                          "type": "uncovered"
 75                      })
 76  
 77      # Consolidate into ranges
 78      return consolidate_line_ranges(gaps)
 79  
 80  
 81  def get_coverage_gaps_ts(repo_path: str) -> list[dict]:
 82      """Extract coverage gaps from TypeScript coverage"""
 83      gaps = []
 84  
 85      # Try coverage-summary.json (vitest/jest)
 86      summary_path = Path(repo_path, "coverage", "coverage-summary.json")
 87      if not summary_path.exists():
 88          return [{"error": "No coverage report found. Run: just coverage"}]
 89  
 90      try:
 91          with open(summary_path, 'r') as f:
 92              data = json.load(f)
 93  
 94          for file_path, metrics in data.items():
 95              if file_path == "total":
 96                  continue
 97              if isinstance(metrics, dict) and "lines" in metrics:
 98                  if metrics["lines"].get("pct", 100) < 100:
 99                      gaps.append({
100                          "file": file_path,
101                          "lines_covered": metrics["lines"].get("covered", 0),
102                          "lines_total": metrics["lines"].get("total", 0),
103                          "pct": metrics["lines"].get("pct", 0)
104                      })
105      except Exception as e:
106          return [{"error": f"Failed to parse coverage: {e}"}]
107  
108      return gaps
109  
110  
def consolidate_line_ranges(gaps: list[dict]) -> list[dict]:
    """Collapse per-line gap entries into one compact summary per file.

    An empty list, or a list whose first entry carries an "error" key, is
    returned untouched. Otherwise each file's line numbers are de-duplicated,
    sorted and folded into a string such as "1-3, 5, 9"; the summaries are
    ordered by number of uncovered lines, largest first.
    """
    if not gaps or "error" in gaps[0]:
        return gaps

    def render(first, last):
        # A single line is written bare, a run as "first-last".
        return str(first) if first == last else f"{first}-{last}"

    # Bucket the line numbers under their file, keeping first-seen order.
    lines_per_file = {}
    for entry in gaps:
        bucket = lines_per_file.setdefault(entry.get("file", "unknown"), [])
        bucket.append(entry.get("line", 0))

    summaries = []
    for path, raw_lines in lines_per_file.items():
        ordered = sorted(set(raw_lines))
        spans = []
        run_start = run_end = ordered[0]
        for number in ordered[1:]:
            if number != run_end + 1:
                # The current run is broken: emit it and start a new one.
                spans.append(render(run_start, run_end))
                run_start = number
            run_end = number
        spans.append(render(run_start, run_end))

        summaries.append({
            "file": path,
            "uncovered_lines": ", ".join(spans),
            "count": len(ordered),
        })

    # Files with the most gaps come first; ties keep their original order.
    summaries.sort(key=lambda item: item.get("count", 0), reverse=True)
    return summaries
150  
151  
def get_mutation_results(repo_path: str) -> dict:
    """Summarise mutation-testing results from ``mutants.out/results.json``.

    Returns a "not_run" status dict when the file is absent, an "error"
    status dict (with the exception text) when it cannot be loaded or
    summarised, and otherwise a "completed" dict carrying the killed /
    survived / timeout counts, the kill score as a percentage rounded to one
    decimal place, and at most the first ten entries of "survivors".
    """
    results_file = Path(repo_path) / "mutants.out" / "results.json"
    if not results_file.exists():
        return {"status": "not_run", "message": "No mutation results. Run: just mutants"}

    try:
        with open(results_file, 'r') as handle:
            payload = json.load(handle)

        tally = payload.get("outcomes", {})
        killed = tally.get("killed", 0)
        survived = tally.get("survived", 0)
        timed_out = tally.get("timeout", 0)
        # Floor the denominator at 1 so an empty run scores 0 rather than
        # dividing by zero.
        decided = max(killed + survived, 1)
        score = round(killed / decided * 100, 1)
        top_survivors = payload.get("survivors", [])[:10]
    except Exception as exc:
        return {"status": "error", "message": str(exc)}

    return {
        "status": "completed",
        "killed": killed,
        "survived": survived,
        "timeout": timed_out,
        "score": score,
        "survivors": top_survivors,
    }
178  
179  
def get_security_advisories(repo_path: str, repo_type: str) -> list[dict]:
    """Collect security advisories from the ecosystem's audit tool.

    Runs ``cargo audit --json`` when ``repo_type`` is "rust" and
    ``npm audit --json`` for every other repo type, then normalises the
    findings.

    Args:
        repo_path: Repository root; used as the tool's working directory.
        repo_type: Value produced by ``detect_repo_type``.

    Returns:
        A list of ``{"id", "package", "severity", "title"}`` dicts. The list
        is empty when the tool produced no parseable JSON (best-effort: a
        missing or failing tool is not treated as an error).
    """
    advisories = []

    if repo_type == "rust":
        # The exit code is deliberately ignored: cargo audit exits non-zero
        # precisely when it finds vulnerabilities, which is the case that
        # must be reported.
        _code, stdout, _stderr = run_cmd(
            ["cargo", "audit", "--json"],
            cwd=repo_path
        )
        try:
            data = json.loads(stdout)
        except json.JSONDecodeError:
            # Best-effort: tool missing or output not JSON.
            return advisories
        if not isinstance(data, dict):
            return advisories

        vulnerabilities = data.get("vulnerabilities")
        found = vulnerabilities.get("list") if isinstance(vulnerabilities, dict) else None
        for vuln in found or []:
            if not isinstance(vuln, dict):
                continue
            advisory = vuln.get("advisory") or {}
            package = vuln.get("package") or {}
            advisories.append({
                "id": advisory.get("id", "unknown"),
                "package": package.get("name", "unknown"),
                "severity": advisory.get("severity", "unknown"),
                "title": advisory.get("title", "")
            })
    else:
        # npm audit also exits non-zero when it finds vulnerabilities, so the
        # exit code is ignored here too.
        _code, stdout, _stderr = run_cmd(
            ["npm", "audit", "--json"],
            cwd=repo_path
        )
        try:
            data = json.loads(stdout)
        except json.JSONDecodeError:
            # Best-effort: tool missing or output not JSON.
            return advisories
        if not isinstance(data, dict):
            return advisories

        vulnerabilities = data.get("vulnerabilities")
        if not isinstance(vulnerabilities, dict):
            return advisories
        for vuln_id, vuln in vulnerabilities.items():
            if not isinstance(vuln, dict):
                continue
            title = vuln.get("title", "")
            if not title:
                # NOTE(review): npm 7+ reports appear to nest the advisory
                # title inside the "via" entries rather than on the package
                # record - confirm against the npm version used in CI.
                title = next(
                    (
                        source["title"]
                        for source in vuln.get("via") or []
                        if isinstance(source, dict) and source.get("title")
                    ),
                    "",
                )
            advisories.append({
                "id": vuln_id,
                "package": vuln.get("name", "unknown"),
                "severity": vuln.get("severity", "unknown"),
                "title": title
            })

    return advisories
221  
222  
def get_sbom_vulnerabilities(repo_path: str) -> list[dict]:
    """Scan ``<repo>/sbom.json`` with grype and list the matches.

    Args:
        repo_path: Repository root holding ``sbom.json``; also used as the
            working directory for grype.

    Returns:
        A list of ``{"id", "severity", "package", "version", "fixed_in"}``
        dicts, or a single-element list holding a ``{"status", "message"}``
        dict when the SBOM is missing ("no_sbom"), grype is not installed
        ("grype_not_installed") or its output is not usable JSON
        ("parse_error").
    """
    # Imported locally so this function does not depend on the module's
    # import block.
    import shutil

    sbom_path = Path(repo_path, "sbom.json")

    if not sbom_path.exists():
        return [{"status": "no_sbom", "message": "No SBOM found. Run: just sbom"}]

    not_installed = [{"status": "grype_not_installed", "message": "Install grype: curl -sSfL https://raw.githubusercontent.com/anchore/grype/main/install.sh | sh"}]

    # The command runs without a shell, so a missing binary never yields a
    # "command not found" message by itself; probe PATH explicitly.
    if shutil.which("grype") is None:
        return not_installed

    code, stdout, stderr = run_cmd(
        ["grype", "sbom:sbom.json", "-o", "json"],
        cwd=repo_path
    )

    if code != 0 and "command not found" in stderr:
        return not_installed

    try:
        data = json.loads(stdout)
    except json.JSONDecodeError:
        return [{"status": "parse_error", "message": stderr}]
    if not isinstance(data, dict):
        return [{"status": "parse_error", "message": stderr}]

    vulns = []
    for match in data.get("matches") or []:
        if not isinstance(match, dict):
            continue
        # "or {}" also covers keys that are present but null.
        vuln = match.get("vulnerability") or {}
        artifact = match.get("artifact") or {}
        fix = vuln.get("fix") or {}
        vulns.append({
            "id": vuln.get("id", "unknown"),
            "severity": vuln.get("severity", "unknown"),
            "package": artifact.get("name", "unknown"),
            "version": artifact.get("version", "unknown"),
            "fixed_in": fix.get("versions", [])
        })
    return vulns
254  
255  
def _parse_machete_output(stdout: str) -> list[dict]:
    """Turn cargo-machete's text report into one finding per unused dependency."""
    # NOTE(review): assumes the human-readable layout
    #     <crate> -- <path>/Cargo.toml:
    #     <TAB><dependency>
    # - confirm against the cargo-machete version used in CI.
    findings = []
    manifest = None
    for raw in stdout.splitlines():
        text = raw.strip()
        if not text:
            manifest = None
            continue
        if " -- " in text and text.endswith(":"):
            manifest = text[:-1]
            continue
        if manifest is not None and raw[:1] in ("\t", " "):
            findings.append({"type": "unused_dep", "detail": f"{text} ({manifest})"})
    return findings


def get_dead_code(repo_path: str, repo_type: str) -> list[dict]:
    """Find unused dependencies (Rust) or dead code (everything else).

    Runs ``cargo machete`` when ``repo_type`` is "rust" and
    ``npx knip --reporter compact`` otherwise.

    Args:
        repo_path: Repository root; used as the tool's working directory.
        repo_type: Value produced by ``detect_repo_type``.

    Returns:
        At most 20 ``{"type", "detail"}`` dicts. For Rust each entry names
        one unused dependency and the manifest that declares it; for other
        repos each entry is one non-empty line of knip output.
    """
    if repo_type == "rust":
        _code, stdout, _stderr = run_cmd(
            ["cargo", "machete"],
            cwd=repo_path
        )
        # Matching the word "unused" is not enough: it also appears in the
        # "didn't find any unused dependencies" success message, while the
        # dependency names themselves do not contain it.
        dead = _parse_machete_output(stdout)
    else:
        _code, stdout, _stderr = run_cmd(
            ["npx", "knip", "--reporter", "compact"],
            cwd=repo_path
        )
        dead = [
            {"type": "dead_code", "detail": line.strip()}
            for line in stdout.split("\n")
            if line.strip()
        ]

    return dead[:20]  # Limit to 20 items
280  
281  
def get_license_violations(repo_path: str) -> list[dict]:
    """Check license compliance with ``cargo deny``.

    Args:
        repo_path: Repository root; must contain ``deny.toml`` and is used
            as the working directory for cargo-deny.

    Returns:
        A list of ``{"package", "license", "message"}`` dicts, one per error
        record emitted by cargo-deny, or a single-element list holding a
        ``{"status": "not_configured", ...}`` dict when ``deny.toml`` is
        absent. Lines that are not JSON objects are ignored.
    """
    deny_toml = Path(repo_path, "deny.toml")

    if not deny_toml.exists():
        return [{"status": "not_configured", "message": "No deny.toml found"}]

    # --format is one of cargo-deny's common options and is given before
    # the subcommand.
    _code, stdout, stderr = run_cmd(
        ["cargo", "deny", "--format", "json", "check", "licenses"],
        cwd=repo_path
    )

    violations = []
    # cargo-deny emits one JSON object per line.
    # NOTE(review): diagnostics appear to be written to stderr, so both
    # streams are scanned - confirm against the installed version.
    for line in f"{stdout}\n{stderr}".splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            data = json.loads(line)
        except json.JSONDecodeError:
            # One non-JSON line must not hide the records that follow it.
            continue
        if not isinstance(data, dict):
            continue

        fields = data.get("fields")
        if not isinstance(fields, dict):
            fields = {}

        # NOTE(review): besides the original type == "error" shape, also
        # accept type == "diagnostic" records whose fields.severity is
        # "error" - presumably what current cargo-deny emits; verify.
        record_type = data.get("type")
        is_error = record_type == "error" or (
            record_type == "diagnostic" and fields.get("severity") == "error"
        )
        if not is_error:
            continue

        graphs = fields.get("graphs")
        node = graphs[0] if isinstance(graphs, list) and graphs else {}
        if not isinstance(node, dict):
            node = {}
        # Some versions seem to wrap the crate info in a "Krate" object.
        krate = node.get("Krate") if isinstance(node.get("Krate"), dict) else node

        violations.append({
            "package": krate.get("name", "unknown"),
            "license": fields.get("license", "unknown"),
            "message": data.get("message") or fields.get("message", "")
        })

    return violations
310  
311  
def generate_report(repo_path: str) -> dict:
    """Generate the full CI report for a repository.

    Detects the repo type, runs every collector and derives a summary with
    an overall health colour.

    Args:
        repo_path: Path to the repository root.

    Returns:
        A dict with the keys "meta", "coverage", "mutations", "security",
        "licenses", "dead_code" and "summary". ``summary["health"]`` is
        "green" when there are fewer than 100 uncovered lines, the mutation
        score is absent or at least 80 and there are no high/critical
        advisories; "yellow" when the corresponding limits are 500, 60 and
        fewer than 3; otherwise "red".
    """
    # Imported locally so this function does not depend on the module's
    # import block.
    from datetime import timezone

    repo_type = detect_repo_type(repo_path)
    # resolve() so that "." or a path with a trailing ".." still yields the
    # directory name instead of an empty string.
    repo_name = Path(repo_path).resolve().name

    # datetime.utcnow() is deprecated; take an aware UTC time and drop the
    # tzinfo so the "...Z" rendering stays the same.
    generated_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    report = {
        "meta": {
            "repo": repo_name,
            "type": repo_type,
            "generated_at": generated_at,
            "generator": "ci-report-generator v1.0"
        },
        "coverage": {
            "gaps": get_coverage_gaps_rust(repo_path) if repo_type == "rust" else get_coverage_gaps_ts(repo_path)
        },
        "mutations": get_mutation_results(repo_path),
        "security": {
            "advisories": get_security_advisories(repo_path, repo_type),
            "sbom_vulns": get_sbom_vulnerabilities(repo_path)
        },
        "licenses": get_license_violations(repo_path) if repo_type == "rust" else [],
        "dead_code": get_dead_code(repo_path, repo_type),
        "summary": {}
    }

    # Count uncovered lines. Rust gaps carry "count"; TypeScript gaps carry
    # "lines_total"/"lines_covered" instead and would otherwise always
    # contribute zero to the total and to the health colour.
    total_uncovered = 0
    for gap in report["coverage"]["gaps"]:
        if not isinstance(gap, dict):
            continue
        if "count" in gap:
            total_uncovered += gap.get("count", 0)
        elif "lines_total" in gap:
            lines_total = gap.get("lines_total", 0)
            lines_covered = gap.get("lines_covered", 0)
            if isinstance(lines_total, (int, float)) and isinstance(lines_covered, (int, float)):
                total_uncovered += max(lines_total - lines_covered, 0)

    mutations = report["mutations"]
    mutation_score = mutations.get("score", 0) if mutations.get("status") == "completed" else None

    advisories = report["security"]["advisories"]
    # str(... or "") tolerates a severity that is null or not a string.
    high_critical = sum(
        1 for a in advisories
        if str(a.get("severity") or "").lower() in ("high", "critical")
    )

    if (
        total_uncovered < 100
        and (mutation_score is None or mutation_score >= 80)
        and high_critical == 0
    ):
        health = "green"
    elif (
        total_uncovered < 500
        and (mutation_score is None or mutation_score >= 60)
        and high_critical < 3
    ):
        health = "yellow"
    else:
        health = "red"

    report["summary"] = {
        "coverage_gaps_count": total_uncovered,
        "mutation_score": mutation_score,
        "security_advisories": len(advisories),
        "high_critical_vulns": high_critical,
        "dead_code_items": len(report["dead_code"]),
        # Status entries such as "not_configured" have no "package" key and
        # are not violations.
        "license_violations": len([l for l in report["licenses"] if isinstance(l, dict) and "package" in l]),
        "health": health
    }

    return report
366  
367  
def main():
    """Command-line entry point.

    Usage: ``ci-report-generator.py <repo_path> [output_path]``. The report
    is written as YAML to ``output_path`` (default:
    ``<repo_path>/ci-report.yaml``) and a short summary is printed to
    stdout. Exits with status 1 on a usage error or when ``repo_path`` is
    not an existing directory.
    """
    if len(sys.argv) < 2:
        print("Usage: ci-report-generator.py <repo_path> [output_path]", file=sys.stderr)
        print("Example: ci-report-generator.py /home/devops/working-repos/alphavm", file=sys.stderr)
        sys.exit(1)

    repo_path = sys.argv[1]
    output_path = sys.argv[2] if len(sys.argv) > 2 else os.path.join(repo_path, "ci-report.yaml")

    # The collectors use repo_path as a working directory, so a regular file
    # must be rejected as well as a missing path.
    if not Path(repo_path).is_dir():
        print(f"Error: Repository path does not exist or is not a directory: {repo_path}", file=sys.stderr)
        sys.exit(1)

    report = generate_report(repo_path)

    # Explicit encoding keeps the output independent of the CI locale.
    with open(output_path, 'w', encoding="utf-8") as f:
        yaml.dump(report, f, default_flow_style=False, sort_keys=False)

    summary = report["summary"]
    print(f"Generated: {output_path}")
    print(f"Health: {summary['health']}")
    print(f"Coverage gaps: {summary['coverage_gaps_count']}")
    print(f"Mutation score: {summary['mutation_score']}")
    print(f"Security advisories: {summary['security_advisories']}")


if __name__ == "__main__":
    main()