/ tests / cli / test_worktree.py
test_worktree.py
  1  """Tests for git worktree isolation (CLI --worktree / -w flag).
  2  
  3  Verifies worktree creation, cleanup, .worktreeinclude handling,
  4  .gitignore management, and integration with the CLI.  (#652)
  5  """
  6  
  7  import os
  8  import shutil
  9  import subprocess
 10  import pytest
 11  from pathlib import Path
 12  from unittest.mock import patch, MagicMock
 13  
 14  
 15  @pytest.fixture
 16  def git_repo(tmp_path):
 17      """Create a temporary git repo for testing."""
 18      repo = tmp_path / "test-repo"
 19      repo.mkdir()
 20      subprocess.run(["git", "init"], cwd=repo, capture_output=True)
 21      subprocess.run(
 22          ["git", "config", "user.email", "test@test.com"],
 23          cwd=repo, capture_output=True,
 24      )
 25      subprocess.run(
 26          ["git", "config", "user.name", "Test"],
 27          cwd=repo, capture_output=True,
 28      )
 29      # Create initial commit (worktrees need at least one commit)
 30      (repo / "README.md").write_text("# Test Repo\n")
 31      subprocess.run(["git", "add", "."], cwd=repo, capture_output=True)
 32      subprocess.run(
 33          ["git", "commit", "-m", "Initial commit"],
 34          cwd=repo, capture_output=True,
 35      )
 36      # Add a fake remote ref so cleanup logic sees the initial commit as
 37      # "pushed".  Without this, `git log HEAD --not --remotes` treats every
 38      # commit as unpushed and cleanup refuses to delete worktrees.
 39      subprocess.run(
 40          ["git", "update-ref", "refs/remotes/origin/main", "HEAD"],
 41          cwd=repo, capture_output=True,
 42      )
 43      return repo
 44  
 45  
 46  # ---------------------------------------------------------------------------
 47  # Lightweight reimplementations for testing (avoid importing cli.py)
 48  # ---------------------------------------------------------------------------
 49  
 50  def _git_repo_root(cwd=None):
 51      """Test version of _git_repo_root."""
 52      try:
 53          result = subprocess.run(
 54              ["git", "rev-parse", "--show-toplevel"],
 55              capture_output=True, text=True, timeout=5,
 56              cwd=cwd,
 57          )
 58          if result.returncode == 0:
 59              return result.stdout.strip()
 60      except Exception:
 61          pass
 62      return None
 63  
 64  
 65  def _setup_worktree(repo_root):
 66      """Test version of _setup_worktree — creates a worktree."""
 67      import uuid
 68      short_id = uuid.uuid4().hex[:8]
 69      wt_name = f"hermes-{short_id}"
 70      branch_name = f"hermes/{wt_name}"
 71  
 72      worktrees_dir = Path(repo_root) / ".worktrees"
 73      worktrees_dir.mkdir(parents=True, exist_ok=True)
 74      wt_path = worktrees_dir / wt_name
 75  
 76      result = subprocess.run(
 77          ["git", "worktree", "add", str(wt_path), "-b", branch_name, "HEAD"],
 78          capture_output=True, text=True, timeout=30, cwd=repo_root,
 79      )
 80      if result.returncode != 0:
 81          return None
 82  
 83      return {
 84          "path": str(wt_path),
 85          "branch": branch_name,
 86          "repo_root": repo_root,
 87      }
 88  
 89  
 90  def _cleanup_worktree(info):
 91      """Test version of _cleanup_worktree.
 92  
 93      Preserves the worktree only if it has unpushed commits.
 94      Dirty working tree alone is not enough to keep it.
 95      """
 96      wt_path = info["path"]
 97      branch = info["branch"]
 98      repo_root = info["repo_root"]
 99  
