/ plugins / disk-cleanup / disk_cleanup.py
disk_cleanup.py
  1  """disk_cleanup — ephemeral file cleanup for Hermes Agent.
  2  
  3  Library module wrapping the deterministic cleanup rules written by
  4  @LVT382009 in PR #12212. The plugin ``__init__.py`` wires these
  5  functions into ``post_tool_call`` and ``on_session_end`` hooks so
  6  tracking and cleanup happen automatically — the agent never needs to
  7  call a tool or remember a skill.
  8  
  9  Rules:
 10    - test files    → delete immediately at task end (age >= 0)
 11    - temp files    → delete after 7 days
 12    - cron-output   → delete after 14 days
 13    - empty dirs    → always delete (under HERMES_HOME)
  - research      → older than 30 days: keep 10 newest, prompt for the rest (deep only)
 15    - chrome-profile→ prompt after 14 days (deep only)
 16    - >500 MB files → prompt always (deep only)
 17  
 18  Scope: strictly HERMES_HOME and /tmp/hermes-*
 19  Never touches: ~/.hermes/logs/ or any system directory.
 20  """
 21  
from __future__ import annotations

import json
import logging
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
 30  
 31  try:
 32      from hermes_constants import get_hermes_home
 33  except Exception:  # pragma: no cover — plugin may load before constants resolves
 34      import os
 35  
 36      def get_hermes_home() -> Path:  # type: ignore[no-redef]
 37          val = (os.environ.get("HERMES_HOME") or "").strip()
 38          return Path(val).resolve() if val else (Path.home() / ".hermes").resolve()
 39  
 40  
 41  logger = logging.getLogger(__name__)
 42  
 43  
 44  # ---------------------------------------------------------------------------
 45  # Paths
 46  # ---------------------------------------------------------------------------
 47  
 48  def get_state_dir() -> Path:
 49      """State dir — separate from ``$HERMES_HOME/logs/``."""
 50      return get_hermes_home() / "disk-cleanup"
 51  
 52  
 53  def get_tracked_file() -> Path:
 54      return get_state_dir() / "tracked.json"
 55  
 56  
 57  def get_log_file() -> Path:
 58      """Audit log — intentionally NOT under ``$HERMES_HOME/logs/``."""
 59      return get_state_dir() / "cleanup.log"
 60  
 61  
 62  # ---------------------------------------------------------------------------
 63  # Path safety
 64  # ---------------------------------------------------------------------------
 65  
 66  def is_safe_path(path: Path) -> bool:
 67      """Accept only paths under HERMES_HOME or ``/tmp/hermes-*``.
 68  
 69      Rejects Windows mounts (``/mnt/c`` etc.) and any system directory.
 70      """
 71      hermes_home = get_hermes_home()
 72      try:
 73          path.resolve().relative_to(hermes_home)
 74          return True
 75      except (ValueError, OSError):
 76          pass
 77      # Allow /tmp/hermes-* explicitly
 78      parts = path.parts
 79      if len(parts) >= 3 and parts[1] == "tmp" and parts[2].startswith("hermes-"):
 80          return True
 81      return False
 82  
 83  
 84  # ---------------------------------------------------------------------------
 85  # Audit log
 86  # ---------------------------------------------------------------------------
 87  
 88  def _log(message: str) -> None:
 89      try:
 90          log_file = get_log_file()
 91          log_file.parent.mkdir(parents=True, exist_ok=True)
 92          ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
 93          with open(log_file, "a") as f:
 94              f.write(f"[{ts}] {message}\n")
 95      except OSError:
 96          # Never let the audit log break the agent loop.
 97          pass
 98  
 99  
