# tools/checkpoint_manager.py
  1  """
  2  Checkpoint Manager — Transparent filesystem snapshots via shadow git repos.
  3  
  4  Creates automatic snapshots of working directories before file-mutating
  5  operations (write_file, patch), triggered once per conversation turn.
  6  Provides rollback to any previous checkpoint.
  7  
  8  This is NOT a tool — the LLM never sees it.  It's transparent infrastructure
  9  controlled by the ``checkpoints`` config flag or ``--checkpoints`` CLI flag.
 10  
 11  Architecture:
 12      ~/.hermes/checkpoints/{sha256(abs_dir)[:16]}/   — shadow git repo
 13          HEAD, refs/, objects/                        — standard git internals
 14          HERMES_WORKDIR                               — original dir path
 15          info/exclude                                 — default excludes
 16  
 17  The shadow repo uses GIT_DIR + GIT_WORK_TREE so no git state leaks
 18  into the user's project directory.
 19  """
 20  
 21  import hashlib
 22  import logging
 23  import os
 24  import re
 25  import shutil
 26  import subprocess
 27  from pathlib import Path
 28  from hermes_constants import get_hermes_home
 29  from typing import Dict, List, Optional, Set
 30  
 31  logger = logging.getLogger(__name__)
 32  
 33  # ---------------------------------------------------------------------------
 34  # Constants
 35  # ---------------------------------------------------------------------------
 36  
 37  CHECKPOINT_BASE = get_hermes_home() / "checkpoints"
 38  
 39  DEFAULT_EXCLUDES = [
 40      "node_modules/",
 41      "dist/",
 42      "build/",
 43      ".env",
 44      ".env.*",
 45      ".env.local",
 46      ".env.*.local",
 47      "__pycache__/",
 48      "*.pyc",
 49      "*.pyo",
 50      ".DS_Store",
 51      "*.log",
 52      ".cache/",
 53      ".next/",
 54      ".nuxt/",
 55      "coverage/",
 56      ".pytest_cache/",
 57      ".venv/",
 58      "venv/",
 59      ".git/",
 60  ]
 61  
 62  # Git subprocess timeout (seconds).
 63  _GIT_TIMEOUT: int = max(10, min(60, int(os.getenv("HERMES_CHECKPOINT_TIMEOUT", "30"))))
 64  
 65  # Max files to snapshot — skip huge directories to avoid slowdowns.
 66  _MAX_FILES = 50_000
 67  
 68  # Valid git commit hash pattern: 4–40 hex chars (short or full SHA-1/SHA-256).
 69  _COMMIT_HASH_RE = re.compile(r'^[0-9a-fA-F]{4,64}$')
 70  
 71  
 72  # ---------------------------------------------------------------------------
 73  # Input validation helpers
 74  # ---------------------------------------------------------------------------
 75  
 76  def _validate_commit_hash(commit_hash: str) -> Optional[str]:
 77      """Validate a commit hash to prevent git argument injection.
 78  
 79      Returns an error string if invalid, None if valid.
 80      Values starting with '-' would be interpreted as git flags
 81      (e.g., '--patch', '-p') instead of revision specifiers.
 82      """
 83      if not commit_hash or not commit_hash.strip():
 84          return "Empty commit hash"
 85      if commit_hash.startswith("-"):
 86          return f"Invalid commit hash (must not start with '-'): {commit_hash!r}"
 87      if not _COMMIT_HASH_RE.match(commit_hash):
 88          return f"Invalid commit hash (expected 4-64 hex characters): {commit_hash!r}"
 89      return None
 90  
 91  
 92  def _validate_file_path(file_path: str, working_dir: str) -> Optional[str]:
 93      """Validate a file path to prevent path traversal outside the working directory.
 94  
 95      Returns an error string if invalid, None if valid.
 96      """
 97      if not file_path or not file_path.strip():
 98          return "Empty file path"
 99      # Reject absolute paths — restore targets must be relative to the workdir