100      if not Path(wt_path).exists():
101          return
102  
103      # Check for unpushed commits
104      result = subprocess.run(
105          ["git", "log", "--oneline", "HEAD", "--not", "--remotes"],
106          capture_output=True, text=True, timeout=10, cwd=wt_path,
107      )
108      has_unpushed = bool(result.stdout.strip())
109  
110      if has_unpushed:
111          return False  # Did not clean up — has unpushed commits
112  
113      subprocess.run(
114          ["git", "worktree", "remove", wt_path, "--force"],
115          capture_output=True, text=True, timeout=15, cwd=repo_root,
116      )
117      subprocess.run(
118          ["git", "branch", "-D", branch],
119          capture_output=True, text=True, timeout=10, cwd=repo_root,
120      )
121      return True  # Cleaned up
122  
123  
124  # ---------------------------------------------------------------------------
125  # Tests
126  # ---------------------------------------------------------------------------
127  
class TestGitRepoDetection:
    """Test git repo root detection."""

    def test_detects_git_repo(self, git_repo):
        """Asking from the repo root reports that same directory."""
        detected = _git_repo_root(cwd=str(git_repo))
        assert detected is not None
        # resolve() both sides so symlinked temp directories compare equal.
        assert Path(detected).resolve() == git_repo.resolve()

    def test_detects_subdirectory(self, git_repo):
        """Asking from a nested directory still reports the repo root."""
        nested = git_repo / "src" / "lib"
        nested.mkdir(parents=True)
        detected = _git_repo_root(cwd=str(nested))
        assert detected is not None
        assert Path(detected).resolve() == git_repo.resolve()

    def test_returns_none_outside_repo(self, tmp_path):
        """A directory that is not inside any repo yields None."""
        # tmp_path itself is not a git repo
        outside = tmp_path / "not-a-repo"
        outside.mkdir()
        assert _git_repo_root(cwd=str(outside)) is None
149  
150  
class TestWorktreeCreation:
    """Test worktree setup."""

    def test_creates_worktree(self, git_repo):
        """Setup returns its metadata and a directory git recognises."""
        created = _setup_worktree(str(git_repo))
        assert created is not None
        assert Path(created["path"]).exists()
        assert created["branch"].startswith("hermes/hermes-")
        assert created["repo_root"] == str(git_repo)

        # Verify it's a valid git worktree
        probe = subprocess.run(
            ["git", "rev-parse", "--is-inside-work-tree"],
            cwd=created["path"], capture_output=True, text=True,
        )
        assert probe.stdout.strip() == "true"

    def test_worktree_has_own_branch(self, git_repo):
        """The worktree is checked out on the branch reported by setup."""
        created = _setup_worktree(str(git_repo))
        assert created is not None

        # Check branch name in worktree
        current = subprocess.run(
            ["git", "branch", "--show-current"],
            cwd=created["path"], capture_output=True, text=True,
        )
        assert current.stdout.strip() == created["branch"]

    def test_worktree_is_independent(self, git_repo):
        """Two worktrees from the same repo are independent."""
        first = _setup_worktree(str(git_repo))
        second = _setup_worktree(str(git_repo))
        assert first is not None
        assert second is not None
        assert first["path"] != second["path"]
        assert first["branch"] != second["branch"]

        # Create a file in worktree 1
        marker = "only-in-wt1.txt"
        (Path(first["path"]) / marker).write_text("hello")

        # It should NOT appear in worktree 2
        assert not (Path(second["path"]) / marker).exists()

    def test_worktrees_dir_created(self, git_repo):
        """Setup creates the shared .worktrees directory in the repo root."""
        created = _setup_worktree(str(git_repo))
        assert created is not None
        assert (git_repo / ".worktrees").is_dir()

    def test_worktree_has_repo_files(self, git_repo):
        """Worktree should contain the repo's tracked files."""
        created = _setup_worktree(str(git_repo))
        assert created is not None
        readme = Path(created["path"]) / "README.md"
        assert readme.exists()
