__main__.py
1 #!/usr/bin/env python3 2 """ 3 Update third-party packages in ./packages directory. 4 5 This script auto-discovers updatable packages: 6 1. Packages with `nix-update-args` file -> run nix-update with those args 7 2. Packages with `update.py` file -> import and call main() 8 9 Usage: 10 python3 -m updater [--dry-run] [--package NAME] [--list] [--pr] 11 """ 12 13 import argparse 14 import importlib.util 15 import json 16 import subprocess 17 import sys 18 import tempfile 19 from dataclasses import dataclass 20 from pathlib import Path 21 22 23 @dataclass 24 class Package: 25 name: str 26 method: str # "nix-update" or "custom" 27 path: Path 28 extra_args: list[str] | None = None 29 30 31 @dataclass 32 class UpdateResult: 33 package: Package 34 success: bool 35 changed: bool 36 old_version: str | None = None 37 new_version: str | None = None 38 39 40 def get_pkgs_dir() -> Path: 41 """Get the packages directory (parent of updater).""" 42 return Path(__file__).parent.parent.resolve() 43 44 45 def run_cmd( 46 cmd: list[str], cwd: Path | None = None, check: bool = True 47 ) -> subprocess.CompletedProcess[str]: 48 """Run a command and return the result.""" 49 return subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, check=check) 50 51 52 def git_has_changes(flake_root: Path) -> bool: 53 """Check if there are uncommitted changes.""" 54 result = run_cmd(["git", "status", "--porcelain"], cwd=flake_root, check=False) 55 return bool(result.stdout.strip()) 56 57 58 def git_get_changes(flake_root: Path) -> str: 59 """Get list of changed files.""" 60 result = run_cmd(["git", "status", "--porcelain"], cwd=flake_root, check=False) 61 return result.stdout.strip() 62 63 64 def get_current_version(pkg: Package) -> str | None: 65 """Get current version from srcs.json if it exists.""" 66 srcs_file = pkg.path / "srcs.json" 67 if srcs_file.exists(): 68 try: 69 data = json.loads(srcs_file.read_text()) 70 return data.get("version") 71 except json.JSONDecodeError: 72 pass 73 return None 74 75 76 def discover_packages(pkgs_dir: Path) -> list[Package]: 77 """Discover all updatable packages.""" 78 packages: list[Package] = [] 79 80 for pkg_dir in pkgs_dir.iterdir(): 81 if not pkg_dir.is_dir(): 82 continue 83 84 # Check for nix-update-args file 85 nix_update_args_file = pkg_dir / "nix-update-args" 86 if nix_update_args_file.exists(): 87 args = nix_update_args_file.read_text().strip().split() 88 packages.append( 89 Package( 90 name=pkg_dir.name, 91 method="nix-update", 92 path=pkg_dir, 93 extra_args=args, 94 ) 95 ) 96 continue 97 98 # Check for update.py file 99 update_script = pkg_dir / "update.py" 100 if update_script.exists(): 101 packages.append( 102 Package( 103 name=pkg_dir.name, 104 method="custom", 105 path=pkg_dir, 106 ) 107 ) 108 109 return sorted(packages, key=lambda p: p.name) 110 111 112 def run_nix_update(pkg: Package, flake_root: Path, dry_run: bool = False) -> bool: 113 """Run nix-update for a package.""" 114 cmd = [ 115 "nix-update", 116 "--flake", 117 f".#packages.x86_64-linux.{pkg.name}", 118 ] 119 120 if pkg.extra_args: 121 cmd.extend(pkg.extra_args) 122 123 print(f" Running: {' '.join(cmd)}") 124 125 if dry_run: 126 print(" (dry-run, skipping)") 127 return True 128 129 result = subprocess.run( 130 cmd, check=False, cwd=flake_root, capture_output=True, text=True 131 ) 132 133 if result.returncode != 0: 134 print(f" Error: {result.stderr}") 135 return False 136 137 if result.stdout: 138 print(f" {result.stdout.strip()}") 139 140 return True 141 142 143 def run_custom_update(pkg: Package, dry_run: bool = False) -> bool: 144 """Import and run main() from update.py.""" 145 update_script = pkg.path / "update.py" 146 147 print(f" Running: {update_script}") 148 149 if dry_run: 150 print(" (dry-run, skipping)") 151 return True 152 153 spec = importlib.util.spec_from_file_location(f"update_{pkg.name}", update_script) 154 if spec is None or spec.loader is None: 155 print(f" Error: Could not load {update_script}") 156 return False 157 158 module = importlib.util.module_from_spec(spec) 159 sys.modules[f"update_{pkg.name}"] = module 160 spec.loader.exec_module(module) 161 162 if not hasattr(module, "main"): 163 print(f" Error: {update_script} has no main() function") 164 return False 165 166 module.main() 167 return True 168 169 170 def update_package( 171 pkg: Package, flake_root: Path, dry_run: bool = False 172 ) -> UpdateResult: 173 """Update a single package and return the result.""" 174 print(f"\nUpdating {pkg.name} (method: {pkg.method})...") 175 176 old_version = get_current_version(pkg) 177 178 # Check for existing changes before update 179 had_changes_before = git_has_changes(flake_root) 180 181 if pkg.method == "nix-update": 182 success = run_nix_update(pkg, flake_root, dry_run) 183 elif pkg.method == "custom": 184 success = run_custom_update(pkg, dry_run) 185 else: 186 print(f" Error: Unknown method: {pkg.method}") 187 success = False 188 189 new_version = get_current_version(pkg) 190 changed = not had_changes_before and git_has_changes(flake_root) 191 192 return UpdateResult( 193 package=pkg, 194 success=success, 195 changed=changed, 196 old_version=old_version, 197 new_version=new_version, 198 ) 199 200 201 def create_pr_for_package( 202 pkg: Package, flake_root: Path, dry_run: bool = False 203 ) -> bool: 204 """Create a PR for a package update using a git worktree.""" 205 branch_name = f"update/{pkg.name}" 206 207 print(f"\nCreating PR for {pkg.name}...") 208 209 # Check if branch already exists on remote 210 check_branch = run_cmd( 211 ["git", "ls-remote", "--heads", "origin", branch_name], 212 cwd=flake_root, 213 check=False, 214 ) 215 if check_branch.stdout.strip(): 216 print(f" Branch {branch_name} already exists on remote, skipping") 217 return False 218 219 if dry_run: 220 print(" (dry-run, skipping)") 221 return True 222 223 return _create_pr_in_worktree(pkg, flake_root, branch_name) 224 225 226 def _create_pr_in_worktree(pkg: Package, flake_root: Path, branch_name: str) -> bool: 227 """Create PR using a temporary worktree.""" 228 with tempfile.TemporaryDirectory() as tmpdir: 229 worktree_path = Path(tmpdir) / "worktree" 230 231 # Create worktree with new branch 232 result = run_cmd( 233 ["git", "worktree", "add", "-b", branch_name, str(worktree_path)], 234 cwd=flake_root, 235 check=False, 236 ) 237 if result.returncode != 0: 238 print(f" Error creating worktree: {result.stderr}") 239 return False 240 241 try: 242 return _run_update_and_create_pr( 243 pkg, flake_root, worktree_path, branch_name 244 ) 245 finally: 246 # Clean up worktree and branch 247 run_cmd( 248 ["git", "worktree", "remove", "--force", str(worktree_path)], 249 cwd=flake_root, 250 check=False, 251 ) 252 run_cmd(["git", "branch", "-D", branch_name], cwd=flake_root, check=False) 253 254 255 def _run_update_and_create_pr( 256 pkg: Package, flake_root: Path, worktree_path: Path, branch_name: str 257 ) -> bool: 258 """Run update in worktree and create PR.""" 259 old_version = get_current_version(pkg) 260 261 # Run the update in the worktree 262 worktree_pkg = Package( 263 name=pkg.name, 264 method=pkg.method, 265 path=worktree_path / "packages" / pkg.name, 266 extra_args=pkg.extra_args, 267 ) 268 269 if pkg.method == "nix-update": 270 success = run_nix_update(worktree_pkg, worktree_path, dry_run=False) 271 elif pkg.method == "custom": 272 success = run_custom_update(worktree_pkg, dry_run=False) 273 else: 274 success = False 275 276 if not success: 277 print(" Update failed") 278 return False 279 280 if not git_has_changes(worktree_path): 281 print(" No changes, already up to date") 282 return True # Not a failure, just nothing to do 283 284 new_version = get_current_version(worktree_pkg) 285 old_ver = old_version or "unknown" 286 new_ver = new_version or "unknown" 287 288 # Commit and push 289 run_cmd(["git", "add", "-A"], cwd=worktree_path) 290 commit_msg = f"{pkg.name}: {old_ver} -> {new_ver}" 291 run_cmd(["git", "commit", "-m", commit_msg], cwd=worktree_path) 292 293 push_result = run_cmd( 294 ["git", "push", "-u", "origin", branch_name], cwd=worktree_path, check=False 295 ) 296 if push_result.returncode != 0: 297 print(f" Error pushing: {push_result.stderr}") 298 return False 299 300 # Create PR 301 pr_body = f"Automated update of {pkg.name} from {old_ver} to {new_ver}." 302 pr_result = run_cmd( 303 [ 304 "gh", "pr", "create", 305 "--title", commit_msg, 306 "--body", pr_body, 307 "--label", "dependencies", 308 ], 309 cwd=worktree_path, 310 check=False, 311 ) 312 313 if pr_result.returncode != 0: 314 print(f" Error creating PR: {pr_result.stderr}") 315 return False 316 317 print(f" Created PR: {pr_result.stdout.strip()}") 318 return True 319 320 321 def list_packages(packages: list[Package]) -> None: 322 """List all discovered packages.""" 323 print("Packages with nix-update:") 324 for pkg in packages: 325 if pkg.method == "nix-update": 326 args = " ".join(pkg.extra_args or []) 327 print(f" - {pkg.name}" + (f" ({args})" if args else "")) 328 329 print("\nPackages with custom update.py:") 330 for pkg in packages: 331 if pkg.method == "custom": 332 print(f" - {pkg.name}") 333 334 335 def main() -> int: 336 parser = argparse.ArgumentParser( 337 description="Update third-party packages in ./packages" 338 ) 339 parser.add_argument( 340 "--dry-run", 341 action="store_true", 342 help="Show what would be done without making changes", 343 ) 344 parser.add_argument( 345 "--package", 346 "-p", 347 help="Update only the specified package", 348 ) 349 parser.add_argument( 350 "--list", 351 "-l", 352 action="store_true", 353 help="List all discovered packages", 354 ) 355 parser.add_argument( 356 "--pr", 357 action="store_true", 358 help="Create a PR for each updated package (uses git worktrees)", 359 ) 360 361 args = parser.parse_args() 362 363 pkgs_dir = get_pkgs_dir() 364 flake_root = pkgs_dir.parent 365 366 packages = discover_packages(pkgs_dir) 367 368 if args.list: 369 list_packages(packages) 370 return 0 371 372 if args.package: 373 packages = [p for p in packages if p.name == args.package] 374 if not packages: 375 print(f"Error: Package '{args.package}' not found") 376 return 1 377 378 success_count = 0 379 failure_count = 0 380 381 for pkg in packages: 382 if args.pr: 383 # PR mode: use worktree to create PR without touching current checkout 384 if create_pr_for_package(pkg, flake_root, args.dry_run): 385 success_count += 1 386 else: 387 failure_count += 1 388 else: 389 # Normal mode: update in place 390 result = update_package(pkg, flake_root, args.dry_run) 391 if result.success: 392 success_count += 1 393 else: 394 failure_count += 1 395 396 print(f"\n{'=' * 40}") 397 print(f"Results: {success_count} succeeded, {failure_count} failed") 398 399 return 0 if failure_count == 0 else 1 400 401 402 if __name__ == "__main__": 403 sys.exit(main())