dev/xtest_viz.py
  1  # /// script
  2  # dependencies = [
  3  #     "aiohttp",
  4  # ]
  5  # ///
  6  """
  7  Script to visualize cross-version test results for MLflow autologging and models.
  8  
  9  This script fetches scheduled workflow run results from GitHub Actions and generates
 10  a markdown table showing the test status for different package versions across
 11  different dates.
 12  
 13  Usage:
 14      uv run dev/xtest_viz.py                      # Fetch last 14 days from mlflow/dev
 15      uv run dev/xtest_viz.py --days 30            # Fetch last 30 days
 16      uv run dev/xtest_viz.py --repo mlflow/mlflow  # Use different repo
 17  
 18  Example output (truncated for brevity):
 19      | Name                                   | 2024-01-15 | 2024-01-14 | 2024-01-13 |
 20      |----------------------------------------|------------|------------|------------|
 21      | test1 (sklearn, 1.3.1, autologging...) | [✅](link) | [✅](link) | [❌](link) |
 22      | test1 (pytorch, 2.1.0, models...)      | [✅](link) | [⚠️](link) | [✅](link) |
 23      | test2 (xgboost, 2.0.0, autologging...) | [❌](link) | [✅](link) | —          |
 24  
 25  Where:
 26      ✅ = success
 27      ❌ = failure
 28      ⚠️ = cancelled
 29      ❓ = unknown status
 30      — = no data
 31  """
 32  
import argparse
import asyncio
import os
import re
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, cast

import aiohttp
 43  
 44  
