/ packages / updater / __main__.py
__main__.py
  1  #!/usr/bin/env python3
  2  """
  3  Update third-party packages in ./packages directory.
  4  
  5  This script auto-discovers updatable packages:
  6  1. Packages with `nix-update-args` file -> run nix-update with those args
  7  2. Packages with `update.py` file -> import and call main()
  8  
  9  Usage:
 10      python3 -m updater [--dry-run] [--package NAME] [--list] [--pr]
 11  """
 12  
 13  import argparse
 14  import importlib.util
 15  import json
 16  import subprocess
 17  import sys
 18  import tempfile
 19  from dataclasses import dataclass
 20  from pathlib import Path
 21  
 22  
 23  @dataclass
 24  class Package:
 25      name: str
 26      method: str  # "nix-update" or "custom"
 27      path: Path
 28      extra_args: list[str] | None = None
 29  
 30  
 31  @dataclass
 32  class UpdateResult:
 33      package: Package
 34      success: bool
 35      changed: bool
 36      old_version: str | None = None
 37      new_version: str | None = None
 38  
 39  
 40  def get_pkgs_dir() -> Path:
 41      """Get the packages directory (parent of updater)."""
 42      return Path(__file__).parent.parent.resolve()
 43  
 44  
 45  def run_cmd(
 46      cmd: list[str], cwd: Path | None = None, check: bool = True
 47  ) -> subprocess.CompletedProcess[str]:
 48      """Run a command and return the result."""
 49      return subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, check=check)
 50  
 51  
 52  def git_has_changes(flake_root: Path) -> bool:
 53      """Check if there are uncommitted changes."""
 54      result = run_cmd(["git", "status", "--porcelain"], cwd=flake_root, check=False)
 55      return bool(result.stdout.strip())
 56  
 57  
 58  def git_get_changes(flake_root: Path) -> str:
 59      """Get list of changed files."""
 60      result = run_cmd(["git", "status", "--porcelain"], cwd=flake_root, check=False)
 61      return result.stdout.strip()
 62  
 63  
 64  def get_current_version(pkg: Package) -> str | None:
 65      """Get current version from srcs.json if it exists."""
 66      srcs_file = pkg.path / "srcs.json"
 67      if srcs_file.exists():
 68          try:
 69              data = json.loads(srcs_file.read_text())
 70              return data.get("version")
 71          except json.JSONDecodeError:
 72              pass
 73      return None
 74  
 75  
 76  def discover_packages(pkgs_dir: Path) -> list[Package]:
 77      """Discover all updatable packages."""
 78      packages: list[Package] = []
 79  
 80      for pkg_dir in pkgs_dir.iterdir():
 81          if not pkg_dir.is_dir():
 82              continue
 83  
 84          # Check for nix-update-args file
 85          nix_update_args_file = pkg_dir / "nix-update-args"
 86          if nix_update_args_file.exists():
 87              args = nix_update_args_file.read_text().strip().split()
 88              packages.append(
 89                  Package(
 90                      name=pkg_dir.name,
 91                      method="nix-update",
 92                      path=pkg_dir,
 93                      extra_args=args,
 94                  )
 95              )
 96              continue
 97  
 98          # Check for update.py file
 99          update_script = pkg_dir / "update.py"
100          if update_script.exists():
101              packages.append(
102                  Package(
103                      name=pkg_dir.name,
104                      method="custom",
105                      path=pkg_dir,
106                  )
107              )
108  
109      return sorted(packages, key=lambda p: p.name)
110  
111  
def run_nix_update(
    pkg: Package,
    flake_root: Path,
    dry_run: bool = False,
    *,
    system: str = "x86_64-linux",
) -> bool:
    """Run nix-update for a package.

    Args:
        pkg: Package to update; its ``name`` selects the flake attribute and
            its ``extra_args`` (if any) are appended to the command line.
        flake_root: Directory in which nix-update is executed.
        dry_run: When true, only print the command that would be run.
        system: Nix system used in the ``packages.<system>.<name>`` flake
            attribute. Defaults to the previously hard-coded value.

    Returns:
        True on success (or in dry-run mode), False when nix-update exits
        with a non-zero status.
    """
    cmd = ["nix-update", "--flake", f".#packages.{system}.{pkg.name}"]
    if pkg.extra_args:
        cmd.extend(pkg.extra_args)

    print(f"  Running: {' '.join(cmd)}")

    if dry_run:
        print("  (dry-run, skipping)")
        return True

    # check=False: a failing update is reported through the return value
    # rather than an exception so the caller can continue with other packages.
    result = subprocess.run(
        cmd, check=False, cwd=flake_root, capture_output=True, text=True
    )

    if result.returncode != 0:
        print(f"  Error: {result.stderr}")
        return False

    if result.stdout:
        print(f"  {result.stdout.strip()}")

    return True