100      if os.path.isabs(file_path):
101          return f"File path must be relative, got absolute path: {file_path!r}"
102      # Resolve and check containment within working_dir
103      abs_workdir = _normalize_path(working_dir)
104      resolved = (abs_workdir / file_path).resolve()
105      try:
106          resolved.relative_to(abs_workdir)
107      except ValueError:
108          return f"File path escapes the working directory via traversal: {file_path!r}"
109      return None
110  
111  
112  # ---------------------------------------------------------------------------
113  # Shadow repo helpers
114  # ---------------------------------------------------------------------------
115  
def _normalize_path(path_value: str) -> Path:
    """Expand '~' and resolve symlinks/'..' into a canonical absolute Path."""
    expanded = Path(path_value).expanduser()
    return expanded.resolve()
119  
120  
def _shadow_repo_path(working_dir: str) -> Path:
    """Map a working directory to its shadow repo under CHECKPOINT_BASE.

    The repo name is the first 16 hex chars of sha256(absolute path), so the
    same directory always maps to the same repo.
    """
    canonical = str(_normalize_path(working_dir))
    digest = hashlib.sha256(canonical.encode()).hexdigest()
    return CHECKPOINT_BASE / digest[:16]
126  
127  
def _git_env(shadow_repo: Path, working_dir: str) -> dict:
    """Build the environment that redirects git into the shadow repo.

    Sets GIT_DIR/GIT_WORK_TREE so no git state leaks into the user's
    project, and isolates the process from the user's global/system git
    config (commit.gpgsign, hooks, credential helpers would otherwise
    break background snapshots or spawn interactive pinentry prompts):

    * ``GIT_CONFIG_GLOBAL=<os.devnull>`` — ignore ``~/.gitconfig`` (git 2.32+).
    * ``GIT_CONFIG_SYSTEM=<os.devnull>`` — ignore ``/etc/gitconfig`` (git 2.32+).
    * ``GIT_CONFIG_NOSYSTEM=1`` — legacy belt-and-suspenders for older git.

    The shadow repo keeps its own per-repo config (user.email, user.name,
    commit.gpgsign=false) written by ``_init_shadow_repo``.
    """
    env = dict(os.environ)
    env["GIT_DIR"] = str(shadow_repo)
    env["GIT_WORK_TREE"] = str(Path(working_dir).expanduser().resolve())
    # Drop inherited variables that would redirect index/object lookups.
    for leaky in ("GIT_INDEX_FILE", "GIT_NAMESPACE", "GIT_ALTERNATE_OBJECT_DIRECTORIES"):
        env.pop(leaky, None)
    # os.devnull keeps this cross-platform ('/dev/null' on POSIX, 'nul' on
    # Windows).
    env.update({
        "GIT_CONFIG_GLOBAL": os.devnull,
        "GIT_CONFIG_SYSTEM": os.devnull,
        "GIT_CONFIG_NOSYSTEM": "1",
    })
    return env