204  
205  
class TestWorktreeCleanup:
    """Test worktree cleanup on exit."""

    def test_clean_worktree_removed(self, git_repo):
        """A worktree with no changes at all is removed."""
        info = _setup_worktree(str(git_repo))
        assert info is not None
        assert Path(info["path"]).exists()

        result = _cleanup_worktree(info)
        assert result is True
        assert not Path(info["path"]).exists()

    def test_dirty_worktree_cleaned_when_no_unpushed(self, git_repo):
        """Dirty working tree without unpushed commits is cleaned up.

        Agent sessions typically leave untracked files / artifacts behind.
        Since all real work is in pushed commits, these don't warrant
        keeping the worktree.
        """
        info = _setup_worktree(str(git_repo))
        assert info is not None

        # Make uncommitted changes: a new file that is staged but never
        # committed, so the index and working tree are both dirty.
        (Path(info["path"]) / "new-file.txt").write_text("uncommitted")
        subprocess.run(
            ["git", "add", "new-file.txt"],
            cwd=info["path"], capture_output=True,
        )

        # The git_repo fixture already has a fake remote ref so the initial
        # commit is seen as "pushed".  No unpushed commits → cleanup proceeds.
        result = _cleanup_worktree(info)
        assert result is True  # Cleaned up despite dirty working tree
        assert not Path(info["path"]).exists()

    def test_worktree_with_unpushed_commits_kept(self, git_repo):
        """Worktree with unpushed commits is preserved."""
        info = _setup_worktree(str(git_repo))
        assert info is not None

        # Make a commit that is NOT on any remote
        (Path(info["path"]) / "work.txt").write_text("real work")
        subprocess.run(["git", "add", "work.txt"], cwd=info["path"], capture_output=True)
        subprocess.run(
            ["git", "commit", "-m", "agent work"],
            cwd=info["path"], capture_output=True,
        )

        result = _cleanup_worktree(info)
        assert result is False  # Kept — has unpushed commits
        assert Path(info["path"]).exists()

    def test_branch_deleted_on_cleanup(self, git_repo):
        """Cleaning up a worktree also deletes the branch created for it."""
        info = _setup_worktree(str(git_repo))
        # Same guard as the other tests: without it a failed setup shows up
        # as a TypeError on the next line instead of a clear assertion.
        assert info is not None
        branch = info["branch"]

        # Cleanup must actually have run; otherwise the "branch is gone"
        # check below would not be testing the cleanup path.
        assert _cleanup_worktree(info) is True

        # Branch should be gone
        result = subprocess.run(
            ["git", "branch", "--list", branch],
            capture_output=True, text=True, cwd=str(git_repo),
        )
        assert branch not in result.stdout

    def test_cleanup_nonexistent_worktree(self, git_repo):
        """Cleanup should handle already-removed worktrees gracefully."""
        info = {
            "path": str(git_repo / ".worktrees" / "nonexistent"),
            "branch": "hermes/nonexistent",
            "repo_root": str(git_repo),
        }
        # Should not raise; with nothing on disk the helper returns early
        # without reporting either "kept" (False) or "cleaned" (True).
        assert _cleanup_worktree(info) is None