100  # ---------------------------------------------------------------------------
101  # tracked.json — atomic read/write, backup scoped to tracked.json only
102  # ---------------------------------------------------------------------------
103  
def load_tracked() -> List[Dict[str, Any]]:
    """Load tracked.json.  Restores from ``.bak`` on corruption.

    "Corruption" covers both unparsable content and valid JSON whose
    top-level value is not a list (e.g. ``{}`` or ``null``): callers
    iterate and append to the result, so anything but a list is unusable.

    Returns:
        The list stored in ``tracked.json``; the list stored in
        ``tracked.json.bak`` when the main file is corrupted and the backup
        is usable; otherwise an empty list (also when no file exists yet).
    """
    tf = get_tracked_file()
    tf.parent.mkdir(parents=True, exist_ok=True)

    if not tf.exists():
        return []

    try:
        data = json.loads(tf.read_text())
        if not isinstance(data, list):
            raise ValueError("tracked.json top-level value is not a list")
        return data
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError
        # subclasses, so this covers unreadable text and bad JSON alike.
        bak = tf.with_suffix(".json.bak")
        if bak.exists():
            try:
                data = json.loads(bak.read_text())
                if isinstance(data, list):
                    _log("WARN: tracked.json corrupted — restored from .bak")
                    return data
            except Exception:
                # Best-effort restore: an unusable backup falls through to
                # the "starting fresh" path below.
                pass
        _log("WARN: tracked.json corrupted, no backup — starting fresh")
        return []
125  
126  
def save_tracked(tracked: List[Dict[str, Any]]) -> None:
    """Persist *tracked* to ``tracked.json`` via a temp file and rename.

    Steps: serialise to ``tracked.json.tmp``, copy any existing
    ``tracked.json`` to ``tracked.json.bak``, then move the temp file over
    ``tracked.json``.
    """
    target = get_tracked_file()
    target.parent.mkdir(parents=True, exist_ok=True)

    staging = target.with_suffix(".json.tmp")
    backup = target.with_suffix(".json.bak")

    payload = json.dumps(tracked, indent=2)
    staging.write_text(payload)

    # Keep the previous generation around before it is overwritten.
    if target.exists():
        shutil.copy2(target, backup)

    staging.replace(target)
136  
137  
138  # ---------------------------------------------------------------------------
139  # Categories
140  # ---------------------------------------------------------------------------
141  
# Category labels a tracked entry may carry.
ALLOWED_CATEGORIES = {
    "temp",
    "test",
    "research",
    "download",
    "chrome-profile",
    "cron-output",
    "other",
}
146  
147  
def fmt_size(n: float) -> str:
    """Format a byte count with one decimal place and a 1024-based unit.

    The value is divided by 1024 until it drops below 1024; once the units
    B through TB are exhausted the result is reported in PB.
    """
    remaining = n
    for label in ("B", "KB", "MB", "GB", "TB"):
        if remaining < 1024:
            break
        remaining /= 1024
    else:
        # Loop ran out of units without ever dropping below 1024.
        label = "PB"
    return f"{remaining:.1f} {label}"
154  
155  
156  # ---------------------------------------------------------------------------
157  # Track / forget
158  # ---------------------------------------------------------------------------
159  
def track(path_str: str, category: str, silent: bool = False) -> bool:
    """Add *path_str* to the tracking registry.

    An unknown *category* is logged and stored as ``"other"``.  The path is
    resolved first; it is skipped when it does not exist, rejected when
    :func:`is_safe_path` refuses it, and ignored when already tracked.
    Directories are recorded with size 0.

    Returns:
        True only when a new entry was written.  A confirmation line is
        printed for new entries unless *silent* is set.
    """
    if category not in ALLOWED_CATEGORIES:
        _log(f"WARN: unknown category '{category}', using 'other'")
        category = "other"

    path = Path(path_str).resolve()

    # Guard clauses: nothing to track, or outside the allowed roots.
    if not path.exists():
        _log(f"SKIP: {path} (does not exist)")
        return False
    if not is_safe_path(path):
        _log(f"REJECT: {path} (outside HERMES_HOME)")
        return False

    size = 0
    if path.is_file():
        size = path.stat().st_size

    entries = load_tracked()

    # Deduplicate on the resolved path string.
    for entry in entries:
        if entry["path"] == str(path):
            return False

    entries.append({
        "path": str(path),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "category": category,
        "size": size,
    })
    save_tracked(entries)

    _log(f"TRACKED: {path} ({category}, {fmt_size(size)})")
    if not silent:
        print(f"Tracked: {path} ({category}, {fmt_size(size)})")
    return True