160  
161  
def _run_git(
    args: List[str],
    shadow_repo: Path,
    working_dir: str,
    timeout: int = _GIT_TIMEOUT,
    allowed_returncodes: Optional[Set[int]] = None,
) -> tuple:
    """Run a git command against the shadow repo.  Returns (ok, stdout, stderr).

    ``ok`` is True iff git exited 0; stdout/stderr are whitespace-stripped.

    ``allowed_returncodes`` suppresses error logging for known/expected non-zero
    exits while preserving the normal ``ok = (returncode == 0)`` contract.
    Example: ``git diff --cached --quiet`` returns 1 when changes exist.
    """
    # Pre-flight: refuse to spawn git when the work tree is missing — e.g.
    # the project directory was deleted mid-session.  Cheaper and clearer
    # than letting git fail with an opaque error.
    normalized_working_dir = _normalize_path(working_dir)
    if not normalized_working_dir.exists():
        msg = f"working directory not found: {normalized_working_dir}"
        logger.error("Git command skipped: %s (%s)", " ".join(["git"] + list(args)), msg)
        return False, "", msg
    if not normalized_working_dir.is_dir():
        msg = f"working directory is not a directory: {normalized_working_dir}"
        logger.error("Git command skipped: %s (%s)", " ".join(["git"] + list(args)), msg)
        return False, "", msg

    env = _git_env(shadow_repo, str(normalized_working_dir))
    cmd = ["git"] + list(args)
    allowed_returncodes = allowed_returncodes or set()
    try:
        # List-form argv (shell=False default) — git arguments are never
        # shell-interpreted.
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            env=env,
            cwd=str(normalized_working_dir),
        )
        ok = result.returncode == 0
        stdout = result.stdout.strip()
        stderr = result.stderr.strip()
        if not ok and result.returncode not in allowed_returncodes:
            logger.error(
                "Git command failed: %s (rc=%d) stderr=%s",
                " ".join(cmd), result.returncode, stderr,
            )
        return ok, stdout, stderr
    except subprocess.TimeoutExpired:
        msg = f"git timed out after {timeout}s: {' '.join(cmd)}"
        logger.error(msg, exc_info=True)
        return False, "", msg
    except FileNotFoundError as exc:
        # Distinguish "git binary missing" from "cwd vanished between the
        # pre-flight check and exec" via the exception's filename attribute.
        missing_target = getattr(exc, "filename", None)
        if missing_target == "git":
            logger.error("Git executable not found: %s", " ".join(cmd), exc_info=True)
            return False, "", "git not found"
        msg = f"working directory not found: {normalized_working_dir}"
        logger.error("Git command failed before execution: %s (%s)", " ".join(cmd), msg, exc_info=True)
        return False, "", msg
    except Exception as exc:
        # Deliberate broad catch: checkpointing is background infrastructure
        # and must never propagate failures into the agent loop.
        logger.error("Unexpected git error running %s: %s", " ".join(cmd), exc, exc_info=True)
        return False, "", str(exc)
221  
222  
def _init_shadow_repo(shadow_repo: Path, working_dir: str) -> Optional[str]:
    """Create the shadow repo on first use.  Returns an error string or None.

    Idempotent: a repo with an existing HEAD is left untouched.
    """
    if (shadow_repo / "HEAD").exists():
        return None  # already initialised

    shadow_repo.mkdir(parents=True, exist_ok=True)

    ok, _, err = _run_git(["init"], shadow_repo, working_dir)
    if not ok:
        return f"Shadow repo init failed: {err}"

    # Identity plus explicit no-signing config baked into the shadow repo's
    # own config.  _git_env already isolates from the user's global config,
    # but writing these here guarantees correct behaviour even when git is
    # run against the repo directly (without the GIT_CONFIG_* env vars).
    for key, value in (
        ("user.email", "hermes@local"),
        ("user.name", "Hermes Checkpoint"),
        ("commit.gpgsign", "false"),
        ("tag.gpgSign", "false"),
    ):
        _run_git(["config", key, value], shadow_repo, working_dir)

    # Default ignore patterns live in info/exclude so the user's project
    # never gains a .gitignore it didn't ask for.
    info_dir = shadow_repo / "info"
    info_dir.mkdir(exist_ok=True)
    exclude_text = "\n".join(DEFAULT_EXCLUDES) + "\n"
    (info_dir / "exclude").write_text(exclude_text, encoding="utf-8")

    # Record which directory this repo shadows (used by prune_checkpoints
    # to detect orphans).
    marker_text = str(_normalize_path(working_dir)) + "\n"
    (shadow_repo / "HERMES_WORKDIR").write_text(marker_text, encoding="utf-8")

    logger.debug("Initialised checkpoint repo at %s for %s", shadow_repo, working_dir)
    return None
256  
257  
def _dir_file_count(path: str) -> int:
    """Rough count of entries under *path*; stops early past _MAX_FILES.

    Used only as a size guard, so precision beyond the limit is irrelevant.
    Permission/IO problems simply end the scan with the partial count.
    """
    seen = 0
    try:
        for _entry in Path(path).rglob("*"):
            seen += 1
            if seen > _MAX_FILES:
                break
    except (PermissionError, OSError):
        pass
    return seen
