test_worktree.py
1 """Tests for git worktree isolation (CLI --worktree / -w flag). 2 3 Verifies worktree creation, cleanup, .worktreeinclude handling, 4 .gitignore management, and integration with the CLI. (#652) 5 """ 6 7 import os 8 import shutil 9 import subprocess 10 import pytest 11 from pathlib import Path 12 from unittest.mock import patch, MagicMock 13 14 15 @pytest.fixture 16 def git_repo(tmp_path): 17 """Create a temporary git repo for testing.""" 18 repo = tmp_path / "test-repo" 19 repo.mkdir() 20 subprocess.run(["git", "init"], cwd=repo, capture_output=True) 21 subprocess.run( 22 ["git", "config", "user.email", "test@test.com"], 23 cwd=repo, capture_output=True, 24 ) 25 subprocess.run( 26 ["git", "config", "user.name", "Test"], 27 cwd=repo, capture_output=True, 28 ) 29 # Create initial commit (worktrees need at least one commit) 30 (repo / "README.md").write_text("# Test Repo\n") 31 subprocess.run(["git", "add", "."], cwd=repo, capture_output=True) 32 subprocess.run( 33 ["git", "commit", "-m", "Initial commit"], 34 cwd=repo, capture_output=True, 35 ) 36 # Add a fake remote ref so cleanup logic sees the initial commit as 37 # "pushed". Without this, `git log HEAD --not --remotes` treats every 38 # commit as unpushed and cleanup refuses to delete worktrees. 39 subprocess.run( 40 ["git", "update-ref", "refs/remotes/origin/main", "HEAD"], 41 cwd=repo, capture_output=True, 42 ) 43 return repo 44 45 46 # --------------------------------------------------------------------------- 47 # Lightweight reimplementations for testing (avoid importing cli.py) 48 # --------------------------------------------------------------------------- 49 50 def _git_repo_root(cwd=None): 51 """Test version of _git_repo_root.""" 52 try: 53 result = subprocess.run( 54 ["git", "rev-parse", "--show-toplevel"], 55 capture_output=True, text=True, timeout=5, 56 cwd=cwd, 57 ) 58 if result.returncode == 0: 59 return result.stdout.strip() 60 except Exception: 61 pass 62 return None 63 64 65 def _setup_worktree(repo_root): 66 """Test version of _setup_worktree — creates a worktree.""" 67 import uuid 68 short_id = uuid.uuid4().hex[:8] 69 wt_name = f"hermes-{short_id}" 70 branch_name = f"hermes/{wt_name}" 71 72 worktrees_dir = Path(repo_root) / ".worktrees" 73 worktrees_dir.mkdir(parents=True, exist_ok=True) 74 wt_path = worktrees_dir / wt_name 75 76 result = subprocess.run( 77 ["git", "worktree", "add", str(wt_path), "-b", branch_name, "HEAD"], 78 capture_output=True, text=True, timeout=30, cwd=repo_root, 79 ) 80 if result.returncode != 0: 81 return None 82 83 return { 84 "path": str(wt_path), 85 "branch": branch_name, 86 "repo_root": repo_root, 87 } 88 89 90 def _cleanup_worktree(info): 91 """Test version of _cleanup_worktree. 92 93 Preserves the worktree only if it has unpushed commits. 94 Dirty working tree alone is not enough to keep it. 95 """ 96 wt_path = info["path"] 97 branch = info["branch"] 98 repo_root = info["repo_root"] 99 100 if not Path(wt_path).exists(): 101 return 102 103 # Check for unpushed commits 104 result = subprocess.run( 105 ["git", "log", "--oneline", "HEAD", "--not", "--remotes"], 106 capture_output=True, text=True, timeout=10, cwd=wt_path, 107 ) 108 has_unpushed = bool(result.stdout.strip()) 109 110 if has_unpushed: 111 return False # Did not clean up — has unpushed commits 112 113 subprocess.run( 114 ["git", "worktree", "remove", wt_path, "--force"], 115 capture_output=True, text=True, timeout=15, cwd=repo_root, 116 ) 117 subprocess.run( 118 ["git", "branch", "-D", branch], 119 capture_output=True, text=True, timeout=10, cwd=repo_root, 120 ) 121 return True # Cleaned up 122 123 124 # --------------------------------------------------------------------------- 125 # Tests 126 # --------------------------------------------------------------------------- 127 128 class TestGitRepoDetection: 129 """Test git repo root detection.""" 130 131 def test_detects_git_repo(self, git_repo): 132 root = _git_repo_root(cwd=str(git_repo)) 133 assert root is not None 134 assert Path(root).resolve() == git_repo.resolve() 135 136 def test_detects_subdirectory(self, git_repo): 137 subdir = git_repo / "src" / "lib" 138 subdir.mkdir(parents=True) 139 root = _git_repo_root(cwd=str(subdir)) 140 assert root is not None 141 assert Path(root).resolve() == git_repo.resolve() 142 143 def test_returns_none_outside_repo(self, tmp_path): 144 # tmp_path itself is not a git repo 145 bare_dir = tmp_path / "not-a-repo" 146 bare_dir.mkdir() 147 root = _git_repo_root(cwd=str(bare_dir)) 148 assert root is None 149 150 151 class TestWorktreeCreation: 152 """Test worktree setup.""" 153 154 def test_creates_worktree(self, git_repo): 155 info = _setup_worktree(str(git_repo)) 156 assert info is not None 157 assert Path(info["path"]).exists() 158 assert info["branch"].startswith("hermes/hermes-") 159 assert info["repo_root"] == str(git_repo) 160 161 # Verify it's a valid git worktree 162 result = subprocess.run( 163 ["git", "rev-parse", "--is-inside-work-tree"], 164 capture_output=True, text=True, cwd=info["path"], 165 ) 166 assert result.stdout.strip() == "true" 167 168 def test_worktree_has_own_branch(self, git_repo): 169 info = _setup_worktree(str(git_repo)) 170 assert info is not None 171 172 # Check branch name in worktree 173 result = subprocess.run( 174 ["git", "branch", "--show-current"], 175 capture_output=True, text=True, cwd=info["path"], 176 ) 177 assert result.stdout.strip() == info["branch"] 178 179 def test_worktree_is_independent(self, git_repo): 180 """Two worktrees from the same repo are independent.""" 181 info1 = _setup_worktree(str(git_repo)) 182 info2 = _setup_worktree(str(git_repo)) 183 assert info1 is not None 184 assert info2 is not None 185 assert info1["path"] != info2["path"] 186 assert info1["branch"] != info2["branch"] 187 188 # Create a file in worktree 1 189 (Path(info1["path"]) / "only-in-wt1.txt").write_text("hello") 190 191 # It should NOT appear in worktree 2 192 assert not (Path(info2["path"]) / "only-in-wt1.txt").exists() 193 194 def test_worktrees_dir_created(self, git_repo): 195 info = _setup_worktree(str(git_repo)) 196 assert info is not None 197 assert (git_repo / ".worktrees").is_dir() 198 199 def test_worktree_has_repo_files(self, git_repo): 200 """Worktree should contain the repo's tracked files.""" 201 info = _setup_worktree(str(git_repo)) 202 assert info is not None 203 assert (Path(info["path"]) / "README.md").exists() 204 205 206 class TestWorktreeCleanup: 207 """Test worktree cleanup on exit.""" 208 209 def test_clean_worktree_removed(self, git_repo): 210 info = _setup_worktree(str(git_repo)) 211 assert info is not None 212 assert Path(info["path"]).exists() 213 214 result = _cleanup_worktree(info) 215 assert result is True 216 assert not Path(info["path"]).exists() 217 218 def test_dirty_worktree_cleaned_when_no_unpushed(self, git_repo): 219 """Dirty working tree without unpushed commits is cleaned up. 220 221 Agent sessions typically leave untracked files / artifacts behind. 222 Since all real work is in pushed commits, these don't warrant 223 keeping the worktree. 224 """ 225 info = _setup_worktree(str(git_repo)) 226 assert info is not None 227 228 # Make uncommitted changes (untracked file) 229 (Path(info["path"]) / "new-file.txt").write_text("uncommitted") 230 subprocess.run( 231 ["git", "add", "new-file.txt"], 232 cwd=info["path"], capture_output=True, 233 ) 234 235 # The git_repo fixture already has a fake remote ref so the initial 236 # commit is seen as "pushed". No unpushed commits → cleanup proceeds. 237 result = _cleanup_worktree(info) 238 assert result is True # Cleaned up despite dirty working tree 239 assert not Path(info["path"]).exists() 240 241 def test_worktree_with_unpushed_commits_kept(self, git_repo): 242 """Worktree with unpushed commits is preserved.""" 243 info = _setup_worktree(str(git_repo)) 244 assert info is not None 245 246 # Make a commit that is NOT on any remote 247 (Path(info["path"]) / "work.txt").write_text("real work") 248 subprocess.run(["git", "add", "work.txt"], cwd=info["path"], capture_output=True) 249 subprocess.run( 250 ["git", "commit", "-m", "agent work"], 251 cwd=info["path"], capture_output=True, 252 ) 253 254 result = _cleanup_worktree(info) 255 assert result is False # Kept — has unpushed commits 256 assert Path(info["path"]).exists() 257 258 def test_branch_deleted_on_cleanup(self, git_repo): 259 info = _setup_worktree(str(git_repo)) 260 branch = info["branch"] 261 262 _cleanup_worktree(info) 263 264 # Branch should be gone 265 result = subprocess.run( 266 ["git", "branch", "--list", branch], 267 capture_output=True, text=True, cwd=str(git_repo), 268 ) 269 assert branch not in result.stdout 270 271 def test_cleanup_nonexistent_worktree(self, git_repo): 272 """Cleanup should handle already-removed worktrees gracefully.""" 273 info = { 274 "path": str(git_repo / ".worktrees" / "nonexistent"), 275 "branch": "hermes/nonexistent", 276 "repo_root": str(git_repo), 277 } 278 # Should not raise 279 _cleanup_worktree(info) 280 281 282 class TestWorktreeInclude: 283 """Test .worktreeinclude file handling.""" 284 285 def test_copies_included_files(self, git_repo): 286 """Files listed in .worktreeinclude should be copied to the worktree.""" 287 # Create a .env file (gitignored) 288 (git_repo / ".env").write_text("SECRET=abc123") 289 (git_repo / ".gitignore").write_text(".env\n.worktrees/\n") 290 subprocess.run( 291 ["git", "add", ".gitignore"], 292 cwd=str(git_repo), capture_output=True, 293 ) 294 subprocess.run( 295 ["git", "commit", "-m", "Add gitignore"], 296 cwd=str(git_repo), capture_output=True, 297 ) 298 299 # Create .worktreeinclude 300 (git_repo / ".worktreeinclude").write_text(".env\n") 301 302 # Import and use the real _setup_worktree logic for include handling 303 info = _setup_worktree(str(git_repo)) 304 assert info is not None 305 306 # Manually copy .worktreeinclude entries (mirrors cli.py logic) 307 import shutil 308 include_file = git_repo / ".worktreeinclude" 309 wt_path = Path(info["path"]) 310 for line in include_file.read_text().splitlines(): 311 entry = line.strip() 312 if not entry or entry.startswith("#"): 313 continue 314 src = git_repo / entry 315 dst = wt_path / entry 316 if src.is_file(): 317 dst.parent.mkdir(parents=True, exist_ok=True) 318 shutil.copy2(str(src), str(dst)) 319 320 # Verify .env was copied 321 assert (wt_path / ".env").exists() 322 assert (wt_path / ".env").read_text() == "SECRET=abc123" 323 324 def test_ignores_comments_and_blanks(self, git_repo): 325 """Comments and blank lines in .worktreeinclude should be skipped.""" 326 (git_repo / ".worktreeinclude").write_text( 327 "# This is a comment\n" 328 "\n" 329 " # Another comment\n" 330 ) 331 info = _setup_worktree(str(git_repo)) 332 assert info is not None 333 # Should not crash — just skip all lines 334 335 336 class TestGitignoreManagement: 337 """Test that .worktrees/ is added to .gitignore.""" 338 339 def test_adds_to_gitignore(self, git_repo): 340 """Creating a worktree should add .worktrees/ to .gitignore.""" 341 # Remove any existing .gitignore 342 gitignore = git_repo / ".gitignore" 343 if gitignore.exists(): 344 gitignore.unlink() 345 346 info = _setup_worktree(str(git_repo)) 347 assert info is not None 348 349 # Now manually add .worktrees/ to .gitignore (mirrors cli.py logic) 350 _ignore_entry = ".worktrees/" 351 existing = gitignore.read_text() if gitignore.exists() else "" 352 if _ignore_entry not in existing.splitlines(): 353 with open(gitignore, "a") as f: 354 if existing and not existing.endswith("\n"): 355 f.write("\n") 356 f.write(f"{_ignore_entry}\n") 357 358 content = gitignore.read_text() 359 assert ".worktrees/" in content 360 361 def test_does_not_duplicate_gitignore_entry(self, git_repo): 362 """If .worktrees/ is already in .gitignore, don't add again.""" 363 gitignore = git_repo / ".gitignore" 364 gitignore.write_text(".worktrees/\n") 365 366 # The check should see it's already there 367 existing = gitignore.read_text() 368 assert ".worktrees/" in existing.splitlines() 369 370 371 class TestMultipleWorktrees: 372 """Test running multiple worktrees concurrently (the core use case).""" 373 374 def test_ten_concurrent_worktrees(self, git_repo): 375 """Create 10 worktrees — simulating 10 parallel agents.""" 376 worktrees = [] 377 for _ in range(10): 378 info = _setup_worktree(str(git_repo)) 379 assert info is not None 380 worktrees.append(info) 381 382 # All should exist and be independent 383 paths = [info["path"] for info in worktrees] 384 assert len(set(paths)) == 10 # All unique 385 386 # Each should have the repo files 387 for info in worktrees: 388 assert (Path(info["path"]) / "README.md").exists() 389 390 # Edit a file in one worktree 391 (Path(worktrees[0]["path"]) / "README.md").write_text("Modified in wt0") 392 393 # Others should be unaffected 394 for info in worktrees[1:]: 395 assert (Path(info["path"]) / "README.md").read_text() == "# Test Repo\n" 396 397 # List worktrees via git 398 result = subprocess.run( 399 ["git", "worktree", "list"], 400 capture_output=True, text=True, cwd=str(git_repo), 401 ) 402 # Should have 11 entries: main + 10 worktrees 403 lines = [l for l in result.stdout.strip().splitlines() if l.strip()] 404 assert len(lines) == 11 405 406 # Cleanup all (git_repo fixture has a fake remote ref so cleanup works) 407 for info in worktrees: 408 # Discard changes first so cleanup works 409 subprocess.run( 410 ["git", "checkout", "--", "."], 411 cwd=info["path"], capture_output=True, 412 ) 413 _cleanup_worktree(info) 414 415 # All should be removed 416 for info in worktrees: 417 assert not Path(info["path"]).exists() 418 419 420 class TestWorktreeDirectorySymlink: 421 """Test .worktreeinclude with directories (symlinked).""" 422 423 def test_symlinks_directory(self, git_repo): 424 """Directories in .worktreeinclude should be symlinked.""" 425 # Create a .venv directory 426 venv_dir = git_repo / ".venv" / "lib" 427 venv_dir.mkdir(parents=True) 428 (venv_dir / "marker.txt").write_text("venv marker") 429 (git_repo / ".gitignore").write_text(".venv/\n.worktrees/\n") 430 subprocess.run( 431 ["git", "add", ".gitignore"], cwd=str(git_repo), capture_output=True 432 ) 433 subprocess.run( 434 ["git", "commit", "-m", "gitignore"], cwd=str(git_repo), capture_output=True 435 ) 436 437 (git_repo / ".worktreeinclude").write_text(".venv/\n") 438 439 info = _setup_worktree(str(git_repo)) 440 assert info is not None 441 442 wt_path = Path(info["path"]) 443 src = git_repo / ".venv" 444 dst = wt_path / ".venv" 445 446 # Manually symlink (mirrors cli.py logic) 447 if not dst.exists(): 448 dst.parent.mkdir(parents=True, exist_ok=True) 449 os.symlink(str(src.resolve()), str(dst)) 450 451 assert dst.is_symlink() 452 assert (dst / "lib" / "marker.txt").read_text() == "venv marker" 453 454 455 class TestStaleWorktreePruning: 456 """Test _prune_stale_worktrees garbage collection.""" 457 458 def test_prunes_old_clean_worktree(self, git_repo): 459 """Old clean worktrees should be removed on prune.""" 460 import time 461 462 info = _setup_worktree(str(git_repo)) 463 assert info is not None 464 assert Path(info["path"]).exists() 465 466 # Make the worktree look old (set mtime to 25h ago) 467 old_time = time.time() - (25 * 3600) 468 os.utime(info["path"], (old_time, old_time)) 469 470 # Reimplementation of prune logic (matches cli.py) 471 worktrees_dir = git_repo / ".worktrees" 472 cutoff = time.time() - (24 * 3600) 473 474 for entry in worktrees_dir.iterdir(): 475 if not entry.is_dir() or not entry.name.startswith("hermes-"): 476 continue 477 try: 478 mtime = entry.stat().st_mtime 479 if mtime > cutoff: 480 continue 481 except Exception: 482 continue 483 484 status = subprocess.run( 485 ["git", "status", "--porcelain"], 486 capture_output=True, text=True, timeout=5, cwd=str(entry), 487 ) 488 if status.stdout.strip(): 489 continue 490 491 branch_result = subprocess.run( 492 ["git", "branch", "--show-current"], 493 capture_output=True, text=True, timeout=5, cwd=str(entry), 494 ) 495 branch = branch_result.stdout.strip() 496 subprocess.run( 497 ["git", "worktree", "remove", str(entry), "--force"], 498 capture_output=True, text=True, timeout=15, cwd=str(git_repo), 499 ) 500 if branch: 501 subprocess.run( 502 ["git", "branch", "-D", branch], 503 capture_output=True, text=True, timeout=10, cwd=str(git_repo), 504 ) 505 506 assert not Path(info["path"]).exists() 507 508 def test_keeps_recent_worktree(self, git_repo): 509 """Recent worktrees should NOT be pruned.""" 510 import time 511 512 info = _setup_worktree(str(git_repo)) 513 assert info is not None 514 515 # Don't modify mtime — it's recent 516 worktrees_dir = git_repo / ".worktrees" 517 cutoff = time.time() - (24 * 3600) 518 519 pruned = False 520 for entry in worktrees_dir.iterdir(): 521 if not entry.is_dir() or not entry.name.startswith("hermes-"): 522 continue 523 mtime = entry.stat().st_mtime 524 if mtime > cutoff: 525 continue # Too recent 526 pruned = True 527 528 assert not pruned 529 assert Path(info["path"]).exists() 530 531 def test_keeps_old_worktree_with_unpushed_commits(self, git_repo): 532 """Old worktrees (24-72h) with unpushed commits should NOT be pruned.""" 533 import time 534 535 info = _setup_worktree(str(git_repo)) 536 assert info is not None 537 538 # Make an unpushed commit 539 (Path(info["path"]) / "work.txt").write_text("real work") 540 subprocess.run(["git", "add", "work.txt"], cwd=info["path"], capture_output=True) 541 subprocess.run( 542 ["git", "commit", "-m", "agent work"], 543 cwd=info["path"], capture_output=True, 544 ) 545 546 # Make it old (25h — in the 24-72h soft tier) 547 old_time = time.time() - (25 * 3600) 548 os.utime(info["path"], (old_time, old_time)) 549 550 # Check for unpushed commits (simulates prune logic) 551 result = subprocess.run( 552 ["git", "log", "--oneline", "HEAD", "--not", "--remotes"], 553 capture_output=True, text=True, cwd=info["path"], 554 ) 555 has_unpushed = bool(result.stdout.strip()) 556 assert has_unpushed # Has unpushed commits → not pruned in soft tier 557 assert Path(info["path"]).exists() 558 559 def test_force_prunes_very_old_worktree(self, git_repo): 560 """Worktrees older than 72h should be force-pruned regardless.""" 561 import time 562 563 info = _setup_worktree(str(git_repo)) 564 assert info is not None 565 566 # Make an unpushed commit (would normally protect it) 567 (Path(info["path"]) / "work.txt").write_text("stale work") 568 subprocess.run(["git", "add", "work.txt"], cwd=info["path"], capture_output=True) 569 subprocess.run( 570 ["git", "commit", "-m", "old agent work"], 571 cwd=info["path"], capture_output=True, 572 ) 573 574 # Make it very old (73h — beyond the 72h hard threshold) 575 old_time = time.time() - (73 * 3600) 576 os.utime(info["path"], (old_time, old_time)) 577 578 # Simulate the force-prune tier check 579 hard_cutoff = time.time() - (72 * 3600) 580 mtime = Path(info["path"]).stat().st_mtime 581 assert mtime <= hard_cutoff # Should qualify for force removal 582 583 # Actually remove it (simulates _prune_stale_worktrees force path) 584 branch_result = subprocess.run( 585 ["git", "branch", "--show-current"], 586 capture_output=True, text=True, timeout=5, cwd=info["path"], 587 ) 588 branch = branch_result.stdout.strip() 589 590 subprocess.run( 591 ["git", "worktree", "remove", info["path"], "--force"], 592 capture_output=True, text=True, timeout=15, cwd=str(git_repo), 593 ) 594 if branch: 595 subprocess.run( 596 ["git", "branch", "-D", branch], 597 capture_output=True, text=True, timeout=10, cwd=str(git_repo), 598 ) 599 600 assert not Path(info["path"]).exists() 601 602 603 class TestEdgeCases: 604 """Test edge cases for robustness.""" 605 606 def test_no_commits_repo(self, tmp_path): 607 """Worktree creation should fail gracefully on a repo with no commits.""" 608 repo = tmp_path / "empty-repo" 609 repo.mkdir() 610 subprocess.run(["git", "init"], cwd=str(repo), capture_output=True) 611 612 info = _setup_worktree(str(repo)) 613 assert info is None # Should fail gracefully 614 615 def test_not_a_git_repo(self, tmp_path): 616 """Repo detection should return None for non-git directories.""" 617 bare = tmp_path / "not-git" 618 bare.mkdir() 619 root = _git_repo_root(cwd=str(bare)) 620 assert root is None 621 622 def test_worktrees_dir_already_exists(self, git_repo): 623 """Should work fine if .worktrees/ already exists.""" 624 (git_repo / ".worktrees").mkdir(exist_ok=True) 625 info = _setup_worktree(str(git_repo)) 626 assert info is not None 627 assert Path(info["path"]).exists() 628 629 630 class TestCLIFlagLogic: 631 """Test the flag/config OR logic from main().""" 632 633 def test_worktree_flag_triggers(self): 634 """--worktree flag should trigger worktree creation.""" 635 worktree = True 636 w = False 637 config_worktree = False 638 use_worktree = worktree or w or config_worktree 639 assert use_worktree 640 641 def test_w_flag_triggers(self): 642 """-w flag should trigger worktree creation.""" 643 worktree = False 644 w = True 645 config_worktree = False 646 use_worktree = worktree or w or config_worktree 647 assert use_worktree 648 649 def test_config_triggers(self): 650 """worktree: true in config should trigger worktree creation.""" 651 worktree = False 652 w = False 653 config_worktree = True 654 use_worktree = worktree or w or config_worktree 655 assert use_worktree 656 657 def test_none_set_no_trigger(self): 658 """No flags and no config should not trigger.""" 659 worktree = False 660 w = False 661 config_worktree = False 662 use_worktree = worktree or w or config_worktree 663 assert not use_worktree 664 665 666 class TestTerminalCWDIntegration: 667 """Test that TERMINAL_CWD is correctly set to the worktree path.""" 668 669 def test_terminal_cwd_set(self, git_repo): 670 """After worktree setup, TERMINAL_CWD should point to the worktree.""" 671 info = _setup_worktree(str(git_repo)) 672 assert info is not None 673 674 # This is what main() does: 675 os.environ["TERMINAL_CWD"] = info["path"] 676 assert os.environ["TERMINAL_CWD"] == info["path"] 677 assert Path(os.environ["TERMINAL_CWD"]).exists() 678 679 # Clean up env 680 del os.environ["TERMINAL_CWD"] 681 682 def test_terminal_cwd_is_valid_git_repo(self, git_repo): 683 """The TERMINAL_CWD worktree should be a valid git working tree.""" 684 info = _setup_worktree(str(git_repo)) 685 assert info is not None 686 687 result = subprocess.run( 688 ["git", "rev-parse", "--is-inside-work-tree"], 689 capture_output=True, text=True, cwd=info["path"], 690 ) 691 assert result.stdout.strip() == "true" 692 693 694 class TestOrphanedBranchPruning: 695 """Test cleanup of orphaned hermes/* and pr-* branches.""" 696 697 def test_prunes_orphaned_hermes_branch(self, git_repo): 698 """hermes/hermes-* branches with no worktree should be deleted.""" 699 # Create a branch that looks like a worktree branch but has no worktree 700 subprocess.run( 701 ["git", "branch", "hermes/hermes-deadbeef", "HEAD"], 702 cwd=str(git_repo), capture_output=True, 703 ) 704 705 # Verify it exists 706 result = subprocess.run( 707 ["git", "branch", "--list", "hermes/hermes-deadbeef"], 708 capture_output=True, text=True, cwd=str(git_repo), 709 ) 710 assert "hermes/hermes-deadbeef" in result.stdout 711 712 # Simulate _prune_orphaned_branches logic 713 result = subprocess.run( 714 ["git", "branch", "--format=%(refname:short)"], 715 capture_output=True, text=True, cwd=str(git_repo), 716 ) 717 all_branches = [b.strip() for b in result.stdout.strip().split("\n") if b.strip()] 718 719 wt_result = subprocess.run( 720 ["git", "worktree", "list", "--porcelain"], 721 capture_output=True, text=True, cwd=str(git_repo), 722 ) 723 active_branches = {"main"} 724 for line in wt_result.stdout.split("\n"): 725 if line.startswith("branch refs/heads/"): 726 active_branches.add(line.split("branch refs/heads/", 1)[-1].strip()) 727 728 orphaned = [ 729 b for b in all_branches 730 if b not in active_branches 731 and (b.startswith("hermes/hermes-") or b.startswith("pr-")) 732 ] 733 assert "hermes/hermes-deadbeef" in orphaned 734 735 # Delete them 736 if orphaned: 737 subprocess.run( 738 ["git", "branch", "-D"] + orphaned, 739 capture_output=True, text=True, cwd=str(git_repo), 740 ) 741 742 # Verify gone 743 result = subprocess.run( 744 ["git", "branch", "--list", "hermes/hermes-deadbeef"], 745 capture_output=True, text=True, cwd=str(git_repo), 746 ) 747 assert "hermes/hermes-deadbeef" not in result.stdout 748 749 def test_prunes_orphaned_pr_branch(self, git_repo): 750 """pr-* branches should be deleted during pruning.""" 751 subprocess.run( 752 ["git", "branch", "pr-1234", "HEAD"], 753 cwd=str(git_repo), capture_output=True, 754 ) 755 subprocess.run( 756 ["git", "branch", "pr-5678", "HEAD"], 757 cwd=str(git_repo), capture_output=True, 758 ) 759 760 result = subprocess.run( 761 ["git", "branch", "--format=%(refname:short)"], 762 capture_output=True, text=True, cwd=str(git_repo), 763 ) 764 all_branches = [b.strip() for b in result.stdout.strip().split("\n") if b.strip()] 765 766 active_branches = {"main"} 767 orphaned = [ 768 b for b in all_branches 769 if b not in active_branches and b.startswith("pr-") 770 ] 771 assert "pr-1234" in orphaned 772 assert "pr-5678" in orphaned 773 774 subprocess.run( 775 ["git", "branch", "-D"] + orphaned, 776 capture_output=True, text=True, cwd=str(git_repo), 777 ) 778 779 # Verify gone 780 result = subprocess.run( 781 ["git", "branch", "--format=%(refname:short)"], 782 capture_output=True, text=True, cwd=str(git_repo), 783 ) 784 remaining = result.stdout.strip() 785 assert "pr-1234" not in remaining 786 assert "pr-5678" not in remaining 787 788 def test_preserves_active_worktree_branch(self, git_repo): 789 """Branches with active worktrees should NOT be pruned.""" 790 info = _setup_worktree(str(git_repo)) 791 assert info is not None 792 793 result = subprocess.run( 794 ["git", "worktree", "list", "--porcelain"], 795 capture_output=True, text=True, cwd=str(git_repo), 796 ) 797 active_branches = set() 798 for line in result.stdout.split("\n"): 799 if line.startswith("branch refs/heads/"): 800 active_branches.add(line.split("branch refs/heads/", 1)[-1].strip()) 801 802 assert info["branch"] in active_branches # Protected 803 804 def test_preserves_main_branch(self, git_repo): 805 """main branch should never be pruned.""" 806 result = subprocess.run( 807 ["git", "branch", "--format=%(refname:short)"], 808 capture_output=True, text=True, cwd=str(git_repo), 809 ) 810 all_branches = [b.strip() for b in result.stdout.strip().split("\n") if b.strip()] 811 active_branches = {"main"} 812 813 orphaned = [ 814 b for b in all_branches 815 if b not in active_branches 816 and (b.startswith("hermes/hermes-") or b.startswith("pr-")) 817 ] 818 assert "main" not in orphaned 819 820 821 class TestSystemPromptInjection: 822 """Test that the agent gets worktree context in its system prompt.""" 823 824 def test_prompt_note_format(self, git_repo): 825 """Verify the system prompt note contains all required info.""" 826 info = _setup_worktree(str(git_repo)) 827 assert info is not None 828 829 # This is what main() does: 830 wt_note = ( 831 f"\n\n[System note: You are working in an isolated git worktree at " 832 f"{info['path']}. Your branch is `{info['branch']}`. " 833 f"Changes here do not affect the main working tree or other agents. " 834 f"Remember to commit and push your changes, and create a PR if appropriate. " 835 f"The original repo is at {info['repo_root']}.]\n" 836 ) 837 838 assert info["path"] in wt_note 839 assert info["branch"] in wt_note 840 assert info["repo_root"] in wt_note 841 assert "isolated git worktree" in wt_note 842 assert "commit and push" in wt_note