194  
195  
def forget(path_str: str) -> int:
    """Stop tracking *path_str*; the file itself is left in place.

    Entries are matched on their resolved path.  The registry is rewritten
    and the removal logged only when something was dropped.

    Returns:
        The number of entries removed.
    """
    p = Path(path_str).resolve()
    entries = load_tracked()
    kept = [entry for entry in entries if Path(entry["path"]).resolve() != p]

    removed = len(entries) - len(kept)
    if not removed:
        return removed

    save_tracked(kept)
    _log(f"FORGOT: {p} ({removed} entries)")
    return removed
207  
208  
209  # ---------------------------------------------------------------------------
210  # Dry run
211  # ---------------------------------------------------------------------------
212  
def dry_run() -> Tuple[List[Dict], List[Dict]]:
    """Return (auto_delete_list, needs_prompt_list) without touching files.

    The auto list follows the rules of :func:`quick`; the prompt list
    follows :func:`deep`, including its rule that the 10 most recently
    tracked research items older than 30 days are kept and never prompted
    for.  Entries whose path no longer exists are ignored.  Both lists keep
    the order of the tracking registry.
    """
    tracked = load_tracked()
    now = datetime.now(timezone.utc)

    auto: List[Dict] = []
    prompt: List[Dict] = []
    old_research: List[Dict] = []

    for item in tracked:
        p = Path(item["path"])
        if not p.exists():
            continue
        age = (now - datetime.fromisoformat(item["timestamp"])).days
        cat = item["category"]
        size = item["size"]

        if cat == "test":
            auto.append(item)
        elif cat == "temp" and age > 7:
            auto.append(item)
        elif cat == "cron-output" and age > 14:
            auto.append(item)
        elif cat == "research" and age > 30:
            # Tentatively a prompt candidate; the newest 10 are filtered
            # out again below.
            old_research.append(item)
            prompt.append(item)
        elif cat == "chrome-profile" and age > 14:
            prompt.append(item)
        elif size > 500 * 1024 * 1024:
            prompt.append(item)

    # Same selection as deep(): newest first by timestamp, first 10 kept.
    kept = sorted(old_research, key=lambda x: x["timestamp"], reverse=True)[:10]
    if kept:
        # Entries are dicts (unhashable), so match them by identity.
        kept_ids = {id(item) for item in kept}
        prompt = [item for item in prompt if id(item) not in kept_ids]

    return auto, prompt
243  
244  
245  # ---------------------------------------------------------------------------
246  # Quick cleanup
247  # ---------------------------------------------------------------------------
248  
def _tracked_age_days(item: Dict[str, Any], now: datetime) -> int:
    """Return whole days between *item*'s tracking timestamp and *now*.

    A timestamp without a UTC offset is read as UTC so it can be subtracted
    from the timezone-aware *now*.

    Raises:
        KeyError, TypeError, ValueError: missing or unparsable timestamp.
    """
    stamp = datetime.fromisoformat(item["timestamp"])
    if stamp.tzinfo is None:
        stamp = stamp.replace(tzinfo=timezone.utc)
    return (now - stamp).days