269  
270  
271  # ---------------------------------------------------------------------------
272  # CheckpointManager
273  # ---------------------------------------------------------------------------
274  
class CheckpointManager:
    """Manages automatic filesystem checkpoints.

    Designed to be owned by AIAgent.  Call ``new_turn()`` at the start of
    each conversation turn and ``ensure_checkpoint(dir, reason)`` before
    any file-mutating tool call.  The manager deduplicates so at most one
    snapshot is taken per directory per turn.

    Parameters
    ----------
    enabled : bool
        Master switch (from config / CLI flag).
    max_snapshots : int
        Keep at most this many checkpoints per directory.
    """

    def __init__(self, enabled: bool = False, max_snapshots: int = 50):
        self.enabled = enabled
        self.max_snapshots = max_snapshots
        # Absolute dir paths already snapshotted during the current turn.
        self._checkpointed_dirs: Set[str] = set()
        self._git_available: Optional[bool] = None  # lazy probe

    # ------------------------------------------------------------------
    # Turn lifecycle
    # ------------------------------------------------------------------

    def new_turn(self) -> None:
        """Reset per-turn dedup.  Call at the start of each agent iteration."""
        self._checkpointed_dirs.clear()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def ensure_checkpoint(self, working_dir: str, reason: str = "auto") -> bool:
        """Take a checkpoint if enabled and not already done this turn.

        Returns True if a checkpoint was taken, False otherwise.
        Never raises — all errors are silently logged.
        """
        if not self.enabled:
            return False

        # Lazy git probe — result is cached for the manager's lifetime.
        if self._git_available is None:
            self._git_available = shutil.which("git") is not None
            if not self._git_available:
                logger.debug("Checkpoints disabled: git not found")
        if not self._git_available:
            return False

        abs_dir = str(_normalize_path(working_dir))

        # Skip root, home, and other overly broad directories
        if abs_dir in ("/", str(Path.home())):
            logger.debug("Checkpoint skipped: directory too broad (%s)", abs_dir)
            return False

        # Already checkpointed this turn?
        if abs_dir in self._checkpointed_dirs:
            return False

        # Marked before attempting, so a failing directory is not retried
        # on every subsequent tool call within the same turn.
        self._checkpointed_dirs.add(abs_dir)

        try:
            return self._take(abs_dir, reason)
        except Exception as e:
            logger.debug("Checkpoint failed (non-fatal): %s", e)
            return False

    def list_checkpoints(self, working_dir: str) -> List[Dict]:
        """List available checkpoints for a directory.

        Returns a list of dicts with keys: hash, short_hash, timestamp, reason,
        files_changed, insertions, deletions.  Most recent first.
        """
        abs_dir = str(_normalize_path(working_dir))
        shadow = _shadow_repo_path(abs_dir)

        if not (shadow / "HEAD").exists():
            return []

        # %H full hash | %h short hash | %aI author date (ISO 8601) | %s subject
        ok, stdout, _ = _run_git(
            ["log", "--format=%H|%h|%aI|%s", "-n", str(self.max_snapshots)],
            shadow, abs_dir,
        )

        if not ok or not stdout:
            return []

        results = []
        for line in stdout.splitlines():
            # maxsplit=3 keeps any '|' characters inside the reason intact.
            parts = line.split("|", 3)
            if len(parts) == 4:
                entry = {
                    "hash": parts[0],
                    "short_hash": parts[1],
                    "timestamp": parts[2],
                    "reason": parts[3],
                    "files_changed": 0,
                    "insertions": 0,
                    "deletions": 0,
                }
                # Get diffstat for this commit — one extra git call per entry.
                stat_ok, stat_out, _ = _run_git(
                    ["diff", "--shortstat", f"{parts[0]}~1", parts[0]],
                    shadow, abs_dir,
                    allowed_returncodes={128, 129},  # first commit has no parent
                )
                if stat_ok and stat_out:
                    self._parse_shortstat(stat_out, entry)
                results.append(entry)
        return results

    @staticmethod
    def _parse_shortstat(stat_line: str, entry: Dict) -> None:
        """Parse git --shortstat output into entry dict.

        Searches for the '<n> file', '<n> insertion', '<n> deletion'
        fragments; absent fields keep their zero defaults.
        """
        m = re.search(r'(\d+) file', stat_line)
        if m:
            entry["files_changed"] = int(m.group(1))
        m = re.search(r'(\d+) insertion', stat_line)
        if m:
            entry["insertions"] = int(m.group(1))
        m = re.search(r'(\d+) deletion', stat_line)
        if m:
            entry["deletions"] = int(m.group(1))

    def diff(self, working_dir: str, commit_hash: str) -> Dict:
        """Show diff between a checkpoint and the current working tree.

        Returns dict with success, diff text, and stat summary.
        """
        # Validate commit_hash to prevent git argument injection
        hash_err = _validate_commit_hash(commit_hash)
        if hash_err:
            return {"success": False, "error": hash_err}

        abs_dir = str(_normalize_path(working_dir))
        shadow = _shadow_repo_path(abs_dir)

        if not (shadow / "HEAD").exists():
            return {"success": False, "error": "No checkpoints exist for this directory"}

        # Verify the commit exists
        ok, _, err = _run_git(
            ["cat-file", "-t", commit_hash], shadow, abs_dir,
        )
        if not ok:
            return {"success": False, "error": f"Checkpoint '{commit_hash}' not found"}

        # Stage current state to compare against checkpoint
        _run_git(["add", "-A"], shadow, abs_dir, timeout=_GIT_TIMEOUT * 2)

        # Get stat summary: checkpoint vs current working tree
        ok_stat, stat_out, _ = _run_git(
            ["diff", "--stat", commit_hash, "--cached"],
            shadow, abs_dir,
        )

        # Get actual diff (limited to avoid terminal flood)
        ok_diff, diff_out, _ = _run_git(
            ["diff", commit_hash, "--cached", "--no-color"],
            shadow, abs_dir,
        )

        # Unstage to avoid polluting the shadow repo index
        _run_git(["reset", "HEAD", "--quiet"], shadow, abs_dir)

        if not ok_stat and not ok_diff:
            return {"success": False, "error": "Could not generate diff"}

        return {
            "success": True,
            "stat": stat_out if ok_stat else "",
            "diff": diff_out if ok_diff else "",
        }

    def restore(self, working_dir: str, commit_hash: str, file_path: Optional[str] = None) -> Dict:
        """Restore files to a checkpoint state.

        Uses ``git checkout <hash> -- .`` (or a specific file) which restores
        tracked files without moving HEAD — safe and reversible.

        Parameters
        ----------
        file_path : str, optional
            If provided, restore only this file instead of the entire directory.

        Returns dict with success/error info.
        """
        # Validate commit_hash to prevent git argument injection
        hash_err = _validate_commit_hash(commit_hash)
        if hash_err:
            return {"success": False, "error": hash_err}

        abs_dir = str(_normalize_path(working_dir))

        # Validate file_path to prevent path traversal outside the working dir
        if file_path:
            path_err = _validate_file_path(file_path, abs_dir)
            if path_err:
                return {"success": False, "error": path_err}

        shadow = _shadow_repo_path(abs_dir)

        if not (shadow / "HEAD").exists():
            return {"success": False, "error": "No checkpoints exist for this directory"}

        # Verify the commit exists
        ok, _, err = _run_git(
            ["cat-file", "-t", commit_hash], shadow, abs_dir,
        )
        if not ok:
            return {"success": False, "error": f"Checkpoint '{commit_hash}' not found", "debug": err or None}

        # Take a checkpoint of current state before restoring (so you can undo the undo)
        self._take(abs_dir, f"pre-rollback snapshot (restoring to {commit_hash[:8]})")

        # Restore — full directory or single file
        restore_target = file_path if file_path else "."
        ok, stdout, err = _run_git(
            ["checkout", commit_hash, "--", restore_target],
            shadow, abs_dir, timeout=_GIT_TIMEOUT * 2,
        )

        if not ok:
            return {"success": False, "error": f"Restore failed: {err}", "debug": err or None}

        # Get info about what was restored (the commit subject is the reason)
        ok2, reason_out, _ = _run_git(
            ["log", "--format=%s", "-1", commit_hash], shadow, abs_dir,
        )
        reason = reason_out if ok2 else "unknown"

        result = {
            "success": True,
            "restored_to": commit_hash[:8],
            "reason": reason,
            "directory": abs_dir,
        }
        if file_path:
            result["file"] = file_path
        return result

    def get_working_dir_for_path(self, file_path: str) -> str:
        """Resolve a file path to its working directory for checkpointing.

        Walks up from the file's parent to find a reasonable project root
        (directory containing .git, pyproject.toml, package.json, etc.).
        Falls back to the file's parent directory.
        """
        path = _normalize_path(file_path)
        if path.is_dir():
            candidate = path
        else:
            candidate = path.parent

        # Walk up looking for project root markers
        markers = {".git", "pyproject.toml", "package.json", "Cargo.toml",
                    "go.mod", "Makefile", "pom.xml", ".hg", "Gemfile"}
        check = candidate
        # Loop terminates at the filesystem root (where parent == self).
        while check != check.parent:
            if any((check / m).exists() for m in markers):
                return str(check)
            check = check.parent

        # No project root found — use the file's parent
        return str(candidate)

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------

    def _take(self, working_dir: str, reason: str) -> bool:
        """Take a snapshot.  Returns True on success."""
        shadow = _shadow_repo_path(working_dir)

        # Init if needed
        err = _init_shadow_repo(shadow, working_dir)
        if err:
            logger.debug("Checkpoint init failed: %s", err)
            return False

        # Quick size guard — don't try to snapshot enormous directories
        if _dir_file_count(working_dir) > _MAX_FILES:
            logger.debug("Checkpoint skipped: >%d files in %s", _MAX_FILES, working_dir)
            return False

        # Stage everything
        ok, _, err = _run_git(
            ["add", "-A"], shadow, working_dir, timeout=_GIT_TIMEOUT * 2,
        )
        if not ok:
            logger.debug("Checkpoint git-add failed: %s", err)
            return False

        # Check if there's anything to commit (rc=1 means changes exist,
        # hence allowed_returncodes={1})
        ok_diff, diff_out, _ = _run_git(
            ["diff", "--cached", "--quiet"],
            shadow,
            working_dir,
            allowed_returncodes={1},
        )
        if ok_diff:
            # No changes to commit
            logger.debug("Checkpoint skipped: no changes in %s", working_dir)
            return False

        # Commit.  ``--no-gpg-sign`` inline covers shadow repos created before
        # the commit.gpgsign=false config was added to _init_shadow_repo — so
        # users with existing checkpoints never hit a GPG pinentry popup.
        ok, _, err = _run_git(
            ["commit", "-m", reason, "--allow-empty-message", "--no-gpg-sign"],
            shadow, working_dir, timeout=_GIT_TIMEOUT * 2,
        )
        if not ok:
            logger.debug("Checkpoint commit failed: %s", err)
            return False

        logger.debug("Checkpoint taken in %s: %s", working_dir, reason)

        # Prune old snapshots
        self._prune(shadow, working_dir)

        return True

    def _prune(self, shadow_repo: Path, working_dir: str) -> None:
        """Keep only the last max_snapshots commits via orphan reset."""
        ok, stdout, _ = _run_git(
            ["rev-list", "--count", "HEAD"], shadow_repo, working_dir,
        )
        if not ok:
            return

        try:
            count = int(stdout)
        except ValueError:
            return

        if count <= self.max_snapshots:
            return

        # For simplicity, we don't actually prune — git's pack mechanism
        # handles this efficiently, and the objects are small.  The log
        # listing is already limited by max_snapshots.
        # Full pruning would require rebase --onto or filter-branch which
        # is fragile for a background feature.  We just limit the log view.
        logger.debug("Checkpoint repo has %d commits (limit %d)", count, self.max_snapshots)