141  
142  
def run_custom_update(pkg: Package, dry_run: bool = False) -> bool:
    """Import the package's ``update.py`` and call its ``main()``.

    Failures inside the update script (import errors, exceptions raised by
    ``main()``, or a non-zero ``sys.exit``) are reported and turned into a
    False return value so that one broken script does not abort the updates
    of the remaining packages.

    Args:
        pkg: Package whose ``path`` contains ``update.py``.
        dry_run: When true, only print what would be run.

    Returns:
        True when ``main()`` completed (or in dry-run mode), False otherwise.
    """
    update_script = pkg.path / "update.py"
    module_name = f"update_{pkg.name}"

    print(f"  Running: {update_script}")

    if dry_run:
        print("  (dry-run, skipping)")
        return True

    spec = importlib.util.spec_from_file_location(module_name, update_script)
    if spec is None or spec.loader is None:
        print(f"  Error: Could not load {update_script}")
        return False

    module = importlib.util.module_from_spec(spec)
    # Register before executing so the script can be found under its own name
    # while it is being imported.
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except Exception as exc:
        # Do not leave a half-initialised module behind.
        sys.modules.pop(module_name, None)
        print(
            f"  Error: Could not import {update_script}: "
            f"{type(exc).__name__}: {exc}"
        )
        return False

    entry_point = getattr(module, "main", None)
    if not callable(entry_point):
        print(f"  Error: {update_script} has no main() function")
        return False

    try:
        entry_point()
    except SystemExit as exc:
        # A script calling sys.exit() must not terminate the whole updater;
        # only a zero/None exit code counts as success.
        if exc.code not in (None, 0):
            print(f"  Error: {update_script} exited with {exc.code!r}")
            return False
    except Exception as exc:
        print(f"  Error: {update_script} failed: {type(exc).__name__}: {exc}")
        return False

    return True
168  
169  
def update_package(
    pkg: Package, flake_root: Path, dry_run: bool = False
) -> UpdateResult:
    """Update a single package in place and return the result.

    Args:
        pkg: Package to update.
        flake_root: Repository root; git status is taken here and nix-update
            is run from here.
        dry_run: When true, the update commands are only printed.

    Returns:
        An UpdateResult carrying the success flag, whether the update was
        detected as changing anything, and the versions read before and after.
    """
    print(f"\nUpdating {pkg.name} (method: {pkg.method})...")

    old_version = get_current_version(pkg)

    # Snapshot the status instead of a dirty/clean flag: when the tree is
    # already dirty (e.g. an earlier package was just updated), a plain flag
    # could never report a change for this package.
    status_before = git_get_changes(flake_root)

    if pkg.method == "nix-update":
        success = run_nix_update(pkg, flake_root, dry_run)
    elif pkg.method == "custom":
        success = run_custom_update(pkg, dry_run)
    else:
        print(f"  Error: Unknown method: {pkg.method}")
        success = False

    new_version = get_current_version(pkg)

    # The version comparison catches edits to a file that was already listed
    # as modified, where the status output alone stays the same.
    status_after = git_get_changes(flake_root)
    changed = status_after != status_before or new_version != old_version

    return UpdateResult(
        package=pkg,
        success=success,
        changed=changed,
        old_version=old_version,
        new_version=new_version,
    )