def _remove_empty_dirs(hermes_home: Path) -> int:
    """Remove empty directories below *hermes_home*; return how many went.

    Left alone: *hermes_home* itself, a short list of well-known top-level
    state dirs (a fresh install has these empty, and deleting them would
    surprise the user), and everything beneath ``logs/``, which this
    plugin never touches.
    """
    protected_top_level = {
        "logs", "memories", "sessions", "cron", "cronjobs",
        "cache", "skills", "plugins", "disk-cleanup", "optional-skills",
        "hermes-agent", "backups", "profiles", ".worktrees",
    }
    removed = 0
    try:
        # Reverse-sorted so children come before their parents: a directory
        # that only held empty directories can go in the same pass.
        for dirpath in sorted(hermes_home.rglob("*"), reverse=True):
            if not dirpath.is_dir() or dirpath == hermes_home:
                continue
            try:
                rel_parts = dirpath.relative_to(hermes_home).parts
            except ValueError:
                continue
            # The whole logs/ subtree is off limits, not just its root.
            if rel_parts and rel_parts[0] == "logs":
                continue
            # Skip the well-known top-level state dirs themselves.
            if len(rel_parts) == 1 and rel_parts[0] in protected_top_level:
                continue
            try:
                if not any(dirpath.iterdir()):
                    dirpath.rmdir()
                    removed += 1
                    _log(f"DELETED: {dirpath} (empty dir)")
            except OSError:
                pass
    except OSError:
        pass
    return removed


def quick() -> Dict[str, Any]:
    """Safe deterministic cleanup — no prompts.

    Deletes every tracked ``test`` entry, ``temp`` entries older than 7
    days and ``cron-output`` entries older than 14 days, then removes empty
    directories under HERMES_HOME.

    Per-entry handling:
      - path no longer exists     → dropped from tracking (STALE)
      - path fails is_safe_path   → never deleted, dropped from tracking
      - entry is malformed        → kept, reported in ``errors``
      - deletion raises OSError   → kept, reported in ``errors``

    A single bad entry therefore cannot abort the pass for the others.

    Returns: ``{"deleted": N, "empty_dirs": N, "freed": bytes,
               "errors": [str, ...]}``.
    """
    tracked = load_tracked()
    now = datetime.now(timezone.utc)
    deleted = 0
    freed = 0
    new_tracked: List[Dict] = []
    errors: List[str] = []

    for item in tracked:
        try:
            p = Path(item["path"])
        except (KeyError, TypeError) as e:
            # Without a usable path there is nothing to check or delete.
            _log(f"ERROR: malformed tracking entry {item!r}: {e}")
            errors.append(f"malformed tracking entry {item!r}: {e}")
            new_tracked.append(item)
            continue

        if not p.exists():
            _log(f"STALE: {p} (removed from tracking)")
            continue

        # Defence in depth: track() only admits safe paths, but the
        # registry is a plain file, so re-check before anything is removed.
        if not is_safe_path(p):
            _log(f"REJECT: {p} (outside HERMES_HOME)")
            continue

        try:
            cat = item["category"]
            size = item["size"]
            age = _tracked_age_days(item, now)
        except (KeyError, TypeError, ValueError) as e:
            _log(f"ERROR: malformed tracking entry for {p}: {e}")
            errors.append(f"{p}: malformed tracking entry ({e})")
            new_tracked.append(item)
            continue

        should_delete = (
            cat == "test"
            or (cat == "temp" and age > 7)
            or (cat == "cron-output" and age > 14)
        )
        if not should_delete:
            new_tracked.append(item)
            continue

        try:
            if p.is_file():
                p.unlink()
            elif p.is_dir():
                shutil.rmtree(p)
            freed += size
            deleted += 1
            _log(f"DELETED: {p} ({cat}, {fmt_size(size)})")
        except OSError as e:
            _log(f"ERROR deleting {p}: {e}")
            errors.append(f"{p}: {e}")
            new_tracked.append(item)

    empty_removed = _remove_empty_dirs(get_hermes_home())

    save_tracked(new_tracked)
    _log(
        f"QUICK_SUMMARY: {deleted} files, {empty_removed} dirs, "
        f"{fmt_size(freed)}"
    )
    return {
        "deleted": deleted,
        "empty_dirs": empty_removed,
        "freed": freed,
        "errors": errors,
    }
