# tests/tools/test_checkpoint_manager.py
  1  """Tests for tools/checkpoint_manager.py — CheckpointManager."""
  2  
  3  import logging
  4  import subprocess
  5  import pytest
  6  from pathlib import Path
  7  from unittest.mock import patch
  8  
  9  from tools.checkpoint_manager import (
 10      CheckpointManager,
 11      _shadow_repo_path,
 12      _init_shadow_repo,
 13      _run_git,
 14      _git_env,
 15      _dir_file_count,
 16      format_checkpoint_list,
 17      DEFAULT_EXCLUDES,
 18      CHECKPOINT_BASE,
 19  )
 20  
 21  
 22  # =========================================================================
 23  # Fixtures
 24  # =========================================================================
 25  
 26  @pytest.fixture()
 27  def work_dir(tmp_path):
 28      """Temporary working directory."""
 29      d = tmp_path / "project"
 30      d.mkdir()
 31      (d / "main.py").write_text("print('hello')\\n")
 32      (d / "README.md").write_text("# Project\\n")
 33      return d
 34  
 35  
 36  @pytest.fixture()
 37  def checkpoint_base(tmp_path):
 38      """Isolated checkpoint base — never writes to ~/.hermes/."""
 39      return tmp_path / "checkpoints"
 40  
 41  
 42  @pytest.fixture()
 43  def fake_home(tmp_path, monkeypatch):
 44      """Set a deterministic fake home for expanduser/path-home behavior."""
 45      home = tmp_path / "home"
 46      home.mkdir()
 47      monkeypatch.setenv("HOME", str(home))
 48      monkeypatch.setenv("USERPROFILE", str(home))
 49      monkeypatch.delenv("HOMEDRIVE", raising=False)
 50      monkeypatch.delenv("HOMEPATH", raising=False)
 51      monkeypatch.setattr(Path, "home", classmethod(lambda cls: home))
 52      return home
 53  
 54  
 55  @pytest.fixture()
 56  def mgr(work_dir, checkpoint_base, monkeypatch):
 57      """CheckpointManager with redirected checkpoint base."""
 58      monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
 59      return CheckpointManager(enabled=True, max_snapshots=50)
 60  
 61  
 62  @pytest.fixture()
 63  def disabled_mgr(checkpoint_base, monkeypatch):
 64      """Disabled CheckpointManager."""
 65      monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
 66      return CheckpointManager(enabled=False)
 67  
 68  
 69  # =========================================================================
 70  # Shadow repo path
 71  # =========================================================================
 72  
 73  class TestShadowRepoPath:
 74      def test_deterministic(self, work_dir, checkpoint_base, monkeypatch):
 75          monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
 76          p1 = _shadow_repo_path(str(work_dir))
 77          p2 = _shadow_repo_path(str(work_dir))
 78          assert p1 == p2
 79  
 80      def test_different_dirs_different_paths(self, tmp_path, checkpoint_base, monkeypatch):
 81          monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
 82          p1 = _shadow_repo_path(str(tmp_path / "a"))
 83          p2 = _shadow_repo_path(str(tmp_path / "b"))
 84          assert p1 != p2
 85  
 86      def test_under_checkpoint_base(self, work_dir, checkpoint_base, monkeypatch):
 87          monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
 88          p = _shadow_repo_path(str(work_dir))
 89          assert str(p).startswith(str(checkpoint_base))
 90  
 91      def test_tilde_and_expanded_home_share_shadow_repo(self, fake_home, checkpoint_base, monkeypatch):
 92          monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
 93          project = fake_home / "project"
 94          project.mkdir()
 95  
 96          tilde_path = f"~/{project.name}"
 97          expanded_path = str(project)
 98  
 99          assert _shadow_repo_path(tilde_path) == _shadow_repo_path(expanded_path)
100  
101  
102  # =========================================================================
103  # Shadow repo init
104  # =========================================================================
105  
class TestShadowRepoInit:
    """_init_shadow_repo: bare repo creation plus its side files."""

    def test_creates_git_repo(self, work_dir, checkpoint_base, monkeypatch):
        monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
        shadow = _shadow_repo_path(str(work_dir))
        assert _init_shadow_repo(shadow, str(work_dir)) is None
        # A bare repository keeps HEAD directly under the repo directory.
        assert (shadow / "HEAD").exists()

    def test_no_git_in_project_dir(self, work_dir, checkpoint_base, monkeypatch):
        monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
        _init_shadow_repo(_shadow_repo_path(str(work_dir)), str(work_dir))
        # The project tree itself must stay free of any .git directory.
        assert not (work_dir / ".git").exists()

    def test_has_exclude_file(self, work_dir, checkpoint_base, monkeypatch):
        monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
        shadow = _shadow_repo_path(str(work_dir))
        _init_shadow_repo(shadow, str(work_dir))
        exclude = shadow / "info" / "exclude"
        assert exclude.exists()
        text = exclude.read_text()
        # Spot-check two of the default ignore patterns.
        for pattern in ("node_modules/", ".env"):
            assert pattern in text

    def test_has_workdir_file(self, work_dir, checkpoint_base, monkeypatch):
        monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
        shadow = _shadow_repo_path(str(work_dir))
        _init_shadow_repo(shadow, str(work_dir))
        marker = shadow / "HERMES_WORKDIR"
        assert marker.exists()
        # The marker records which working directory the shadow belongs to.
        assert str(work_dir.resolve()) in marker.read_text()

    def test_idempotent(self, work_dir, checkpoint_base, monkeypatch):
        monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
        shadow = _shadow_repo_path(str(work_dir))
        # Initialising twice must succeed both times without error.
        results = [_init_shadow_repo(shadow, str(work_dir)) for _ in range(2)]
        assert results == [None, None]
