xtest_viz.py
# /// script
# dependencies = [
#   "aiohttp",
# ]
# ///
"""
Script to visualize cross-version test results for MLflow autologging and models.

This script fetches scheduled workflow run results from GitHub Actions and generates
a markdown table showing the test status for different package versions across
different dates.

Usage:
    uv run dev/xtest_viz.py                       # Fetch last 14 days from mlflow/dev
    uv run dev/xtest_viz.py --days 30             # Fetch last 30 days
    uv run dev/xtest_viz.py --repo mlflow/mlflow  # Use different repo

Example output (truncated for brevity):
    | Name                          | 01/15     | 01/14      | 01/13     |
    | ----------------------------- | --------- | ---------- | --------- |
    | pytorch / models / 2.1.0      | [✅](link) | [⚠️](link) | [✅](link) |
    | sklearn / autologging / 1.3.1 | [✅](link) | [✅](link)  | [❌](link) |
    | xgboost / autologging / 2.0.0 | [❌](link) | [✅](link)  | —         |

Rows are the text inside the parentheses of each job name, sorted alphabetically.
Columns are run dates (MM/DD), newest first.

Where:
    ✅ = success
    ❌ = failure
    ⚠️ = cancelled
    ❓ = unknown status
    — = no data
"""

import argparse
import asyncio
import os
import re
import sys
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any, cast

import aiohttp


@dataclass
class JobResult:
    """Outcome of one job in one workflow run; becomes a single table cell."""

    # Row label: the job name as returned by XTestViz.parse_job_name.
    name: str
    # Column label: the run's creation date formatted as MM/DD.
    date: str
    # Cell content: a markdown link "[emoji](job url)".
    status: str
    # Full ISO date (YYYY-MM-DD) of the run, used only to order columns. MM/DD labels
    # alone sort incorrectly across a year boundary ("01/02" < "12/30"). Optional and
    # excluded from equality/repr so existing three-field usage behaves as before.
    sort_key: str = field(default="", compare=False, repr=False)