623  
624  
def format_checkpoint_list(checkpoints: List[Dict], directory: str) -> str:
    """Render the checkpoint list as user-facing text, newest first."""
    if not checkpoints:
        return f"No checkpoints found for {directory}"

    out = [f"📸 Checkpoints for {directory}:\n"]
    for idx, cp in enumerate(checkpoints, 1):
        raw_ts = cp["timestamp"]
        ts = raw_ts
        if "T" in raw_ts:
            # ISO timestamp: keep "YYYY-MM-DD HH:MM", dropping seconds and
            # the timezone offset.
            clock = raw_ts.split("T")[1].split("+")[0].split("-")[0][:5]
            ts = f"{raw_ts.split('T')[0]} {clock}"

        # Diffstat summary, omitted entirely when no file count is known.
        n_files = cp.get("files_changed", 0)
        if n_files:
            plural = "s" if n_files != 1 else ""
            ins = cp.get("insertions", 0)
            dels = cp.get("deletions", 0)
            stat = f"  ({n_files} file{plural}, +{ins}/-{dels})"
        else:
            stat = ""

        out.append(f"  {idx}. {cp['short_hash']}  {ts}  {cp['reason']}{stat}")

    out.append("\n  /rollback <N>             restore to checkpoint N")
    out.append("  /rollback diff <N>        preview changes since checkpoint N")
    out.append("  /rollback <N> <file>      restore a single file from checkpoint N")
    return "\n".join(out)