280  
281  
class TestWorktreeInclude:
    """Test .worktreeinclude file handling."""

    @staticmethod
    def _copy_included_files(repo, wt_path):
        """Copy the files listed in ``repo/.worktreeinclude`` into *wt_path*.

        Manual reimplementation of the copy step (mirrors cli.py logic).
        Blank lines and lines whose first non-space character is ``#`` are
        skipped, and entries that are not regular files in *repo* are ignored.

        Args:
            repo: Path of the repository root holding ``.worktreeinclude``.
            wt_path: Path of the worktree that receives the copies.

        Returns:
            The entries that were copied, in file order.
        """
        copied = []
        for line in (repo / ".worktreeinclude").read_text().splitlines():
            entry = line.strip()
            if not entry or entry.startswith("#"):
                continue
            src = repo / entry
            if not src.is_file():
                continue
            dst = wt_path / entry
            dst.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(str(src), str(dst))
            copied.append(entry)
        return copied

    def test_copies_included_files(self, git_repo):
        """Files listed in .worktreeinclude should be copied to the worktree."""
        # Create a .env file (gitignored)
        (git_repo / ".env").write_text("SECRET=abc123")
        (git_repo / ".gitignore").write_text(".env\n.worktrees/\n")
        subprocess.run(
            ["git", "add", ".gitignore"],
            cwd=str(git_repo), capture_output=True,
        )
        subprocess.run(
            ["git", "commit", "-m", "Add gitignore"],
            cwd=str(git_repo), capture_output=True,
        )

        # Create .worktreeinclude
        (git_repo / ".worktreeinclude").write_text(".env\n")

        # The test-local _setup_worktree only creates the worktree; the
        # include handling is applied separately below.
        info = _setup_worktree(str(git_repo))
        assert info is not None

        wt_path = Path(info["path"])
        assert self._copy_included_files(git_repo, wt_path) == [".env"]

        # Verify .env was copied
        assert (wt_path / ".env").exists()
        assert (wt_path / ".env").read_text() == "SECRET=abc123"

    def test_ignores_comments_and_blanks(self, git_repo):
        """Comments and blank lines in .worktreeinclude should be skipped."""
        (git_repo / ".worktreeinclude").write_text(
            "# This is a comment\n"
            "\n"
            "  # Another comment\n"
        )
        info = _setup_worktree(str(git_repo))
        assert info is not None

        # Run the parsing so the test can actually fail: it should not crash
        # and should skip every line, copying nothing.
        assert self._copy_included_files(git_repo, Path(info["path"])) == []
334  
335  
class TestGitignoreManagement:
    """Test that .worktrees/ is added to .gitignore."""

    # The exact line that must end up in .gitignore.
    IGNORE_ENTRY = ".worktrees/"

    @classmethod
    def _ensure_ignored(cls, gitignore):
        """Append ``.worktrees/`` to *gitignore* unless it is already listed.

        Manual reimplementation of the append step (mirrors cli.py logic).
        The entry is matched against whole lines, and a newline is inserted
        first when the existing content does not end with one.

        Args:
            gitignore: Path of the ``.gitignore`` file; it may not exist yet.

        Returns:
            True when a line was appended, False when the entry was already
            present.
        """
        existing = gitignore.read_text() if gitignore.exists() else ""
        if cls.IGNORE_ENTRY in existing.splitlines():
            return False
        with open(gitignore, "a") as f:
            if existing and not existing.endswith("\n"):
                f.write("\n")
            f.write(f"{cls.IGNORE_ENTRY}\n")
        return True

    def test_adds_to_gitignore(self, git_repo):
        """Creating a worktree should add .worktrees/ to .gitignore."""
        # Remove any existing .gitignore
        gitignore = git_repo / ".gitignore"
        if gitignore.exists():
            gitignore.unlink()

        info = _setup_worktree(str(git_repo))
        assert info is not None

        # The test-local _setup_worktree does not touch .gitignore, so apply
        # the append step by hand.
        assert self._ensure_ignored(gitignore) is True

        content = gitignore.read_text()
        assert ".worktrees/" in content

    def test_does_not_duplicate_gitignore_entry(self, git_repo):
        """If .worktrees/ is already in .gitignore, don't add again."""
        gitignore = git_repo / ".gitignore"
        gitignore.write_text(".worktrees/\n")

        # The check should see it's already there
        existing = gitignore.read_text()
        assert ".worktrees/" in existing.splitlines()

        # Run the append step so a duplicate would actually be detected.
        assert self._ensure_ignored(gitignore) is False
        assert gitignore.read_text().splitlines().count(".worktrees/") == 1