336  
337  
338  # ---------------------------------------------------------------------------
339  # Deep cleanup (interactive — not called from plugin hooks)
340  # ---------------------------------------------------------------------------
341  
def deep(
    confirm: Optional[Callable[[Dict[str, Any]], bool]] = None,
) -> Dict[str, Any]:
    """Deep cleanup.

    Runs :func:`quick` first, then asks the *confirm* callable for each
    risky item (research > 30d beyond 10 newest, chrome-profile > 14d,
    any file > 500 MB).  *confirm(item)* must return True to delete.

    Args:
        confirm: Called with the tracked entry (a dict with ``path``,
            ``timestamp``, ``category`` and ``size``).  When ``None`` the
            function stops after the quick pass.

    Entries whose path no longer exists, or that fail
    :func:`is_safe_path`, are never offered for deletion.  Deletion errors
    are written to the audit log and leave the entry tracked.

    Returns: ``{"quick": {...}, "deep_deleted": N, "deep_freed": bytes}``.
    """
    quick_result = quick()

    if confirm is None:
        # No interactive confirmer — deep stops after the quick pass.
        return {"quick": quick_result, "deep_deleted": 0, "deep_freed": 0}

    # Reload: quick() has just rewritten the registry.
    tracked = load_tracked()
    now = datetime.now(timezone.utc)
    research, chrome, large = [], [], []

    for item in tracked:
        p = Path(item["path"])
        if not p.exists():
            continue
        # Defence in depth: track() only admits safe paths, but the
        # registry is a plain file, so re-check before offering deletion.
        if not is_safe_path(p):
            _log(f"REJECT: {p} (outside HERMES_HOME)")
            continue
        age = (now - datetime.fromisoformat(item["timestamp"])).days
        cat = item["category"]

        if cat == "research" and age > 30:
            research.append(item)
        elif cat == "chrome-profile" and age > 14:
            chrome.append(item)
        elif item["size"] > 500 * 1024 * 1024:
            large.append(item)

    # Newest first; the 10 newest old research items are kept untouched.
    research.sort(key=lambda x: x["timestamp"], reverse=True)
    old_research = research[10:]

    freed, count = 0, 0
    to_remove: List[Dict] = []

    for group in (old_research, chrome, large):
        for item in group:
            if confirm(item):
                try:
                    p = Path(item["path"])
                    if p.is_file():
                        p.unlink()
                    elif p.is_dir():
                        shutil.rmtree(p)
                    to_remove.append(item)
                    freed += item["size"]
                    count += 1
                    _log(
                        f"DELETED: {p} ({item['category']}, "
                        f"{fmt_size(item['size'])})"
                    )
                except OSError as e:
                    _log(f"ERROR deleting {item['path']}: {e}")

    if to_remove:
        remove_paths = {i["path"] for i in to_remove}
        save_tracked([i for i in tracked if i["path"] not in remove_paths])

    return {"quick": quick_result, "deep_deleted": count, "deep_freed": freed}
407  
408  
409  # ---------------------------------------------------------------------------
410  # Status
411  # ---------------------------------------------------------------------------
412  
def status() -> Dict[str, Any]:
    """Summarise the tracking registry.

    Returns a dict with:
      - ``categories``: per-category ``{"count", "size"}`` totals over
        every tracked entry;
      - ``top10``: up to ten ``(path, size, category)`` tuples for entries
        whose path still exists, largest recorded size first;
      - ``total_tracked``: number of tracked entries.
    """
    tracked = load_tracked()

    cats: Dict[str, Dict] = {}
    for item in tracked:
        bucket = cats.setdefault(item["category"], {"count": 0, "size": 0})
        bucket["count"] += 1
        bucket["size"] += item["size"]

    existing = sorted(
        (
            (i["path"], i["size"], i["category"])
            for i in tracked
            if Path(i["path"]).exists()
        ),
        key=lambda row: row[1],
        reverse=True,
    )

    return {
        "categories": cats,
        "top10": existing[:10],
        "total_tracked": len(tracked),
    }