654  
655  
656  # ---------------------------------------------------------------------------
657  # Auto-maintenance (issue #3015 follow-up)
658  # ---------------------------------------------------------------------------
659  #
660  # Every working directory the agent has ever touched gets its own shadow
661  # repo under CHECKPOINT_BASE.  Per-repo ``_prune`` is a no-op (see comment
662  # in CheckpointManager._prune), so abandoned repos (deleted projects,
663  # one-off tmp dirs, long-stale work trees) accumulate forever.  Field
664  # reports put the typical offender at 1000+ repos / ~12 GB on active
665  # contributor machines.
666  #
667  # ``prune_checkpoints`` sweeps CHECKPOINT_BASE at startup, deleting shadow
668  # repos that match either criterion:
669  #   * orphan:  the ``HERMES_WORKDIR`` path no longer exists on disk
670  #   * stale:   the repo's newest mtime is older than ``retention_days``
671  #
672  # ``maybe_auto_prune_checkpoints`` wraps it with an idempotency marker
673  # (``CHECKPOINT_BASE/.last_prune``) so calling it on every CLI/gateway
674  # startup is free after the first run of the day.  Opt-in via
675  # ``checkpoints.auto_prune`` in config.yaml — default off so users who
676  # rely on ``/rollback`` against long-ago sessions never lose data
677  # silently.
678  
# File inside CHECKPOINT_BASE holding the epoch timestamp of the last prune
# run (read/written by maybe_auto_prune_checkpoints to rate-limit sweeps).
_PRUNE_MARKER_NAME = ".last_prune"
680  
681  
def _read_workdir_marker(shadow_repo: Path) -> Optional[str]:
    """Best-effort read of the repo's HERMES_WORKDIR marker.

    Returns the stripped original-directory path, or None when the marker
    is missing or unreadable.
    """
    marker = shadow_repo / "HERMES_WORKDIR"
    try:
        raw = marker.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return None
    return raw.strip()