369  
370  
class TestMultipleWorktrees:
    """Test running multiple worktrees concurrently (the core use case)."""

    def test_ten_concurrent_worktrees(self, git_repo):
        """Create 10 worktrees — simulating 10 parallel agents."""
        readme = "README.md"
        created = []
        for _ in range(10):
            wt = _setup_worktree(str(git_repo))
            assert wt is not None
            created.append(wt)

        # All should exist and be independent
        assert len({wt["path"] for wt in created}) == 10  # All unique

        # Each should have the repo files
        for wt in created:
            assert (Path(wt["path"]) / readme).exists()

        # Edit a file in one worktree
        first, *others = created
        (Path(first["path"]) / readme).write_text("Modified in wt0")

        # Others should be unaffected
        for wt in others:
            assert (Path(wt["path"]) / readme).read_text() == "# Test Repo\n"

        # List worktrees via git
        listing = subprocess.run(
            ["git", "worktree", "list"],
            cwd=str(git_repo), capture_output=True, text=True,
        )
        # Should have 11 entries: main + 10 worktrees
        rows = [row for row in listing.stdout.strip().splitlines() if row.strip()]
        assert len(rows) == 11

        # Cleanup all (git_repo fixture has a fake remote ref so cleanup works)
        for wt in created:
            # Discard the README edit before handing the worktree to cleanup
            subprocess.run(
                ["git", "checkout", "--", "."],
                cwd=wt["path"], capture_output=True,
            )
            _cleanup_worktree(wt)

        # All should be removed
        for wt in created:
            assert not Path(wt["path"]).exists()
418  
419  
class TestWorktreeDirectorySymlink:
    """Test .worktreeinclude with directories (symlinked)."""

    def test_symlinks_directory(self, git_repo):
        """Directories in .worktreeinclude should be symlinked."""
        # Create a .venv directory
        source_dir = git_repo / ".venv"
        lib_dir = source_dir / "lib"
        lib_dir.mkdir(parents=True)
        (lib_dir / "marker.txt").write_text("venv marker")

        # Commit a .gitignore that keeps .venv/ and .worktrees/ out of git
        (git_repo / ".gitignore").write_text(".venv/\n.worktrees/\n")
        for git_args in (["add", ".gitignore"], ["commit", "-m", "gitignore"]):
            subprocess.run(
                ["git", *git_args], cwd=str(git_repo), capture_output=True
            )

        (git_repo / ".worktreeinclude").write_text(".venv/\n")

        info = _setup_worktree(str(git_repo))
        assert info is not None

        link = Path(info["path"]) / ".venv"

        # Manually symlink (mirrors cli.py logic)
        if not link.exists():
            link.parent.mkdir(parents=True, exist_ok=True)
            os.symlink(str(source_dir.resolve()), str(link))

        assert link.is_symlink()
        # Reading through the link reaches the file in the original .venv
        assert (link / "lib" / "marker.txt").read_text() == "venv marker"