145  
146  
147  # =========================================================================
148  # CheckpointManager — disabled
149  # =========================================================================
150  
class TestDisabledManager:
    """A disabled manager is inert but must never raise."""

    def test_ensure_checkpoint_returns_false(self, disabled_mgr, work_dir):
        # No snapshot is ever taken while checkpointing is disabled.
        result = disabled_mgr.ensure_checkpoint(str(work_dir))
        assert result is False

    def test_new_turn_works(self, disabled_mgr):
        # Turn bookkeeping is a harmless no-op when disabled.
        disabled_mgr.new_turn()
157  
158  
159  # =========================================================================
160  # CheckpointManager — taking checkpoints
161  # =========================================================================
162  
class TestTakeCheckpoint:
    """ensure_checkpoint(): first snapshot, per-turn dedup, skip rules."""

    def test_first_checkpoint(self, mgr, work_dir):
        result = mgr.ensure_checkpoint(str(work_dir), "initial")
        assert result is True

    def test_successful_checkpoint_does_not_log_expected_diff_exit(self, mgr, work_dir, caplog):
        # `git diff --cached --quiet` exits 1 when there ARE staged changes;
        # that expected non-zero exit must not show up as an ERROR log.
        with caplog.at_level(logging.ERROR, logger="tools.checkpoint_manager"):
            result = mgr.ensure_checkpoint(str(work_dir), "initial")
        assert result is True
        assert not any("diff --cached --quiet" in r.getMessage() for r in caplog.records)

    def test_dedup_same_turn(self, mgr, work_dir):
        r1 = mgr.ensure_checkpoint(str(work_dir), "first")
        r2 = mgr.ensure_checkpoint(str(work_dir), "second")
        assert r1 is True
        assert r2 is False  # dedup'd within the same turn

    def test_new_turn_resets_dedup(self, mgr, work_dir):
        r1 = mgr.ensure_checkpoint(str(work_dir), "turn 1")
        assert r1 is True

        mgr.new_turn()

        # Modify a file so there's something to commit.  (Real newline —
        # the previous escaped "\\n" was inconsistent with the rest of
        # this module's fixtures.)
        (work_dir / "main.py").write_text("print('modified')\n")
        r2 = mgr.ensure_checkpoint(str(work_dir), "turn 2")
        assert r2 is True

    def test_no_changes_skips_commit(self, mgr, work_dir):
        # First checkpoint, then advance the turn without touching files.
        mgr.ensure_checkpoint(str(work_dir), "initial")
        mgr.new_turn()

        # No file changes — should return False (nothing to commit).
        r = mgr.ensure_checkpoint(str(work_dir), "no changes")
        assert r is False

    def test_skip_root_dir(self, mgr):
        # The filesystem root is never snapshotted.
        r = mgr.ensure_checkpoint("/", "root")
        assert r is False

    def test_skip_home_dir(self, mgr):
        # The bare home directory is likewise refused.
        r = mgr.ensure_checkpoint(str(Path.home()), "home")
        assert r is False