688  
689  
def _shadow_repo_newest_mtime(shadow_repo: Path) -> float:
    """Newest mtime found anywhere inside *shadow_repo* (0.0 when unknown).

    Walks the whole tree rather than trusting the top-level directory's
    mtime, because git's pack operations can update refs/objects inside
    without touching the repo root.  Best-effort: unreadable entries are
    skipped and any walk error yields whatever was seen so far.
    """
    latest = 0.0
    try:
        for entry in shadow_repo.rglob("*"):
            try:
                latest = max(latest, entry.stat().st_mtime)
            except OSError:
                continue
    except OSError:
        pass
    return latest
709  
710  
def prune_checkpoints(
    retention_days: int = 7,
    delete_orphans: bool = True,
    checkpoint_base: Optional[Path] = None,
) -> Dict[str, int]:
    """Sweep ``checkpoint_base`` and delete orphan/stale shadow repos.

    A shadow repo is removed when either:

    * ``delete_orphans=True`` and its ``HERMES_WORKDIR`` path no longer
      exists on disk (the original project was deleted / moved); OR
    * its newest in-repo mtime is older than ``retention_days`` days.

    Returns counts: ``{"scanned", "deleted_orphan", "deleted_stale",
    "errors", "bytes_freed"}``.

    Never raises — maintenance must never block interactive startup.
    """
    base = checkpoint_base or CHECKPOINT_BASE
    stats = {
        "scanned": 0,
        "deleted_orphan": 0,
        "deleted_stale": 0,
        "errors": 0,
        "bytes_freed": 0,
    }
    if not base.exists():
        return stats

    cutoff = 0.0
    if retention_days > 0:
        import time as _time
        cutoff = _time.time() - retention_days * 86400

    for repo in base.iterdir():
        # Only initialised shadow repos (HEAD present) are candidates; the
        # prune marker file and half-created directories are left alone.
        if not repo.is_dir():
            continue
        if not (repo / "HEAD").exists():
            continue
        stats["scanned"] += 1

        verdict: Optional[str] = None
        if delete_orphans:
            original_dir = _read_workdir_marker(repo)
            if original_dir is None or not Path(original_dir).exists():
                verdict = "orphan"
        if verdict is None and retention_days > 0:
            newest = _shadow_repo_newest_mtime(repo)
            if 0 < newest < cutoff:
                verdict = "stale"
        if verdict is None:
            continue

        # Measure size before delete (best-effort)
        try:
            size = sum(f.stat().st_size for f in repo.rglob("*") if f.is_file())
        except OSError:
            size = 0
        try:
            shutil.rmtree(repo)
        except OSError as exc:
            stats["errors"] += 1
            logger.warning("Failed to prune checkpoint repo %s: %s", repo.name, exc)
            continue
        stats["bytes_freed"] += size
        stats["deleted_orphan" if verdict == "orphan" else "deleted_stale"] += 1
        logger.debug("Pruned %s checkpoint repo: %s (%d bytes)", verdict, repo.name, size)

    return stats