199  
200  
def create_pr_for_package(
    pkg: Package, flake_root: Path, dry_run: bool = False
) -> bool:
    """Open an update PR for ``pkg`` from a temporary git worktree.

    The PR branch is named ``update/<package name>``. If ``git ls-remote``
    reports that branch on ``origin``, the package is skipped and False is
    returned. In dry-run mode nothing beyond that remote check is executed.

    Returns:
        True in dry-run mode or when the worktree step succeeded; False when
        the branch already exists remotely or the worktree step failed.
    """
    branch_name = f"update/{pkg.name}"

    print(f"\nCreating PR for {pkg.name}...")

    # Any output from ls-remote means a matching head exists on the remote.
    ls_remote_cmd = ["git", "ls-remote", "--heads", "origin", branch_name]
    remote_heads = run_cmd(ls_remote_cmd, cwd=flake_root, check=False).stdout.strip()
    if remote_heads:
        print(f"  Branch {branch_name} already exists on remote, skipping")
        return False

    if not dry_run:
        return _create_pr_in_worktree(pkg, flake_root, branch_name)

    print("  (dry-run, skipping)")
    return True
224  
225  
def _create_pr_in_worktree(pkg: Package, flake_root: Path, branch_name: str) -> bool:
    """Create the PR from a throw-away worktree on a new branch.

    A worktree for ``branch_name`` is added inside a temporary directory, the
    update/PR step runs there, and afterwards the worktree and the local
    branch are removed again regardless of the outcome.

    Returns:
        False when the worktree could not be created; otherwise the result of
        the update/PR step.
    """
    with tempfile.TemporaryDirectory() as scratch:
        checkout = Path(scratch) / "worktree"

        # "-b" creates the branch together with the worktree.
        add_cmd = ["git", "worktree", "add", "-b", branch_name, str(checkout)]
        added = run_cmd(add_cmd, cwd=flake_root, check=False)
        if added.returncode != 0:
            print(f"  Error creating worktree: {added.stderr}")
            return False

        # Best-effort teardown, executed in this order.
        teardown_cmds = (
            ["git", "worktree", "remove", "--force", str(checkout)],
            ["git", "branch", "-D", branch_name],
        )
        try:
            return _run_update_and_create_pr(pkg, flake_root, checkout, branch_name)
        finally:
            for teardown in teardown_cmds:
                run_cmd(teardown, cwd=flake_root, check=False)
253  
254  
def _run_update_and_create_pr(
    pkg: Package, flake_root: Path, worktree_path: Path, branch_name: str
) -> bool:
    """Run the update inside the worktree, then commit, push, and open a PR.

    Args:
        pkg: Package as known to the caller; its ``path`` is used to read the
            pre-update version.
        flake_root: Root of the caller's checkout (not used in this function;
            kept so the signature stays stable).
        worktree_path: Root of the worktree in which the update is performed.
        branch_name: Branch pushed to ``origin`` for the PR.

    Returns:
        True when a PR was created or there was nothing to update; False when
        the update, staging, commit, push, or PR creation failed.
    """
    # Read the version before anything is modified.
    old_version = get_current_version(pkg)

    # Same package, but rooted inside the worktree so the update edits the
    # worktree's files.
    worktree_pkg = Package(
        name=pkg.name,
        method=pkg.method,
        path=worktree_path / "packages" / pkg.name,
        extra_args=pkg.extra_args,
    )

    if pkg.method == "nix-update":
        success = run_nix_update(worktree_pkg, worktree_path, dry_run=False)
    elif pkg.method == "custom":
        success = run_custom_update(worktree_pkg, dry_run=False)
    else:
        print(f"  Error: Unknown method: {pkg.method}")
        success = False

    if not success:
        print("  Update failed")
        return False

    if not git_has_changes(worktree_path):
        print("  No changes, already up to date")
        return True  # Not a failure, just nothing to do

    new_version = get_current_version(worktree_pkg)
    old_ver = old_version or "unknown"
    new_ver = new_version or "unknown"
    commit_msg = f"{pkg.name}: {old_ver} -> {new_ver}"

    # Stage and commit. Failures here (hooks, missing identity, ...) are
    # reported like push/PR failures instead of raising CalledProcessError
    # and aborting the remaining packages.
    commit_steps = (
        ("staging changes", ["git", "add", "-A"]),
        ("committing", ["git", "commit", "-m", commit_msg]),
    )
    for step_name, git_cmd in commit_steps:
        step_result = run_cmd(git_cmd, cwd=worktree_path, check=False)
        if step_result.returncode != 0:
            # git reports some commit problems on stdout rather than stderr.
            details = step_result.stderr or step_result.stdout
            print(f"  Error {step_name}: {details}")
            return False

    push_result = run_cmd(
        ["git", "push", "-u", "origin", branch_name], cwd=worktree_path, check=False
    )
    if push_result.returncode != 0:
        print(f"  Error pushing: {push_result.stderr}")
        return False

    # Create PR
    pr_body = f"Automated update of {pkg.name} from {old_ver} to {new_ver}."
    pr_result = run_cmd(
        [
            "gh", "pr", "create",
            "--title", commit_msg,
            "--body", pr_body,
            "--label", "dependencies",
        ],
        cwd=worktree_path,
        check=False,
    )

    if pr_result.returncode != 0:
        # NOTE(review): the branch has already been pushed at this point, and
        # create_pr_for_package skips packages whose branch exists on the
        # remote, so a retry needs the remote branch removed first.
        print(f"  Error creating PR: {pr_result.stderr}")
        return False

    print(f"  Created PR: {pr_result.stdout.strip()}")
    return True