207  
208  
209  # =========================================================================
210  # CheckpointManager — listing checkpoints
211  # =========================================================================
212  
class TestListCheckpoints:
    """list_checkpoints(): empty result, entry shape, ordering, tilde paths."""

    def test_empty_when_no_checkpoints(self, mgr, work_dir):
        result = mgr.list_checkpoints(str(work_dir))
        assert result == []

    def test_list_after_take(self, mgr, work_dir):
        mgr.ensure_checkpoint(str(work_dir), "test checkpoint")
        result = mgr.list_checkpoints(str(work_dir))
        assert len(result) == 1
        assert result[0]["reason"] == "test checkpoint"
        # Every entry carries the full hash, a short hash, and a timestamp.
        assert "hash" in result[0]
        assert "short_hash" in result[0]
        assert "timestamp" in result[0]

    def test_multiple_checkpoints_ordered(self, mgr, work_dir):
        mgr.ensure_checkpoint(str(work_dir), "first")
        mgr.new_turn()

        # Real newlines — the escaped "\\n" variant was inconsistent with
        # the tilde-path tests below, which write "...\n".
        (work_dir / "main.py").write_text("v2\n")
        mgr.ensure_checkpoint(str(work_dir), "second")
        mgr.new_turn()

        (work_dir / "main.py").write_text("v3\n")
        mgr.ensure_checkpoint(str(work_dir), "third")

        result = mgr.list_checkpoints(str(work_dir))
        assert len(result) == 3
        # Most recent first
        assert result[0]["reason"] == "third"
        assert result[2]["reason"] == "first"

    def test_tilde_path_lists_same_checkpoints_as_expanded_path(self, checkpoint_base, fake_home, monkeypatch):
        monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
        mgr = CheckpointManager(enabled=True, max_snapshots=50)
        project = fake_home / "project"
        project.mkdir()
        (project / "main.py").write_text("v1\n")

        # Checkpoint via the tilde form, list via the expanded form.
        tilde_path = f"~/{project.name}"
        assert mgr.ensure_checkpoint(tilde_path, "initial") is True

        listed = mgr.list_checkpoints(str(project))
        assert len(listed) == 1
        assert listed[0]["reason"] == "initial"
257  
258  
259  # =========================================================================
260  # CheckpointManager — restoring
261  # =========================================================================
262  
class TestRestore:
    """restore(): round-trip, invalid hashes, pre-rollback snapshots, tilde paths."""

    def test_restore_to_previous(self, mgr, work_dir):
        # Write original content.  Real newlines throughout — the escaped
        # "\\n" variant was inconsistent with the tilde-path test below.
        (work_dir / "main.py").write_text("original\n")
        mgr.ensure_checkpoint(str(work_dir), "original state")
        mgr.new_turn()

        # Modify the file
        (work_dir / "main.py").write_text("modified\n")

        # Get the checkpoint hash
        checkpoints = mgr.list_checkpoints(str(work_dir))
        assert len(checkpoints) == 1

        # Restore
        result = mgr.restore(str(work_dir), checkpoints[0]["hash"])
        assert result["success"] is True

        # File should be back to original
        assert (work_dir / "main.py").read_text() == "original\n"

    def test_restore_invalid_hash(self, mgr, work_dir):
        mgr.ensure_checkpoint(str(work_dir), "initial")
        # Well-formed hex that matches no commit must fail cleanly.
        result = mgr.restore(str(work_dir), "deadbeef1234")
        assert result["success"] is False

    def test_restore_no_checkpoints(self, mgr, work_dir):
        # Restoring before any snapshot exists fails without raising.
        result = mgr.restore(str(work_dir), "abc123")
        assert result["success"] is False

    def test_restore_creates_pre_rollback_snapshot(self, mgr, work_dir):
        (work_dir / "main.py").write_text("v1\n")
        mgr.ensure_checkpoint(str(work_dir), "v1")
        mgr.new_turn()

        (work_dir / "main.py").write_text("v2\n")

        checkpoints = mgr.list_checkpoints(str(work_dir))
        mgr.restore(str(work_dir), checkpoints[0]["hash"])

        # Should now have 2 checkpoints: original + pre-rollback safety copy.
        all_cps = mgr.list_checkpoints(str(work_dir))
        assert len(all_cps) >= 2
        assert "pre-rollback" in all_cps[0]["reason"]

    def test_tilde_path_supports_diff_and_restore_flow(self, checkpoint_base, fake_home, monkeypatch):
        monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
        mgr = CheckpointManager(enabled=True, max_snapshots=50)
        project = fake_home / "project"
        project.mkdir()
        file_path = project / "main.py"
        file_path.write_text("original\n")

        tilde_path = f"~/{project.name}"
        assert mgr.ensure_checkpoint(tilde_path, "initial") is True
        mgr.new_turn()

        file_path.write_text("changed\n")
        checkpoints = mgr.list_checkpoints(str(project))
        # diff and restore must both accept the unexpanded tilde form.
        diff_result = mgr.diff(tilde_path, checkpoints[0]["hash"])
        assert diff_result["success"] is True
        assert "main.py" in diff_result["diff"]

        restore_result = mgr.restore(tilde_path, checkpoints[0]["hash"])
        assert restore_result["success"] is True
        assert file_path.read_text() == "original\n"