453  
454  
class TestStaleWorktreePruning:
    """Test _prune_stale_worktrees garbage collection."""

    @staticmethod
    def _force_remove(repo, wt_dir):
        """Force-remove the worktree at *wt_dir* and delete its branch.

        *wt_dir* is the worktree path as a string.  The checked-out branch is
        read before removal; the branch deletion is skipped when git reports
        no current branch.
        """
        current = subprocess.run(
            ["git", "branch", "--show-current"],
            capture_output=True, text=True, timeout=5, cwd=wt_dir,
        ).stdout.strip()
        subprocess.run(
            ["git", "worktree", "remove", wt_dir, "--force"],
            capture_output=True, text=True, timeout=15, cwd=str(repo),
        )
        if current:
            subprocess.run(
                ["git", "branch", "-D", current],
                capture_output=True, text=True, timeout=10, cwd=str(repo),
            )

    def test_prunes_old_clean_worktree(self, git_repo):
        """Old clean worktrees should be removed on prune."""
        import time

        info = _setup_worktree(str(git_repo))
        assert info is not None
        assert Path(info["path"]).exists()

        # Make the worktree look old (set mtime to 25h ago)
        aged = time.time() - (25 * 3600)
        os.utime(info["path"], (aged, aged))

        # Reimplementation of prune logic (matches cli.py)
        cutoff = time.time() - (24 * 3600)

        for candidate in (git_repo / ".worktrees").iterdir():
            if not (candidate.is_dir() and candidate.name.startswith("hermes-")):
                continue
            try:
                too_recent = candidate.stat().st_mtime > cutoff
            except Exception:
                continue
            if too_recent:
                continue

            # Any porcelain output means uncommitted changes: leave it alone.
            porcelain = subprocess.run(
                ["git", "status", "--porcelain"],
                capture_output=True, text=True, timeout=5, cwd=str(candidate),
            )
            if porcelain.stdout.strip():
                continue

            self._force_remove(git_repo, str(candidate))

        assert not Path(info["path"]).exists()

    def test_keeps_recent_worktree(self, git_repo):
        """Recent worktrees should NOT be pruned."""
        import time

        info = _setup_worktree(str(git_repo))
        assert info is not None

        # Don't modify mtime — it's recent
        cutoff = time.time() - (24 * 3600)

        pruned = False
        for candidate in (git_repo / ".worktrees").iterdir():
            if not (candidate.is_dir() and candidate.name.startswith("hermes-")):
                continue
            if candidate.stat().st_mtime > cutoff:
                continue  # Too recent
            pruned = True

        assert not pruned
        assert Path(info["path"]).exists()

    def test_keeps_old_worktree_with_unpushed_commits(self, git_repo):
        """Old worktrees (24-72h) with unpushed commits should NOT be pruned."""
        import time

        info = _setup_worktree(str(git_repo))
        assert info is not None
        wt_dir = info["path"]

        # Make an unpushed commit
        (Path(wt_dir) / "work.txt").write_text("real work")
        subprocess.run(["git", "add", "work.txt"], cwd=wt_dir, capture_output=True)
        subprocess.run(
            ["git", "commit", "-m", "agent work"],
            cwd=wt_dir, capture_output=True,
        )

        # Make it old (25h — in the 24-72h soft tier)
        aged = time.time() - (25 * 3600)
        os.utime(wt_dir, (aged, aged))

        # Check for unpushed commits (simulates prune logic)
        unpushed = subprocess.run(
            ["git", "log", "--oneline", "HEAD", "--not", "--remotes"],
            capture_output=True, text=True, cwd=wt_dir,
        ).stdout.strip()
        assert bool(unpushed)  # Has unpushed commits → not pruned in soft tier
        assert Path(wt_dir).exists()

    def test_force_prunes_very_old_worktree(self, git_repo):
        """Worktrees older than 72h should be force-pruned regardless."""
        import time

        info = _setup_worktree(str(git_repo))
        assert info is not None
        wt_dir = info["path"]

        # Make an unpushed commit (would normally protect it)
        (Path(wt_dir) / "work.txt").write_text("stale work")
        subprocess.run(["git", "add", "work.txt"], cwd=wt_dir, capture_output=True)
        subprocess.run(
            ["git", "commit", "-m", "old agent work"],
            cwd=wt_dir, capture_output=True,
        )

        # Make it very old (73h — beyond the 72h hard threshold)
        aged = time.time() - (73 * 3600)
        os.utime(wt_dir, (aged, aged))

        # Simulate the force-prune tier check
        hard_cutoff = time.time() - (72 * 3600)
        assert Path(wt_dir).stat().st_mtime <= hard_cutoff  # Should qualify for force removal

        # Actually remove it (simulates _prune_stale_worktrees force path)
        self._force_remove(git_repo, wt_dir)

        assert not Path(wt_dir).exists()