434  
435  
def format_status(s: Dict[str, Any]) -> str:
    """Render a :func:`status` result as text (for slash command output).

    Produces a category table ordered by total size, largest first,
    followed by the ranked list of the largest tracked files.  Placeholder
    lines are emitted when either section is empty.
    """
    cats = s["categories"]
    by_size = sorted(cats.items(), key=lambda x: x[1]["size"], reverse=True)

    lines = [f"{'Category':<20} {'Files':>6}  {'Size':>10}", "-" * 40]
    lines.extend(
        f"{cat:<20} {d['count']:>6}  {fmt_size(d['size']):>10}"
        for cat, d in by_size
    )
    if not cats:
        lines.append("(nothing tracked yet)")

    lines += ["", "Top 10 largest tracked files:"]
    top = s["top10"]
    if top:
        for rank, (path, size, cat) in enumerate(top, 1):
            lines.append(f"  {rank:>2}. {fmt_size(size):>8}  [{cat}]  {path}")
    else:
        lines.append("  (none)")

    return "\n".join(lines)
454  
455  
456  # ---------------------------------------------------------------------------
457  # Auto-categorisation from tool-call inspection
458  # ---------------------------------------------------------------------------
459  
# File-name prefixes / suffixes that mark a file as a throwaway test artifact.
_TEST_PATTERNS = ("test_", "tmp_")
_TEST_SUFFIXES = (".test.py", ".test.js", ".test.ts", ".test.md")

# Top-level entries of HERMES_HOME that are never auto-tracked: the state dir
# itself, logs, memory files, sessions, config and credentials.
_NEVER_TRACK_TOP_LEVEL = frozenset({
    "disk-cleanup", "logs", "memories", "sessions", "config.yaml",
    "skills", "plugins", ".env", "USER.md", "MEMORY.md", "SOUL.md",
    "auth.json", "hermes-agent",
    # The empty-dir sweep in quick() also treats these as protected state
    # dirs; files inside them must not be auto-classified as throwaway
    # "test" artifacts and deleted at session end.
    "optional-skills", "backups", "profiles", ".worktrees",
})


def guess_category(path: Path) -> Optional[str]:
    """Return a category label for *path*, or None if we shouldn't track it.

    Used by the ``post_tool_call`` hook to auto-track ephemeral files.

    Rules, in order:
      - not accepted by :func:`is_safe_path`, or not resolvable → None
      - HERMES_HOME itself, or inside a never-track top-level entry → None
      - under ``cron/`` or ``cronjobs/`` → ``"cron-output"``
      - under ``cache/`` → ``"temp"``
      - file name starts with ``test_`` / ``tmp_`` or ends with a
        ``.test.*`` suffix → ``"test"``
      - anything else → None
    """
    if not is_safe_path(path):
        return None

    try:
        resolved = path.resolve()
        hermes_home = get_hermes_home().resolve()
    except (OSError, RuntimeError):
        # Unresolvable path (RuntimeError: symlink loop on older Pythons).
        return None

    try:
        rel = resolved.relative_to(hermes_home)
    except ValueError:
        # Path isn't under HERMES_HOME (e.g. /tmp/hermes-*) — only the
        # file-name rules below apply.
        rel = None

    if rel is not None:
        if not rel.parts:
            # HERMES_HOME itself is never an ephemeral artifact, whatever
            # its directory happens to be called (e.g. "test_home").
            return None
        top = rel.parts[0]
        if top in _NEVER_TRACK_TOP_LEVEL:
            return None
        if top in ("cron", "cronjobs"):
            return "cron-output"
        if top == "cache":
            return "temp"

    name = path.name
    # str.startswith / str.endswith accept a tuple of alternatives.
    if name.startswith(_TEST_PATTERNS) or name.endswith(_TEST_SUFFIXES):
        return "test"
    return None