329  
330  
331  # =========================================================================
332  # CheckpointManager — working dir resolution
333  # =========================================================================
334  
class TestWorkingDirResolution:
    """get_working_dir_for_path(): find the project root for a file path."""

    def test_resolves_git_project_root(self, tmp_path):
        mgr = CheckpointManager(enabled=True)
        project = tmp_path / "myproject"
        project.mkdir()
        (project / ".git").mkdir()
        subdir = project / "src"
        subdir.mkdir()
        filepath = subdir / "main.py"
        # Real newline — the escaped "\\n" variant was inconsistent with
        # test_resolves_tilde_path_to_project_root below.
        filepath.write_text("x\n")

        result = mgr.get_working_dir_for_path(str(filepath))
        assert result == str(project)

    def test_resolves_pyproject_root(self, tmp_path):
        mgr = CheckpointManager(enabled=True)
        project = tmp_path / "pyproj"
        project.mkdir()
        (project / "pyproject.toml").write_text("[project]\n")
        subdir = project / "src"
        subdir.mkdir()

        result = mgr.get_working_dir_for_path(str(subdir / "file.py"))
        assert result == str(project)

    def test_falls_back_to_parent(self, tmp_path, monkeypatch):
        mgr = CheckpointManager(enabled=True)
        filepath = tmp_path / "random" / "file.py"
        filepath.parent.mkdir(parents=True)
        filepath.write_text("x\n")

        # The walk-up scan for project markers (.git, pyproject.toml, etc.)
        # stops at tmp_path — otherwise stray markers in ``/tmp`` (e.g.
        # ``/tmp/pyproject.toml`` left by other tools on the host) get
        # picked up as the project root and this test flakes on shared CI.
        import pathlib as _pl
        _real_exists = _pl.Path.exists

        def _guarded_exists(self):
            s = str(self)
            stop = str(tmp_path)
            if not s.startswith(stop) and any(
                s.endswith("/" + m) or s == "/" + m
                for m in (".git", "pyproject.toml", "package.json",
                          "Cargo.toml", "go.mod", "Makefile", "pom.xml",
                          ".hg", "Gemfile")
            ):
                return False
            return _real_exists(self)

        monkeypatch.setattr(_pl.Path, "exists", _guarded_exists)

        result = mgr.get_working_dir_for_path(str(filepath))
        assert result == str(filepath.parent)

    def test_resolves_tilde_path_to_project_root(self, fake_home):
        mgr = CheckpointManager(enabled=True)
        project = fake_home / "myproject"
        project.mkdir()
        (project / "pyproject.toml").write_text("[project]\n")
        subdir = project / "src"
        subdir.mkdir()
        filepath = subdir / "main.py"
        filepath.write_text("x\n")

        # A tilde-prefixed input must resolve to the same project root.
        result = mgr.get_working_dir_for_path(f"~/{project.name}/src/main.py")
        assert result == str(project)
402  
403  
404  # =========================================================================
405  # Git env isolation
406  # =========================================================================
407  
class TestGitEnvIsolation:
    """_git_env must fully describe the shadow repo via environment variables."""

    def test_sets_git_dir(self, tmp_path):
        shadow = tmp_path / "shadow"
        env = _git_env(shadow, str(tmp_path / "work"))
        # GIT_DIR points git at the shadow repository itself.
        assert env["GIT_DIR"] == str(shadow)

    def test_sets_work_tree(self, tmp_path):
        work = tmp_path / "work"
        env = _git_env(tmp_path / "shadow", str(work))
        # The work tree is the user's project directory, fully resolved.
        assert env["GIT_WORK_TREE"] == str(work.resolve())

    def test_clears_index_file(self, tmp_path, monkeypatch):
        # A GIT_INDEX_FILE leaked from the host environment must be dropped.
        monkeypatch.setenv("GIT_INDEX_FILE", "/some/index")
        env = _git_env(tmp_path / "shadow", str(tmp_path))
        assert "GIT_INDEX_FILE" not in env

    def test_expands_tilde_in_work_tree(self, fake_home, tmp_path):
        work = fake_home / "work"
        work.mkdir()
        # Tilde inputs are expanded before being handed to git.
        env = _git_env(tmp_path / "shadow", f"~/{work.name}")
        assert env["GIT_WORK_TREE"] == str(work.resolve())
433  
434  
435  # =========================================================================
436  # format_checkpoint_list
437  # =========================================================================
438  
class TestFormatCheckpointList:
    """Human-readable rendering of a checkpoint list."""

    def test_empty_list(self):
        # An empty list renders a friendly "no checkpoints" message.
        assert "No checkpoints" in format_checkpoint_list([], "/some/dir")

    def test_formats_entries(self):
        entries = [
            {"hash": "abc123", "short_hash": "abc1", "timestamp": "2026-03-09T21:15:00-07:00", "reason": "before write_file"},
            {"hash": "def456", "short_hash": "def4", "timestamp": "2026-03-09T21:10:00-07:00", "reason": "before patch"},
        ]
        rendered = format_checkpoint_list(entries, "/home/user/project")
        # Short hashes, reasons, and the /rollback hint must all appear.
        for needle in ("abc1", "def4", "before write_file", "/rollback"):
            assert needle in rendered
454  
455  
456  # =========================================================================
457  # File count guard
458  # =========================================================================
459  
class TestDirFileCount:
    """Sanity checks for the file-count guard helper."""

    def test_counts_files(self, work_dir):
        # The seeded project contains at least main.py and README.md.
        assert _dir_file_count(str(work_dir)) >= 2

    def test_nonexistent_dir(self, tmp_path):
        # A missing directory counts as zero rather than raising.
        assert _dir_file_count(str(tmp_path / "nonexistent")) == 0