601  
602  
class TestEdgeCases:
    """Test edge cases for robustness."""

    def test_no_commits_repo(self, tmp_path):
        """Worktree creation should fail gracefully on a repo with no commits."""
        empty = tmp_path / "empty-repo"
        empty.mkdir()
        subprocess.run(["git", "init"], cwd=str(empty), capture_output=True)

        # Should fail gracefully: None instead of an exception
        assert _setup_worktree(str(empty)) is None

    def test_not_a_git_repo(self, tmp_path):
        """Repo detection should return None for non-git directories."""
        plain_dir = tmp_path / "not-git"
        plain_dir.mkdir()
        assert _git_repo_root(cwd=str(plain_dir)) is None

    def test_worktrees_dir_already_exists(self, git_repo):
        """Should work fine if .worktrees/ already exists."""
        (git_repo / ".worktrees").mkdir(exist_ok=True)
        created = _setup_worktree(str(git_repo))
        assert created is not None
        assert Path(created["path"]).exists()
628  
629  
class TestCLIFlagLogic:
    """Test the flag/config OR logic from main()."""

    def test_worktree_flag_triggers(self):
        """--worktree flag should trigger worktree creation."""
        worktree, w, config_worktree = True, False, False
        assert worktree or w or config_worktree

    def test_w_flag_triggers(self):
        """-w flag should trigger worktree creation."""
        worktree, w, config_worktree = False, True, False
        assert worktree or w or config_worktree

    def test_config_triggers(self):
        """worktree: true in config should trigger worktree creation."""
        worktree, w, config_worktree = False, False, True
        assert worktree or w or config_worktree

    def test_none_set_no_trigger(self):
        """No flags and no config should not trigger."""
        worktree, w, config_worktree = False, False, False
        assert not (worktree or w or config_worktree)
664  
665  
class TestTerminalCWDIntegration:
    """Test that TERMINAL_CWD is correctly set to the worktree path."""

    def test_terminal_cwd_set(self, git_repo):
        """After worktree setup, TERMINAL_CWD should point to the worktree."""
        info = _setup_worktree(str(git_repo))
        assert info is not None

        # patch.dict snapshots os.environ and restores it on exit, so the
        # variable cannot leak into later tests when an assertion fails, and
        # a TERMINAL_CWD that was already set is put back instead of deleted.
        with patch.dict(os.environ):
            # This is what main() does:
            os.environ["TERMINAL_CWD"] = info["path"]
            assert os.environ["TERMINAL_CWD"] == info["path"]
            assert Path(os.environ["TERMINAL_CWD"]).exists()

    def test_terminal_cwd_is_valid_git_repo(self, git_repo):
        """The TERMINAL_CWD worktree should be a valid git working tree."""
        info = _setup_worktree(str(git_repo))
        assert info is not None

        result = subprocess.run(
            ["git", "rev-parse", "--is-inside-work-tree"],
            capture_output=True, text=True, cwd=info["path"],
        )
        assert result.stdout.strip() == "true"