786  
787  
def maybe_auto_prune_checkpoints(
    retention_days: int = 7,
    min_interval_hours: int = 24,
    delete_orphans: bool = True,
    checkpoint_base: Optional[Path] = None,
) -> Dict[str, object]:
    """Rate-limited startup wrapper around ``prune_checkpoints``.

    A timestamp marker (``.last_prune``) inside the checkpoint base makes
    repeat calls within ``min_interval_hours`` no-ops, so wiring this into
    every CLI/gateway startup stays cheap after the first run of the day.

    Returns ``{"skipped": bool, "result": prune_checkpoints-dict,
    "error": optional str}``.  Never raises.
    """
    import time as _time

    base = checkpoint_base or CHECKPOINT_BASE
    outcome: Dict[str, object] = {"skipped": False}

    try:
        if not base.exists():
            outcome["result"] = {
                "scanned": 0, "deleted_orphan": 0, "deleted_stale": 0,
                "errors": 0, "bytes_freed": 0,
            }
            return outcome

        marker = base / _PRUNE_MARKER_NAME
        now = _time.time()
        if marker.exists():
            try:
                previous = float(marker.read_text(encoding="utf-8").strip())
            except (OSError, ValueError):
                previous = None  # corrupt marker — treat as no prior run
            if previous is not None and now - previous < min_interval_hours * 3600:
                outcome["skipped"] = True
                return outcome

        stats = prune_checkpoints(
            retention_days=retention_days,
            delete_orphans=delete_orphans,
            checkpoint_base=base,
        )
        outcome["result"] = stats

        # Record this run; failure to write only means the next startup
        # sweeps again, so a debug log is enough.
        try:
            marker.write_text(str(now), encoding="utf-8")
        except OSError as exc:
            logger.debug("Could not write checkpoint prune marker: %s", exc)

        removed = stats["deleted_orphan"] + stats["deleted_stale"]
        if removed > 0:
            logger.info(
                "checkpoint auto-maintenance: pruned %d repo(s) "
                "(%d orphan, %d stale), reclaimed %.1f MB",
                removed,
                stats["deleted_orphan"],
                stats["deleted_stale"],
                stats["bytes_freed"] / (1024 * 1024),
            )
    except Exception as exc:
        logger.warning("checkpoint auto-maintenance failed: %s", exc)
        outcome["error"] = str(exc)

    return outcome
854