468  
469  
470  # =========================================================================
471  # Error resilience
472  # =========================================================================
473  
class TestErrorResilience:
    """Failures in git or the environment degrade gracefully, never raise."""

    def test_no_git_installed(self, work_dir, checkpoint_base, monkeypatch):
        monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
        manager = CheckpointManager(enabled=True)
        # Pretend git is absent and clear the cached availability probe.
        monkeypatch.setattr("shutil.which", lambda x: None)
        manager._git_available = None
        assert manager.ensure_checkpoint(str(work_dir), "test") is False

    def test_run_git_allows_expected_nonzero_without_error_log(self, tmp_path, caplog):
        work = tmp_path / "work"
        work.mkdir()
        fake_proc = subprocess.CompletedProcess(
            args=["git", "diff", "--cached", "--quiet"],
            returncode=1,
            stdout="",
            stderr="",
        )
        with patch("tools.checkpoint_manager.subprocess.run", return_value=fake_proc):
            with caplog.at_level(logging.ERROR, logger="tools.checkpoint_manager"):
                ok, out, err = _run_git(
                    ["diff", "--cached", "--quiet"],
                    tmp_path / "shadow",
                    str(work),
                    allowed_returncodes={1},
                )
        # Exit 1 is whitelisted: reported as not-ok, but never ERROR-logged.
        assert ok is False
        assert out == ""
        assert err == ""
        assert not caplog.records

    def test_run_git_invalid_working_dir_reports_path_error(self, tmp_path, caplog):
        with caplog.at_level(logging.ERROR, logger="tools.checkpoint_manager"):
            ok, out, err = _run_git(
                ["status"],
                tmp_path / "shadow",
                str(tmp_path / "missing"),
            )
        assert ok is False
        assert out == ""
        # A missing cwd is diagnosed as a path problem, not a missing git.
        assert "working directory not found" in err
        assert not any("Git executable not found" in r.getMessage() for r in caplog.records)

    def test_run_git_missing_git_reports_git_not_found(self, tmp_path, monkeypatch, caplog):
        work = tmp_path / "work"
        work.mkdir()

        def raise_missing_git(*args, **kwargs):
            raise FileNotFoundError(2, "No such file or directory", "git")

        monkeypatch.setattr("tools.checkpoint_manager.subprocess.run", raise_missing_git)
        with caplog.at_level(logging.ERROR, logger="tools.checkpoint_manager"):
            ok, out, err = _run_git(["status"], tmp_path / "shadow", str(work))
        assert ok is False
        assert out == ""
        assert err == "git not found"
        # This failure mode IS error-logged, unlike the whitelisted exits.
        assert any("Git executable not found" in r.getMessage() for r in caplog.records)

    def test_checkpoint_failure_does_not_raise(self, mgr, work_dir, monkeypatch):
        """Checkpoint failures should never raise — they're silently logged."""
        def broken_run_git(*args, **kwargs):
            raise OSError("git exploded")

        monkeypatch.setattr("tools.checkpoint_manager._run_git", broken_run_git)
        # Should not raise; the failure surfaces only as a False result.
        assert mgr.ensure_checkpoint(str(work_dir), "test") is False
546  
547  
548  # =========================================================================
549  # Security / Input validation
550  # =========================================================================
551  
class TestSecurity:
    """Input validation: flag injection, non-hex hashes, path traversal."""

    def test_restore_rejects_argument_injection(self, mgr, work_dir):
        mgr.ensure_checkpoint(str(work_dir), "initial")
        # A git flag masquerading as a commit hash must be rejected.
        result = mgr.restore(str(work_dir), "--patch")
        assert result["success"] is False
        assert "Invalid commit hash" in result["error"]
        assert "must not start with '-'" in result["error"]

        result = mgr.restore(str(work_dir), "-p")
        assert result["success"] is False
        assert "Invalid commit hash" in result["error"]

    def test_restore_rejects_invalid_hex_chars(self, mgr, work_dir):
        mgr.ensure_checkpoint(str(work_dir), "initial")
        # Shell metacharacters (;, &, |) can never be part of a hex id.
        result = mgr.restore(str(work_dir), "abc; rm -rf /")
        assert result["success"] is False
        assert "expected 4-64 hex characters" in result["error"]

        result = mgr.diff(str(work_dir), "abc&def")
        assert result["success"] is False
        assert "expected 4-64 hex characters" in result["error"]

    def test_restore_rejects_path_traversal(self, mgr, work_dir):
        mgr.ensure_checkpoint(str(work_dir), "initial")
        # Real commit hash, malicious file_path.
        target_hash = mgr.list_checkpoints(str(work_dir))[0]["hash"]

        # Absolute paths are refused outright.
        result = mgr.restore(str(work_dir), target_hash, file_path="/etc/passwd")
        assert result["success"] is False
        assert "got absolute path" in result["error"]

        # Relative traversal escaping the working dir is refused too.
        result = mgr.restore(str(work_dir), target_hash, file_path="../outside_file.txt")
        assert result["success"] is False
        assert "escapes the working directory" in result["error"]

    def test_restore_accepts_valid_file_path(self, mgr, work_dir):
        mgr.ensure_checkpoint(str(work_dir), "initial")
        target_hash = mgr.list_checkpoints(str(work_dir))[0]["hash"]

        # A simple in-tree path restores fine.
        result = mgr.restore(str(work_dir), target_hash, file_path="main.py")
        assert result["success"] is True

        # A nested path works once a second snapshot has captured it.
        (work_dir / "subdir").mkdir()
        (work_dir / "subdir" / "test.txt").write_text("hello")
        mgr.new_turn()
        mgr.ensure_checkpoint(str(work_dir), "second")
        target_hash = mgr.list_checkpoints(str(work_dir))[0]["hash"]

        result = mgr.restore(str(work_dir), target_hash, file_path="subdir/test.txt")
        assert result["success"] is True