class XTestViz:
    """Fetch cross-version test results from GitHub Actions and render them as markdown."""

    # Emoji shown for each job conclusion. None marks a conclusion that is left out of
    # the table entirely; conclusions missing from this map are rendered as "❓".
    _STATUS_EMOJI: dict[str, str | None] = {
        "success": "✅",
        "failure": "❌",
        "cancelled": "⚠️",
        "skipped": None,
    }

    def __init__(self, github_token: str | None = None, repo: str = "mlflow/dev"):
        """Set up the repository to query and the HTTP headers for the GitHub API.

        Args:
            github_token: Token sent in the Authorization header. Falls back to the
                GH_TOKEN environment variable; without either, requests are sent
                unauthenticated.
            repo: Repository in "owner/repo" format.
        """
        self.github_token = github_token or os.environ.get("GH_TOKEN")
        self.repo = repo
        self.per_page = 30
        self.headers: dict[str, str] = {}
        if self.github_token:
            self.headers["Authorization"] = f"token {self.github_token}"
        self.headers["Accept"] = "application/vnd.github.v3+json"

    def status_to_emoji(self, status: str | None) -> str | None:
        """Convert job status to emoji representation.

        Returns None for skipped status to indicate it should be filtered out.
        Any unrecognized status (including a missing one) maps to "❓".
        """
        if status is None:
            return "❓"
        return self._STATUS_EMOJI.get(status, "❓")

    def parse_job_name(self, job_name: str) -> str:
        """Extract string inside parentheses from job name.

        Examples:
            - "test1 (sklearn / autologging / 1.3.1)" -> "sklearn / autologging / 1.3.1"
            - "test2 (pytorch / models / 2.1.0)" -> "pytorch / models / 2.1.0"

        Returns:
            str: Content inside parentheses, or original name if no parentheses found
        """
        # First "(...)" group with non-empty content.
        if match := re.search(r"\(([^)]+)\)", job_name.strip()):
            return match.group(1).strip()

        return job_name

    async def _make_request(
        self,
        session: aiohttp.ClientSession,
        url: str,
        params: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """Make an async HTTP GET request and return JSON response.

        Raises whatever ``response.raise_for_status()`` raises for an error status.
        """
        async with session.get(url, headers=self.headers, params=params) as response:
            response.raise_for_status()
            return cast(dict[str, Any], await response.json())

    async def _iter_pages(
        self,
        session: aiohttp.ClientSession,
        url: str,
        items_key: str,
        params: dict[str, str] | None = None,
    ) -> AsyncIterator[tuple[int, list[dict[str, Any]]]]:
        """Yield ``(page_number, items)`` for each non-empty page of a paginated endpoint.

        Args:
            session: Session used for the requests.
            url: Endpoint to query.
            items_key: Key of the response JSON that holds the list of items.
            params: Extra query parameters sent with every page request.

        Stops after the first empty page or the first page holding fewer than
        ``self.per_page`` items.
        """
        page = 1
        while True:
            page_params = {
                "per_page": str(self.per_page),
                "page": str(page),
                **(params or {}),
            }
            data = await self._make_request(session, url, params=page_params)
            items = data.get(items_key, [])

            if not items:
                return

            yield page, items

            # A short page means there is nothing left to fetch.
            if len(items) < self.per_page:
                return

            page += 1

    async def get_workflow_runs(
        self, session: aiohttp.ClientSession, days_back: int = 30
    ) -> list[dict[str, Any]]:
        """Fetch cross-version test workflow runs from the last N days.

        Only completed runs triggered by the ``schedule`` event are requested.
        """
        # Send an explicit UTC timestamp at second precision; a naive local timestamp
        # would be off by the local UTC offset when interpreted by the API.
        since = datetime.now(timezone.utc) - timedelta(days=days_back)
        since_date = since.strftime("%Y-%m-%dT%H:%M:%SZ")

        print(f"Fetching scheduled workflow runs from last {days_back} days...", file=sys.stderr)

        url = (
            f"https://api.github.com/repos/{self.repo}"
            "/actions/workflows/cross-version-tests.yml/runs"
        )
        filters = {
            "created": f">={since_date}",
            "status": "completed",
            "event": "schedule",
        }

        all_runs: list[dict[str, Any]] = []
        async for page, runs in self._iter_pages(session, url, "workflow_runs", filters):
            all_runs.extend(runs)
            print(f" Fetched page {page} ({len(runs)} runs)", file=sys.stderr)

        print(f"Found {len(all_runs)} scheduled workflow runs total", file=sys.stderr)

        return all_runs

    async def get_workflow_jobs(
        self, session: aiohttp.ClientSession, run_id: int
    ) -> list[dict[str, Any]]:
        """Get jobs for a specific workflow run."""
        url = f"https://api.github.com/repos/{self.repo}/actions/runs/{run_id}/jobs"

        all_jobs: list[dict[str, Any]] = []
        async for _, jobs in self._iter_pages(session, url, "jobs"):
            all_jobs.extend(jobs)

        return all_jobs

    async def _fetch_run_jobs(
        self, session: aiohttp.ClientSession, run: dict[str, Any]
    ) -> list[JobResult]:
        """Fetch jobs for a single workflow run.

        Jobs whose status maps to no emoji (see ``status_to_emoji``) are dropped.
        """
        # fromisoformat() on older Python versions rejects a trailing "Z".
        created_at = datetime.fromisoformat(run["created_at"].replace("Z", "+00:00"))
        run_date = created_at.strftime("%m/%d")
        sort_key = created_at.strftime("%Y-%m-%d")

        jobs = await self.get_workflow_jobs(session, run["id"])
        data_rows: list[JobResult] = []

        for job in jobs:
            emoji = self.status_to_emoji(job["conclusion"])
            if emoji is None:  # Skip this job
                continue

            data_rows.append(
                JobResult(
                    name=self.parse_job_name(job["name"]),
                    date=run_date,
                    status=f"[{emoji}]({job['html_url']})",
                    sort_key=sort_key,
                )
            )

        return data_rows

    async def fetch_all_jobs(self, days_back: int = 30) -> list[JobResult]:
        """Fetch all jobs from workflow runs in the specified time period.

        Jobs for all runs are fetched concurrently. A run whose job fetch fails is
        reported on stderr and skipped; the remaining runs are still returned.
        """
        async with aiohttp.ClientSession() as session:
            workflow_runs = await self.get_workflow_runs(session, days_back)

            if not workflow_runs:
                return []

            print(
                f"Fetching jobs for {len(workflow_runs)} workflow runs concurrently...",
                file=sys.stderr,
            )

            tasks = [self._fetch_run_jobs(session, run) for run in workflow_runs]

            # return_exceptions=True so one failing run does not discard the others.
            results = await asyncio.gather(*tasks, return_exceptions=True)
            data_rows: list[JobResult] = []

            for i, result in enumerate(results, 1):
                if isinstance(result, BaseException):
                    print(f" Error fetching jobs for run {i}: {result}", file=sys.stderr)
                else:
                    data_rows.extend(result)
                    print(
                        f" Completed {i}/{len(workflow_runs)} ({len(result)} jobs)",
                        file=sys.stderr,
                    )

            return data_rows

    def _pivot_job_results(
        self, data_rows: list[JobResult]
    ) -> tuple[dict[str, dict[str, str]], list[str], list[str]]:
        """Pivot job results data into a format suitable for table rendering.

        Args:
            data_rows: List of job results to pivot

        Returns:
            Tuple of (pivot_data, sorted_dates, sorted_names) where:
            - pivot_data: Dictionary mapping name -> date -> status
            - sorted_dates: List of dates sorted in reverse chronological order
            - sorted_names: List of test names sorted alphabetically
        """
        pivot_data: dict[str, dict[str, str]] = {}
        # Date label -> key used to order the columns. Rows without a sort_key fall
        # back to the label itself, i.e. plain reverse string order of the labels.
        date_order: dict[str, str] = {}

        for row in data_rows:
            # Use first occurrence for each name-date combination
            pivot_data.setdefault(row.name, {}).setdefault(row.date, row.status)

            key = row.sort_key or row.date
            if key > date_order.get(row.date, ""):
                date_order[row.date] = key

        # Sort dates in reverse order (newest first)
        sorted_dates = sorted(date_order, key=lambda d: (date_order[d], d), reverse=True)

        # Sort names alphabetically
        sorted_names = sorted(pivot_data)

        return pivot_data, sorted_dates, sorted_names

    def _build_markdown_table(
        self,
        pivot_data: dict[str, dict[str, str]],
        sorted_dates: list[str],
        sorted_names: list[str],
    ) -> str:
        """Build a markdown table from pivoted data.

        Args:
            pivot_data: Dictionary mapping name -> date -> status
            sorted_dates: List of dates (columns) in desired order
            sorted_names: List of test names (rows) in desired order

        Returns:
            Markdown-formatted table as a string
        """
        headers = ["Name", *sorted_dates]

        # One list of cells per row; a missing name/date combination renders as "—".
        rows = [
            [name, *(pivot_data[name].get(date, "—") for date in sorted_dates)]
            for name in sorted_names
        ]

        # Each column is as wide as its longest cell, header included.
        col_widths = [max(map(len, column)) for column in zip(headers, *rows)]

        def format_row(cells: list[str]) -> str:
            padded = (cell.ljust(width) for cell, width in zip(cells, col_widths))
            return "| " + " | ".join(padded) + " |"

        separator = "| " + " | ".join("-" * width for width in col_widths) + " |"

        lines = [format_row(headers), separator]
        lines.extend(format_row(cells) for cells in rows)

        return "\n".join(lines)

    def render_results_table(self, data_rows: list[JobResult]) -> str:
        """Render job data as a markdown table.

        Returns the message "No test jobs found." when ``data_rows`` is empty.
        """
        if not data_rows:
            return "No test jobs found."

        pivot_data, sorted_dates, sorted_names = self._pivot_job_results(data_rows)
        return self._build_markdown_table(pivot_data, sorted_dates, sorted_names)

    async def generate_results_table(self, days_back: int = 30) -> str:
        """Generate markdown table of cross-version test results.

        Returns a plain message instead of a table when no job results were fetched.
        """
        data_rows = await self.fetch_all_jobs(days_back)
        if not data_rows:
            return "No workflow runs found in the specified time period."
        return self.render_results_table(data_rows)


async def main() -> None:
    """Parse command-line arguments, fetch the results and print the table to stdout."""
    parser = argparse.ArgumentParser(description="Visualize MLflow cross-version test results")
    parser.add_argument(
        "--days", type=int, default=14, help="Number of days back to fetch results (default: 14)"
    )
    parser.add_argument(
        "--repo",
        default="mlflow/dev",
        help="GitHub repository in owner/repo format (default: mlflow/dev)",
    )
    parser.add_argument("--token", help="GitHub token (default: use GH_TOKEN env var)")

    args = parser.parse_args()

    token = args.token or os.environ.get("GH_TOKEN")
    if not token:
        # Diagnostics go to stderr so stdout carries only the table.
        print(
            "Warning: No GitHub token provided. API requests may be rate-limited.", file=sys.stderr
        )
        print("Set GH_TOKEN environment variable or use --token option.", file=sys.stderr)

    visualizer = XTestViz(github_token=token, repo=args.repo)
    output = await visualizer.generate_results_table(args.days)
    print(output)


if __name__ == "__main__":
    asyncio.run(main())