692  
693  
class TestOrphanedBranchPruning:
    """Test cleanup of orphaned hermes/* and pr-* branches."""

    @staticmethod
    def _branch_listing(repo, *extra):
        """Return the raw stdout of ``git branch <extra...>`` run in *repo*."""
        return subprocess.run(
            ["git", "branch", *extra],
            capture_output=True, text=True, cwd=str(repo),
        ).stdout

    @classmethod
    def _local_branches(cls, repo):
        """Return the short names of all local branches in *repo*."""
        raw = cls._branch_listing(repo, "--format=%(refname:short)")
        return [name.strip() for name in raw.strip().split("\n") if name.strip()]

    @staticmethod
    def _worktree_branches(repo):
        """Return the set of branches checked out in any worktree of *repo*."""
        prefix = "branch refs/heads/"
        porcelain = subprocess.run(
            ["git", "worktree", "list", "--porcelain"],
            capture_output=True, text=True, cwd=str(repo),
        ).stdout
        return {
            row.split(prefix, 1)[-1].strip()
            for row in porcelain.split("\n")
            if row.startswith(prefix)
        }

    def test_prunes_orphaned_hermes_branch(self, git_repo):
        """hermes/hermes-* branches with no worktree should be deleted."""
        stray = "hermes/hermes-deadbeef"

        # Create a branch that looks like a worktree branch but has no worktree
        subprocess.run(
            ["git", "branch", "hermes/hermes-deadbeef", "HEAD"],
            cwd=str(git_repo), capture_output=True,
        )

        # Verify it exists
        assert stray in self._branch_listing(git_repo, "--list", "hermes/hermes-deadbeef")

        # Simulate _prune_orphaned_branches logic
        all_branches = self._local_branches(git_repo)
        active_branches = {"main"} | self._worktree_branches(git_repo)

        orphaned = []
        for name in all_branches:
            if name in active_branches:
                continue
            if name.startswith("hermes/hermes-") or name.startswith("pr-"):
                orphaned.append(name)
        assert stray in orphaned

        # Delete them
        if orphaned:
            subprocess.run(
                ["git", "branch", "-D"] + orphaned,
                capture_output=True, text=True, cwd=str(git_repo),
            )

        # Verify gone
        assert stray not in self._branch_listing(git_repo, "--list", "hermes/hermes-deadbeef")

    def test_prunes_orphaned_pr_branch(self, git_repo):
        """pr-* branches should be deleted during pruning."""
        for pr_branch in ("pr-1234", "pr-5678"):
            subprocess.run(
                ["git", "branch", pr_branch, "HEAD"],
                cwd=str(git_repo), capture_output=True,
            )

        active_branches = {"main"}
        orphaned = [
            name for name in self._local_branches(git_repo)
            if name.startswith("pr-") and name not in active_branches
        ]
        assert "pr-1234" in orphaned
        assert "pr-5678" in orphaned

        subprocess.run(
            ["git", "branch", "-D"] + orphaned,
            capture_output=True, text=True, cwd=str(git_repo),
        )

        # Verify gone (substring check against the raw listing)
        remaining = self._branch_listing(git_repo, "--format=%(refname:short)").strip()
        assert "pr-1234" not in remaining
        assert "pr-5678" not in remaining

    def test_preserves_active_worktree_branch(self, git_repo):
        """Branches with active worktrees should NOT be pruned."""
        info = _setup_worktree(str(git_repo))
        assert info is not None

        # Protected: the new worktree's branch is reported as checked out
        assert info["branch"] in self._worktree_branches(git_repo)

    def test_preserves_main_branch(self, git_repo):
        """main branch should never be pruned."""
        active_branches = {"main"}

        orphaned = [
            name for name in self._local_branches(git_repo)
            if (name.startswith("hermes/hermes-") or name.startswith("pr-"))
            and name not in active_branches
        ]
        assert "main" not in orphaned
819  
820  
class TestSystemPromptInjection:
    """Test that the agent gets worktree context in its system prompt."""

    def test_prompt_note_format(self, git_repo):
        """Verify the system prompt note contains all required info."""
        info = _setup_worktree(str(git_repo))
        assert info is not None

        # This is what main() does:
        note = (
            f"\n\n[System note: You are working in an isolated git worktree at "
            f"{info['path']}. Your branch is `{info['branch']}`. "
            f"Changes here do not affect the main working tree or other agents. "
            f"Remember to commit and push your changes, and create a PR if appropriate. "
            f"The original repo is at {info['repo_root']}.]\n"
        )

        # The three worktree-specific values first, then the fixed guidance.
        required = (
            info["path"],
            info["branch"],
            info["repo_root"],
            "isolated git worktree",
            "commit and push",
        )
        for fragment in required:
            assert fragment in note