611  
612  
613  # =========================================================================
614  # GPG / global git config isolation
615  # =========================================================================
616  # Regression tests for the bug where users with ``commit.gpgsign = true``
617  # in their global git config got a pinentry popup (or a failed commit)
618  # every time the agent took a background snapshot.
619  
620  import os as _os
621  
622  
class TestGpgAndGlobalConfigIsolation:
    """Snapshots must never invoke GPG nor inherit the user's global or
    system git config (commit.gpgsign, hooks, aliases, ...)."""

    @staticmethod
    def _shadow_config(shadow, key):
        """Read *key* directly from the shadow repo's own config file."""
        proc = subprocess.run(
            ["git", "config", "--file", str(shadow / "config"), "--get", key],
            capture_output=True, text=True,
        )
        return proc.stdout.strip()

    @staticmethod
    def _unset_shadow_config(shadow, key):
        """Drop *key* from the shadow repo's config, ignoring missing keys."""
        subprocess.run(
            ["git", "config", "--file", str(shadow / "config"), "--unset", key],
            capture_output=True, text=True, check=False,
        )

    def test_git_env_isolates_global_and_system_config(self, tmp_path):
        """_git_env must null out GIT_CONFIG_GLOBAL / GIT_CONFIG_SYSTEM so the
        shadow repo does not inherit user-level gpgsign, hooks, aliases, etc."""
        env = _git_env(tmp_path / "shadow", str(tmp_path))
        assert env["GIT_CONFIG_GLOBAL"] == _os.devnull
        assert env["GIT_CONFIG_SYSTEM"] == _os.devnull
        assert env["GIT_CONFIG_NOSYSTEM"] == "1"

    def test_init_sets_commit_gpgsign_false(self, work_dir, checkpoint_base, monkeypatch):
        monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
        shadow = _shadow_repo_path(str(work_dir))
        _init_shadow_repo(shadow, str(work_dir))
        # The setting must be persisted in the repo's own config, not just
        # injected through environment variables at runtime.
        assert self._shadow_config(shadow, "commit.gpgsign") == "false"

    def test_init_sets_tag_gpgsign_false(self, work_dir, checkpoint_base, monkeypatch):
        monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
        shadow = _shadow_repo_path(str(work_dir))
        _init_shadow_repo(shadow, str(work_dir))
        assert self._shadow_config(shadow, "tag.gpgSign") == "false"

    def test_checkpoint_works_with_global_gpgsign_and_broken_gpg(
        self, work_dir, checkpoint_base, monkeypatch, tmp_path
    ):
        """The real bug scenario: user has global commit.gpgsign=true but GPG
        is broken or pinentry is unavailable.  Before the fix, every snapshot
        either failed or spawned a pinentry window.  After the fix, snapshots
        succeed without ever invoking GPG."""
        monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)

        # Hostile HOME: gpgsign on, gpg.program pointing at nothing.  If
        # isolation leaks, the commit execs the bogus binary and fails.
        fake_home = tmp_path / "fake_home"
        fake_home.mkdir()
        (fake_home / ".gitconfig").write_text(
            "[user]\n    email = real@user.com\n    name = Real User\n"
            "[commit]\n    gpgsign = true\n"
            "[tag]\n    gpgSign = true\n"
            "[gpg]\n    program = /nonexistent/fake-gpg-binary\n"
        )
        monkeypatch.setenv("HOME", str(fake_home))
        monkeypatch.delenv("GPG_TTY", raising=False)
        monkeypatch.delenv("DISPLAY", raising=False)  # block GUI pinentry

        mgr = CheckpointManager(enabled=True)
        assert mgr.ensure_checkpoint(str(work_dir), reason="with-global-gpgsign") is True
        assert len(mgr.list_checkpoints(str(work_dir))) == 1

    def test_checkpoint_works_on_prefix_shadow_without_local_gpgsign(
        self, work_dir, checkpoint_base, monkeypatch, tmp_path
    ):
        """Shadow repos created before the fix lack commit.gpgsign=false in
        their own config; the inline ``--no-gpg-sign`` flag on the commit
        call must still protect them."""
        monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)

        # Recreate the pre-fix state: init normally, then strip the
        # settings that _init_shadow_repo now writes.
        shadow = _shadow_repo_path(str(work_dir))
        _init_shadow_repo(shadow, str(work_dir))
        self._unset_shadow_config(shadow, "commit.gpgsign")
        self._unset_shadow_config(shadow, "tag.gpgSign")

        # And simulate hostile global config.
        fake_home = tmp_path / "fake_home"
        fake_home.mkdir()
        (fake_home / ".gitconfig").write_text(
            "[commit]\n    gpgsign = true\n"
            "[gpg]\n    program = /nonexistent/fake-gpg-binary\n"
        )
        monkeypatch.setenv("HOME", str(fake_home))
        monkeypatch.delenv("GPG_TTY", raising=False)
        monkeypatch.delenv("DISPLAY", raising=False)

        mgr = CheckpointManager(enabled=True)
        assert mgr.ensure_checkpoint(str(work_dir), reason="prefix-shadow") is True
        assert len(mgr.list_checkpoints(str(work_dir))) == 1