@dataclass
class JobResult:
    """A single job outcome: one cell in the rendered results table."""

    # Parsed job name, e.g. "sklearn / autologging / 1.3.1" (see parse_job_name).
    name: str
    # Date string of the workflow run; used as the table's column key.
    date: str
    # Markdown status link, e.g. "[✅](<job html_url>)".
    status: str
 50  
 51  
 52  class XTestViz:
 53      def __init__(self, github_token: str | None = None, repo: str = "mlflow/dev"):
 54          self.github_token = github_token or os.environ.get("GH_TOKEN")
 55          self.repo = repo
 56          self.per_page = 30
 57          self.headers: dict[str, str] = {}
 58          if self.github_token:
 59              self.headers["Authorization"] = f"token {self.github_token}"
 60              self.headers["Accept"] = "application/vnd.github.v3+json"
 61  
 62      def status_to_emoji(self, status: str) -> str | None:
 63          """Convert job status to emoji representation.
 64  
 65          Returns None for skipped status to indicate it should be filtered out.
 66          """
 67          match status:
 68              case "success":
 69                  return "✅"
 70              case "failure":
 71                  return "❌"
 72              case "cancelled":
 73                  return "⚠️"
 74              case "skipped":
 75                  return None
 76              case _:
 77                  return "❓"
 78  
 79      def parse_job_name(self, job_name: str) -> str:
 80          """Extract string inside parentheses from job name.
 81  
 82          Examples:
 83          - "test1 (sklearn / autologging / 1.3.1)" -> "sklearn / autologging / 1.3.1"
 84          - "test2 (pytorch / models / 2.1.0)" -> "pytorch / models / 2.1.0"
 85  
 86          Returns:
 87              str: Content inside parentheses, or original name if no parentheses found
 88          """
 89          # Pattern to match: anything (content)
 90          pattern = r"\(([^)]+)\)"
 91          if match := re.search(pattern, job_name.strip()):
 92              return match.group(1).strip()
 93  
 94          return job_name
 95  
 96      async def _make_request(
 97          self,
 98          session: aiohttp.ClientSession,
 99          url: str,
100          params: dict[str, str] | None = None,
101      ) -> dict[str, Any]:
102          """Make an async HTTP GET request and return JSON response."""
103          async with session.get(url, headers=self.headers, params=params) as response:
104              response.raise_for_status()
105              return cast(dict[str, Any], await response.json())
106  
107      async def get_workflow_runs(
108          self, session: aiohttp.ClientSession, days_back: int = 30
109      ) -> list[dict[str, Any]]:
110          """Fetch cross-version test workflow runs from the last N days."""
111          since_date = (datetime.now() - timedelta(days=days_back)).isoformat()
112  
113          print(f"Fetching scheduled workflow runs from last {days_back} days...", file=sys.stderr)
114  
115          all_runs: list[dict[str, Any]] = []
116          page = 1
117  
118          while True:
119              params = {
120                  "per_page": str(self.per_page),
121                  "page": str(page),
122                  "created": f">={since_date}",
123                  "status": "completed",
124                  "event": "schedule",
125              }
126              url = f"https://api.github.com/repos/{self.repo}/actions/workflows/cross-version-tests.yml/runs"
127  
128              data = await self._make_request(session, url, params=params)
129              runs = data.get("workflow_runs", [])
130  
131              if not runs:
132                  break
133  
134              all_runs.extend(runs)
135  
136              print(f"  Fetched page {page} ({len(runs)} runs)", file=sys.stderr)
137  
138              if len(runs) < self.per_page:
139                  break
140  
141              page += 1
142  
143          print(f"Found {len(all_runs)} scheduled workflow runs total", file=sys.stderr)
144  
145          return all_runs
146  
147      async def get_workflow_jobs(
148          self, session: aiohttp.ClientSession, run_id: int
149      ) -> list[dict[str, Any]]:
150          """Get jobs for a specific workflow run."""
151          all_jobs: list[dict[str, Any]] = []
152          page = 1
153  
154          while True:
155              params = {"per_page": str(self.per_page), "page": str(page)}
156              url = f"https://api.github.com/repos/{self.repo}/actions/runs/{run_id}/jobs"
157  
158              data = await self._make_request(session, url, params=params)
159              jobs = data.get("jobs", [])
160  
161              if not jobs:
162                  break
163  
164              all_jobs.extend(jobs)
165  
166              if len(jobs) < self.per_page:
167                  break
168  
169              page += 1
170  
171          return all_jobs
172  
173      async def _fetch_run_jobs(
174          self, session: aiohttp.ClientSession, run: dict[str, Any]
175      ) -> list[JobResult]:
176          """Fetch jobs for a single workflow run."""
177          run_id = run["id"]
178          run_date = datetime.fromisoformat(run["created_at"].replace("Z", "+00:00")).strftime(
179              "%m/%d"
180          )
181  
182          jobs = await self.get_workflow_jobs(session, run_id)
183          data_rows = []
184  
185          for job in jobs:
186              emoji = self.status_to_emoji(job["conclusion"])
187              if emoji is None:  # Skip this job
188                  continue
189  
190              job_url = job["html_url"]
191              status_link = f"[{emoji}]({job_url})"
192  
193              parsed_name = self.parse_job_name(job["name"])
194  
195              data_rows.append(
196                  JobResult(
197                      name=parsed_name,
198                      date=run_date,
199                      status=status_link,
200                  )
201              )
202  
203          return data_rows
204  
205      async def fetch_all_jobs(self, days_back: int = 30) -> list[JobResult]:
206          """Fetch all jobs from workflow runs in the specified time period."""
207          async with aiohttp.ClientSession() as session:
208              workflow_runs = await self.get_workflow_runs(session, days_back)
209  
210              if not workflow_runs:
211                  return []
212  
213              print(
214                  f"Fetching jobs for {len(workflow_runs)} workflow runs concurrently...",
215                  file=sys.stderr,
216              )
217  
218              tasks = [self._fetch_run_jobs(session, run) for run in workflow_runs]
219  
220              results = await asyncio.gather(*tasks, return_exceptions=True)
221              data_rows: list[JobResult] = []
222  
223              for i, result in enumerate(results, 1):
224                  if isinstance(result, BaseException):
225                      print(f"  Error fetching jobs for run {i}: {result}", file=sys.stderr)
226                  else:
227                      data_rows.extend(result)
228                      print(
229                          f"  Completed {i}/{len(workflow_runs)} ({len(result)} jobs)",
230                          file=sys.stderr,
231                      )
232  
233              return data_rows
234  
235      def _pivot_job_results(
236          self, data_rows: list[JobResult]
237      ) -> tuple[dict[str, dict[str, str]], list[str], list[str]]:
238          """Pivot job results data into a format suitable for table rendering.
239  
240          Args:
241              data_rows: List of job results to pivot
242  
243          Returns:
244              Tuple of (pivot_data, sorted_dates, sorted_names) where:
245              - pivot_data: Dictionary mapping name -> date -> status
246              - sorted_dates: List of dates sorted in reverse chronological order
247              - sorted_names: List of test names sorted alphabetically
248          """
249          pivot_data: dict[str, dict[str, str]] = {}
250          all_dates: set[str] = set()
251  
252          for row in data_rows:
253              if row.name not in pivot_data:
254                  pivot_data[row.name] = {}
255              # Use first occurrence for each name-date combination
256              if row.date not in pivot_data[row.name]:
257                  pivot_data[row.name][row.date] = row.status
258              all_dates.add(row.date)
259  
260          # Sort dates in reverse order (newest first)
261          sorted_dates = sorted(all_dates, reverse=True)
262  
263          # Sort names alphabetically
264          sorted_names = sorted(pivot_data.keys())
265  
266          return pivot_data, sorted_dates, sorted_names
267  
268      def _build_markdown_table(
269          self,
270          pivot_data: dict[str, dict[str, str]],
271          sorted_dates: list[str],
272          sorted_names: list[str],
273      ) -> str:
274          """Build a markdown table from pivoted data.
275  
276          Args:
277              pivot_data: Dictionary mapping name -> date -> status
278              sorted_dates: List of dates (columns) in desired order
279              sorted_names: List of test names (rows) in desired order
280  
281          Returns:
282              Markdown-formatted table as a string
283          """
284          headers = ["Name"] + sorted_dates
285  
286          # Calculate column widths
287          col_widths = [len(h) for h in headers]
288          for name in sorted_names:
289              col_widths[0] = max(col_widths[0], len(name))
290              for i, date in enumerate(sorted_dates, 1):
291                  value = pivot_data[name].get(date, "—")
292                  col_widths[i] = max(col_widths[i], len(value))
293  
294          # Build table rows
295          lines = []
296  
297          # Header row
298          header_row = "| " + " | ".join(h.ljust(col_widths[i]) for i, h in enumerate(headers)) + " |"
299          lines.append(header_row)
300  
301          # Separator row
302          separator = "| " + " | ".join("-" * w for w in col_widths) + " |"
303          lines.append(separator)
304  
305          # Data rows
306          for name in sorted_names:
307              row_values = [name.ljust(col_widths[0])]
308              for i, date in enumerate(sorted_dates, 1):
309                  value = pivot_data[name].get(date, "—")
310                  row_values.append(value.ljust(col_widths[i]))
311              lines.append("| " + " | ".join(row_values) + " |")
312  
313          return "\n".join(lines)
314  
315      def render_results_table(self, data_rows: list[JobResult]) -> str:
316          """Render job data as a markdown table."""
317          if not data_rows:
318              return "No test jobs found."
319  
320          pivot_data, sorted_dates, sorted_names = self._pivot_job_results(data_rows)
321          return self._build_markdown_table(pivot_data, sorted_dates, sorted_names)
322  
323      async def generate_results_table(self, days_back: int = 30) -> str:
324          """Generate markdown table of cross-version test results."""
325          data_rows = await self.fetch_all_jobs(days_back)
326          if not data_rows:
327              return "No workflow runs found in the specified time period."
328          return self.render_results_table(data_rows)
329  
330  
async def main() -> None:
    """CLI entry point: parse arguments, fetch results, print the table."""
    arg_parser = argparse.ArgumentParser(description="Visualize MLflow cross-version test results")
    arg_parser.add_argument(
        "--days", type=int, default=14, help="Number of days back to fetch results (default: 14)"
    )
    arg_parser.add_argument(
        "--repo",
        default="mlflow/dev",
        help="GitHub repository in owner/repo format (default: mlflow/dev)",
    )
    arg_parser.add_argument("--token", help="GitHub token (default: use GH_TOKEN env var)")

    opts = arg_parser.parse_args()

    gh_token = opts.token or os.environ.get("GH_TOKEN")
    if not gh_token:
        # Anonymous requests are allowed but hit GitHub's low unauthenticated rate limit.
        print(
            "Warning: No GitHub token provided. API requests may be rate-limited.", file=sys.stderr
        )
        print("Set GH_TOKEN environment variable or use --token option.", file=sys.stderr)

    table = await XTestViz(github_token=gh_token, repo=opts.repo).generate_results_table(opts.days)
    print(table)
355  
356  
# Script entry point: drive the async CLI under asyncio's event loop.
if __name__ == "__main__":
    asyncio.run(main())