319  
320  
def list_packages(packages: list[Package]) -> None:
    """Print the discovered packages grouped by update method.

    "nix-update" packages come first, each followed by its extra arguments in
    parentheses when it has any; "custom" packages are listed afterwards.
    """
    nix_update_pkgs = [entry for entry in packages if entry.method == "nix-update"]
    custom_pkgs = [entry for entry in packages if entry.method == "custom"]

    print("Packages with nix-update:")
    for entry in nix_update_pkgs:
        joined = " ".join(entry.extra_args or [])
        suffix = f" ({joined})" if joined else ""
        print(f"  - {entry.name}{suffix}")

    print("\nPackages with custom update.py:")
    for entry in custom_pkgs:
        print(f"  - {entry.name}")
333  
334  
def main() -> int:
    """Parse the command line and run the requested action.

    ``--list`` prints the discovered packages and exits. Otherwise every
    discovered package (or only the one selected with ``--package``) is
    updated in place, or, with ``--pr``, turned into a pull request.

    Returns:
        0 when nothing failed; 1 when a package failed or the package named
        with ``--package`` was not found.
    """
    parser = argparse.ArgumentParser(
        description="Update third-party packages in ./packages"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without making changes",
    )
    parser.add_argument("--package", "-p", help="Update only the specified package")
    parser.add_argument(
        "--list", "-l", action="store_true", help="List all discovered packages"
    )
    parser.add_argument(
        "--pr",
        action="store_true",
        help="Create a PR for each updated package (uses git worktrees)",
    )
    args = parser.parse_args()

    pkgs_dir = get_pkgs_dir()
    flake_root = pkgs_dir.parent
    packages = discover_packages(pkgs_dir)

    if args.list:
        list_packages(packages)
        return 0

    if args.package:
        packages = [p for p in packages if p.name == args.package]
        if not packages:
            print(f"Error: Package '{args.package}' not found")
            return 1

    outcomes: list[bool] = []
    for pkg in packages:
        if args.pr:
            # PR mode: work in a separate worktree, leaving this checkout alone.
            ok = create_pr_for_package(pkg, flake_root, args.dry_run)
        else:
            # Normal mode: update the files of this checkout.
            ok = update_package(pkg, flake_root, args.dry_run).success
        outcomes.append(bool(ok))

    success_count = sum(outcomes)
    failure_count = len(outcomes) - success_count

    print(f"\n{'=' * 40}")
    print(f"Results: {success_count} succeeded, {failure_count} failed")

    return 1 if failure_count else 0
400  
401  
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())