720  
721  
722  # =========================================================================
723  # Auto-maintenance: prune_checkpoints + maybe_auto_prune_checkpoints
724  # =========================================================================
725  
class TestPruneCheckpoints:
    """Sweep orphan/stale shadow repos under CHECKPOINT_BASE (issue #3015 follow-up)."""

    def _seed_shadow_repo(
        self, base: Path, dir_hash: str, workdir: Path, mtime: "float | None" = None
    ) -> Path:
        """Create a minimal shadow repo on disk without invoking real git.

        Writes just enough files (HEAD, HERMES_WORKDIR, info/exclude) for the
        pruner to treat ``base/dir_hash`` as an initialised shadow repo.  When
        *mtime* is given, every entry plus the top-level dir is back-dated so
        mtime-based staleness checks see an old repository.
        """
        # Hoisted out of the loop: the original re-imported os per iteration
        # and also pulled in an unused `time` alias.
        import os

        shadow = base / dir_hash
        shadow.mkdir(parents=True)
        (shadow / "HEAD").write_text("ref: refs/heads/main\n")
        (shadow / "HERMES_WORKDIR").write_text(str(workdir) + "\n")
        (shadow / "info").mkdir()
        (shadow / "info" / "exclude").write_text("node_modules/\n")
        if mtime is not None:
            for p in shadow.rglob("*"):
                os.utime(p, (mtime, mtime))
            os.utime(shadow, (mtime, mtime))
        return shadow

    def test_deletes_orphan_when_workdir_missing(self, tmp_path):
        """A shadow repo whose recorded workdir no longer exists is removed."""
        from tools.checkpoint_manager import prune_checkpoints

        base = tmp_path / "checkpoints"
        alive_work = tmp_path / "alive"
        alive_work.mkdir()
        alive_repo = self._seed_shadow_repo(base, "aaaa" * 4, alive_work)
        orphan_repo = self._seed_shadow_repo(
            base, "bbbb" * 4, tmp_path / "was-deleted"
        )

        result = prune_checkpoints(retention_days=0, checkpoint_base=base)

        assert result["scanned"] == 2
        assert result["deleted_orphan"] == 1
        assert result["deleted_stale"] == 0
        assert alive_repo.exists()
        assert not orphan_repo.exists()

    def test_deletes_stale_by_mtime_when_workdir_alive(self, tmp_path):
        """Old-mtime repos are pruned by retention even when workdir exists."""
        from tools.checkpoint_manager import prune_checkpoints
        import time as _time

        base = tmp_path / "checkpoints"
        work = tmp_path / "work"
        work.mkdir()

        fresh_repo = self._seed_shadow_repo(base, "cccc" * 4, work)
        stale_work = tmp_path / "stale_work"
        stale_work.mkdir()
        old = _time.time() - 60 * 86400  # 60 days ago
        stale_repo = self._seed_shadow_repo(base, "dddd" * 4, stale_work, mtime=old)

        result = prune_checkpoints(
            retention_days=30, delete_orphans=False, checkpoint_base=base
        )

        assert result["deleted_orphan"] == 0
        assert result["deleted_stale"] == 1
        assert fresh_repo.exists()
        assert not stale_repo.exists()

    def test_orphan_takes_priority_over_stale(self, tmp_path):
        """Orphan detection counts first — reason="orphan" even if also stale."""
        from tools.checkpoint_manager import prune_checkpoints
        import time as _time

        base = tmp_path / "checkpoints"
        old = _time.time() - 60 * 86400
        self._seed_shadow_repo(base, "eeee" * 4, tmp_path / "gone", mtime=old)

        result = prune_checkpoints(retention_days=30, checkpoint_base=base)
        assert result["deleted_orphan"] == 1
        assert result["deleted_stale"] == 0

    def test_delete_orphans_disabled_keeps_orphans(self, tmp_path):
        """delete_orphans=False must leave orphaned repos untouched."""
        from tools.checkpoint_manager import prune_checkpoints

        base = tmp_path / "checkpoints"
        orphan = self._seed_shadow_repo(base, "ffff" * 4, tmp_path / "gone")

        result = prune_checkpoints(
            retention_days=0, delete_orphans=False, checkpoint_base=base
        )
        assert result["deleted_orphan"] == 0
        assert orphan.exists()

    def test_skips_non_shadow_dirs(self, tmp_path):
        """Dirs without HEAD (non-initialised) are left alone."""
        from tools.checkpoint_manager import prune_checkpoints

        base = tmp_path / "checkpoints"
        base.mkdir()
        (base / "garbage-dir").mkdir()
        (base / "garbage-dir" / "random.txt").write_text("hi")

        result = prune_checkpoints(retention_days=0, checkpoint_base=base)
        assert result["scanned"] == 0
        assert (base / "garbage-dir").exists()

    def test_tracks_bytes_freed(self, tmp_path):
        """Deleted repo sizes must be reflected in bytes_freed."""
        from tools.checkpoint_manager import prune_checkpoints

        base = tmp_path / "checkpoints"
        orphan = self._seed_shadow_repo(base, "1234" * 4, tmp_path / "gone")
        (orphan / "objects").mkdir()
        (orphan / "objects" / "pack.bin").write_bytes(b"x" * 5000)

        result = prune_checkpoints(retention_days=0, checkpoint_base=base)
        assert result["deleted_orphan"] == 1
        assert result["bytes_freed"] >= 5000

    def test_base_missing_returns_empty_counts(self, tmp_path):
        """A nonexistent checkpoint base yields all-zero counters, no raise."""
        from tools.checkpoint_manager import prune_checkpoints

        result = prune_checkpoints(checkpoint_base=tmp_path / "does-not-exist")
        assert result == {
            "scanned": 0, "deleted_orphan": 0, "deleted_stale": 0,
            "errors": 0, "bytes_freed": 0,
        }
848  
849  
class TestMaybeAutoPruneCheckpoints:
    """Throttled auto-prune wrapper: runs, then skips within the interval."""

    def _seed(self, base, dir_hash, workdir):
        """Lay down a minimal shadow-repo skeleton under *base*."""
        base.mkdir(parents=True, exist_ok=True)
        repo = base / dir_hash
        repo.mkdir()
        (repo / "HEAD").write_text("ref: refs/heads/main\n")
        (repo / "HERMES_WORKDIR").write_text(f"{workdir}\n")
        return repo

    def test_first_call_prunes_and_writes_marker(self, tmp_path):
        from tools.checkpoint_manager import maybe_auto_prune_checkpoints

        root = tmp_path / "checkpoints"
        self._seed(root, "0000" * 4, tmp_path / "gone")

        outcome = maybe_auto_prune_checkpoints(checkpoint_base=root)
        assert outcome["skipped"] is False
        assert outcome["result"]["deleted_orphan"] == 1
        # The throttle marker must be written after a real run.
        assert (root / ".last_prune").exists()

    def test_second_call_within_interval_skips(self, tmp_path):
        from tools.checkpoint_manager import maybe_auto_prune_checkpoints

        root = tmp_path / "checkpoints"
        self._seed(root, "1111" * 4, tmp_path / "gone")

        initial = maybe_auto_prune_checkpoints(
            checkpoint_base=root, min_interval_hours=24
        )
        assert initial["skipped"] is False

        self._seed(root, "2222" * 4, tmp_path / "also-gone")
        repeat = maybe_auto_prune_checkpoints(
            checkpoint_base=root, min_interval_hours=24
        )
        assert repeat["skipped"] is True
        # The second orphan must still exist — skip was honoured.
        assert (root / ("2222" * 4)).exists()

    def test_corrupt_marker_treated_as_no_prior_run(self, tmp_path):
        from tools.checkpoint_manager import maybe_auto_prune_checkpoints

        root = tmp_path / "checkpoints"
        root.mkdir()
        # Unparseable marker content must not block the prune.
        (root / ".last_prune").write_text("not-a-timestamp")
        self._seed(root, "3333" * 4, tmp_path / "gone")

        outcome = maybe_auto_prune_checkpoints(checkpoint_base=root)
        assert outcome["skipped"] is False
        assert outcome["result"]["deleted_orphan"] == 1

    def test_missing_base_no_raise(self, tmp_path):
        from tools.checkpoint_manager import maybe_auto_prune_checkpoints

        outcome = maybe_auto_prune_checkpoints(
            checkpoint_base=tmp_path / "does-not-exist"
        )
        assert outcome["skipped"] is False
        assert outcome["result"]["scanned"] == 0
909