"""disk_cleanup — ephemeral file cleanup for Hermes Agent.

Library module wrapping the deterministic cleanup rules written by
@LVT382009 in PR #12212. The plugin ``__init__.py`` wires these
functions into ``post_tool_call`` and ``on_session_end`` hooks so
tracking and cleanup happen automatically — the agent never needs to
call a tool or remember a skill.

Rules:
  - test files     → delete immediately at task end (age >= 0)
  - temp files     → delete after 7 days
  - cron-output    → delete after 14 days
  - empty dirs     → always delete (under HERMES_HOME)
  - research       → keep 10 newest, prompt for older (deep only)
  - chrome-profile → prompt after 14 days (deep only)
  - >500 MB files  → prompt always (deep only)

Scope: strictly HERMES_HOME and /tmp/hermes-*
Never touches: ~/.hermes/logs/ or any system directory.
"""

from __future__ import annotations

import json
import logging
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

try:
    from hermes_constants import get_hermes_home
except Exception:  # pragma: no cover — plugin may load before constants resolves
    import os

    def get_hermes_home() -> Path:  # type: ignore[no-redef]
        """Fallback: ``$HERMES_HOME`` if set and non-blank, else ``~/.hermes``."""
        val = (os.environ.get("HERMES_HOME") or "").strip()
        return Path(val).resolve() if val else (Path.home() / ".hermes").resolve()


logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Thresholds (shared by dry_run / quick / deep so the rules cannot drift)
# ---------------------------------------------------------------------------

_TEMP_MAX_AGE_DAYS = 7
_CRON_MAX_AGE_DAYS = 14
_RESEARCH_MAX_AGE_DAYS = 30
_CHROME_MAX_AGE_DAYS = 14
_RESEARCH_KEEP_NEWEST = 10
_LARGE_FILE_BYTES = 500 * 1024 * 1024

# Top-level directories under HERMES_HOME that the empty-dir sweep in
# ``quick`` never removes, even when they are empty (a fresh install has
# these empty, and deleting them would surprise the user).
_PROTECTED_TOP_LEVEL = {
    "logs", "memories", "sessions", "cron", "cronjobs",
    "cache", "skills", "plugins", "disk-cleanup", "optional-skills",
    "hermes-agent", "backups", "profiles", ".worktrees",
}


# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------

def get_state_dir() -> Path:
    """State dir — separate from ``$HERMES_HOME/logs/``."""
    return get_hermes_home() / "disk-cleanup"


def get_tracked_file() -> Path:
    """Path of the JSON file holding the list of tracked entries."""
    return get_state_dir() / "tracked.json"


def get_log_file() -> Path:
    """Audit log — intentionally NOT under ``$HERMES_HOME/logs/``."""
    return get_state_dir() / "cleanup.log"


# ---------------------------------------------------------------------------
# Path safety
# ---------------------------------------------------------------------------

def is_safe_path(path: Path) -> bool:
    """Accept only paths under HERMES_HOME or ``/tmp/hermes-*``.

    The decision is made on the *resolved* path, so ``..`` segments and
    symlinks cannot be used to smuggle a path outside the allowed roots
    (e.g. ``/tmp/hermes-x/../../etc/passwd`` is rejected).

    Rejects Windows mounts (``/mnt/c`` etc.) and any system directory.
    Returns False when the path cannot be resolved at all.
    """
    try:
        resolved = path.resolve()
    except (OSError, RuntimeError):
        # Unresolvable (e.g. symlink loop) — refuse rather than guess.
        return False

    hermes_home = get_hermes_home()
    roots = [hermes_home]
    try:
        # Also compare against the resolved home in case the provider
        # returned a path that goes through a symlink.
        roots.append(hermes_home.resolve())
    except (OSError, RuntimeError):
        pass
    for root in roots:
        try:
            resolved.relative_to(root)
            return True
        except ValueError:
            continue

    # Allow /tmp/hermes-* explicitly.
    parts = resolved.parts
    if len(parts) >= 3 and parts[1] == "tmp" and parts[2].startswith("hermes-"):
        return True

    # /tmp itself may be a symlink; accept the resolved form of the same
    # location too.
    try:
        rel = resolved.relative_to(Path("/tmp").resolve())
    except (ValueError, OSError, RuntimeError):
        return False
    return bool(rel.parts) and rel.parts[0].startswith("hermes-")


# ---------------------------------------------------------------------------
# Audit log
# ---------------------------------------------------------------------------

def _log(message: str) -> None:
    """Append a UTC-timestamped line to the audit log; never raises.

    The file is written as UTF-8 with ``backslashreplace`` so unusual
    characters in a path cannot raise an encoding error.
    """
    try:
        log_file = get_log_file()
        log_file.parent.mkdir(parents=True, exist_ok=True)
        ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        with open(
            log_file, "a", encoding="utf-8", errors="backslashreplace"
        ) as f:
            f.write(f"[{ts}] {message}\n")
    except (OSError, ValueError):
        # Never let the audit log break the agent loop.
        pass


# ---------------------------------------------------------------------------
# Entry helpers
# ---------------------------------------------------------------------------

def _parse_timestamp(value: Any) -> Optional[datetime]:
    """Parse an ISO-8601 string into an aware datetime, or return None.

    Naive timestamps are assumed to be UTC so they can be subtracted from
    the aware "now" used by the cleanup rules.
    """
    if not isinstance(value, str):
        return None
    try:
        parsed = datetime.fromisoformat(value)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _is_valid_entry(item: Any) -> bool:
    """True if *item* has the shape ``track`` writes (path/timestamp/category/size)."""
    if not isinstance(item, dict):
        return False
    size = item.get("size")
    return (
        isinstance(item.get("path"), str)
        and isinstance(item.get("category"), str)
        and isinstance(size, (int, float))
        and not isinstance(size, bool)
        and _parse_timestamp(item.get("timestamp")) is not None
    )


def _age_days(item: Dict[str, Any], now: datetime) -> int:
    """Whole days between the entry's timestamp and *now* (0 if unparseable)."""
    ts = _parse_timestamp(item.get("timestamp"))
    if ts is None:
        return 0
    return (now - ts).days


def _remove_path(p: Path) -> None:
    """Delete *p*: ``rmtree`` for real directories, ``unlink`` otherwise.

    A symlink is unlinked, never followed. Raises ``OSError`` on failure.
    """
    if p.is_dir() and not p.is_symlink():
        shutil.rmtree(p)
    else:
        p.unlink()


# ---------------------------------------------------------------------------
# tracked.json — atomic read/write, backup scoped to tracked.json only
# ---------------------------------------------------------------------------

def _load_entries(path: Path) -> Optional[List[Dict[str, Any]]]:
    """Read *path* and return its valid entries, or None if unusable.

    "Unusable" means: not decodable, not valid JSON, or not a JSON list.
    Malformed entries inside an otherwise valid list are dropped (and
    logged). ``OSError`` from reading the file propagates to the caller.
    """
    try:
        raw = json.loads(path.read_text(encoding="utf-8"))
    except ValueError:  # JSONDecodeError and UnicodeDecodeError
        return None
    if not isinstance(raw, list):
        return None
    entries = [item for item in raw if _is_valid_entry(item)]
    dropped = len(raw) - len(entries)
    if dropped:
        _log(f"WARN: {path.name}: ignored {dropped} malformed entries")
    return entries


def load_tracked() -> List[Dict[str, Any]]:
    """Load tracked.json. Falls back to ``.bak`` on corruption.

    Creates the state directory if needed. Returns ``[]`` when the file
    does not exist, or when both it and its backup are unusable. The
    backup's contents are returned but not written back to tracked.json.
    """
    tf = get_tracked_file()
    tf.parent.mkdir(parents=True, exist_ok=True)

    if not tf.exists():
        return []

    entries = _load_entries(tf)
    if entries is not None:
        return entries

    bak = tf.with_suffix(".json.bak")
    if bak.exists():
        try:
            entries = _load_entries(bak)
        except OSError:
            entries = None
        if entries is not None:
            _log("WARN: tracked.json corrupted — restored from .bak")
            return entries
    _log("WARN: tracked.json corrupted, no backup — starting fresh")
    return []


def save_tracked(tracked: List[Dict[str, Any]]) -> None:
    """Atomic write: ``.tmp`` → backup old → rename."""
    tf = get_tracked_file()
    tf.parent.mkdir(parents=True, exist_ok=True)
    tmp = tf.with_suffix(".json.tmp")
    tmp.write_text(json.dumps(tracked, indent=2), encoding="utf-8")
    if tf.exists():
        shutil.copy2(tf, tf.with_suffix(".json.bak"))
    tmp.replace(tf)


# ---------------------------------------------------------------------------
# Categories
# ---------------------------------------------------------------------------

ALLOWED_CATEGORIES = {
    "temp", "test", "research", "download",
    "chrome-profile", "cron-output", "other",
}


def fmt_size(n: float) -> str:
    """Format a byte count with one decimal and a 1024-based unit."""
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if n < 1024:
            return f"{n:.1f} {unit}"
        n /= 1024
    return f"{n:.1f} PB"


# ---------------------------------------------------------------------------
# Track / forget
# ---------------------------------------------------------------------------

def track(path_str: str, category: str, silent: bool = False) -> bool:
    """Register a file for tracking. Returns True if newly tracked.

    Unknown categories are stored as ``"other"``. Returns False when the
    path does not exist, is outside the safe scope, cannot be stat'ed, or
    is already tracked. Directories are recorded with size 0. Prints a
    confirmation line unless *silent* is true.
    """
    if category not in ALLOWED_CATEGORIES:
        _log(f"WARN: unknown category '{category}', using 'other'")
        category = "other"

    path = Path(path_str).resolve()

    if not path.exists():
        _log(f"SKIP: {path} (does not exist)")
        return False

    if not is_safe_path(path):
        _log(f"REJECT: {path} (outside HERMES_HOME)")
        return False

    try:
        size = path.stat().st_size if path.is_file() else 0
    except OSError:
        # The file vanished between the exists() check and stat().
        _log(f"SKIP: {path} (does not exist)")
        return False

    tracked = load_tracked()

    # Deduplicate
    if any(item["path"] == str(path) for item in tracked):
        return False

    tracked.append({
        "path": str(path),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "category": category,
        "size": size,
    })
    save_tracked(tracked)
    _log(f"TRACKED: {path} ({category}, {fmt_size(size)})")
    if not silent:
        print(f"Tracked: {path} ({category}, {fmt_size(size)})")
    return True


def forget(path_str: str) -> int:
    """Remove a path from tracking without deleting the file.

    Returns the number of entries removed.
    """
    p = Path(path_str).resolve()
    tracked = load_tracked()
    before = len(tracked)
    tracked = [i for i in tracked if Path(i["path"]).resolve() != p]
    removed = before - len(tracked)
    if removed:
        save_tracked(tracked)
        _log(f"FORGOT: {p} ({removed} entries)")
    return removed


# ---------------------------------------------------------------------------
# Dry run
# ---------------------------------------------------------------------------

def dry_run() -> Tuple[List[Dict], List[Dict]]:
    """Return (auto_delete_list, needs_prompt_list) without touching files.

    Entries whose path no longer exists are skipped.

    NOTE(review): every research entry older than the age limit is listed
    here, whereas ``deep`` additionally spares the newest ones — confirm
    which behaviour the preview is meant to show.
    """
    tracked = load_tracked()
    now = datetime.now(timezone.utc)

    auto: List[Dict] = []
    prompt: List[Dict] = []

    for item in tracked:
        p = Path(item["path"])
        if not p.exists():
            continue
        age = _age_days(item, now)
        cat = item["category"]
        size = item["size"]

        if cat == "test":
            auto.append(item)
        elif cat == "temp" and age > _TEMP_MAX_AGE_DAYS:
            auto.append(item)
        elif cat == "cron-output" and age > _CRON_MAX_AGE_DAYS:
            auto.append(item)
        elif cat == "research" and age > _RESEARCH_MAX_AGE_DAYS:
            prompt.append(item)
        elif cat == "chrome-profile" and age > _CHROME_MAX_AGE_DAYS:
            prompt.append(item)
        elif size > _LARGE_FILE_BYTES:
            prompt.append(item)

    return auto, prompt


# ---------------------------------------------------------------------------
# Quick cleanup
# ---------------------------------------------------------------------------

def quick() -> Dict[str, Any]:
    """Safe deterministic cleanup — no prompts.

    Deletes tracked ``test`` entries, ``temp`` entries older than 7 days
    and ``cron-output`` entries older than 14 days, drops entries whose
    path no longer exists, then removes empty directories under
    HERMES_HOME. A tracked path that fails ``is_safe_path`` is never
    deleted; it is logged and stays tracked.

    Returns: ``{"deleted": N, "empty_dirs": N, "freed": bytes,
    "errors": [str, ...]}``.
    """
    tracked = load_tracked()
    now = datetime.now(timezone.utc)
    deleted = 0
    freed = 0
    new_tracked: List[Dict] = []
    errors: List[str] = []

    for item in tracked:
        p = Path(item["path"])
        cat = item["category"]

        if not p.exists():
            _log(f"STALE: {p} (removed from tracking)")
            continue

        age = _age_days(item, now)

        should_delete = (
            cat == "test"
            or (cat == "temp" and age > _TEMP_MAX_AGE_DAYS)
            or (cat == "cron-output" and age > _CRON_MAX_AGE_DAYS)
        )

        if not should_delete:
            new_tracked.append(item)
            continue

        # Re-validate at deletion time: tracked.json is plain JSON on disk,
        # so the check done in track() cannot be trusted on its own.
        if not is_safe_path(p):
            _log(f"REJECT: {p} (outside HERMES_HOME) — not deleted")
            new_tracked.append(item)
            continue

        try:
            _remove_path(p)
        except OSError as e:
            _log(f"ERROR deleting {p}: {e}")
            errors.append(f"{p}: {e}")
            new_tracked.append(item)
            continue

        freed += item["size"]
        deleted += 1
        _log(f"DELETED: {p} ({cat}, {fmt_size(item['size'])})")

    # Remove empty dirs under HERMES_HOME (but leave HERMES_HOME itself and
    # a short list of well-known top-level state dirs alone — a fresh install
    # has these empty, and deleting them would surprise the user).
    # NOTE(review): only the top-level dirs themselves are skipped; empty
    # directories nested inside them (e.g. under logs/) are still removed.
    # Confirm that is intended given the "never touches logs" scope note.
    hermes_home = get_hermes_home()
    empty_removed = 0
    try:
        # Reverse-sorted so children are visited before their parents.
        for dirpath in sorted(hermes_home.rglob("*"), reverse=True):
            if not dirpath.is_dir() or dirpath == hermes_home:
                continue
            try:
                rel_parts = dirpath.relative_to(hermes_home).parts
            except ValueError:
                continue
            # Skip the well-known top-level state dirs themselves.
            if len(rel_parts) == 1 and rel_parts[0] in _PROTECTED_TOP_LEVEL:
                continue
            try:
                if not any(dirpath.iterdir()):
                    dirpath.rmdir()
                    empty_removed += 1
                    _log(f"DELETED: {dirpath} (empty dir)")
            except OSError:
                pass
    except OSError:
        pass

    save_tracked(new_tracked)
    _log(
        f"QUICK_SUMMARY: {deleted} files, {empty_removed} dirs, "
        f"{fmt_size(freed)}"
    )
    return {
        "deleted": deleted,
        "empty_dirs": empty_removed,
        "freed": freed,
        "errors": errors,
    }


# ---------------------------------------------------------------------------
# Deep cleanup (interactive — not called from plugin hooks)
# ---------------------------------------------------------------------------

def deep(
    confirm: Optional[Callable[[Dict[str, Any]], bool]] = None,
) -> Dict[str, Any]:
    """Deep cleanup.

    Runs :func:`quick` first, then asks the *confirm* callable for each
    risky item (research > 30d beyond 10 newest, chrome-profile > 14d,
    any file > 500 MB). *confirm(item)* must return True to delete.
    Items that fail ``is_safe_path`` are skipped without asking.

    Returns: ``{"quick": {...}, "deep_deleted": N, "deep_freed": bytes}``.
    """
    quick_result = quick()

    if confirm is None:
        # No interactive confirmer — deep stops after the quick pass.
        return {"quick": quick_result, "deep_deleted": 0, "deep_freed": 0}

    tracked = load_tracked()
    now = datetime.now(timezone.utc)
    research: List[Dict] = []
    chrome: List[Dict] = []
    large: List[Dict] = []

    for item in tracked:
        p = Path(item["path"])
        if not p.exists():
            continue
        age = _age_days(item, now)
        cat = item["category"]

        if cat == "research" and age > _RESEARCH_MAX_AGE_DAYS:
            research.append(item)
        elif cat == "chrome-profile" and age > _CHROME_MAX_AGE_DAYS:
            chrome.append(item)
        elif item["size"] > _LARGE_FILE_BYTES:
            large.append(item)

    # Newest first, then spare the newest N of the aged research entries.
    research.sort(key=lambda x: x["timestamp"], reverse=True)
    old_research = research[_RESEARCH_KEEP_NEWEST:]

    freed, count = 0, 0
    to_remove: List[Dict] = []

    for group in (old_research, chrome, large):
        for item in group:
            p = Path(item["path"])
            # Never prompt for (or delete) something outside the safe scope.
            if not is_safe_path(p):
                _log(f"REJECT: {p} (outside HERMES_HOME) — not deleted")
                continue
            if not confirm(item):
                continue
            try:
                _remove_path(p)
            except OSError as e:
                _log(f"ERROR deleting {item['path']}: {e}")
                continue
            to_remove.append(item)
            freed += item["size"]
            count += 1
            _log(
                f"DELETED: {p} ({item['category']}, "
                f"{fmt_size(item['size'])})"
            )

    if to_remove:
        remove_paths = {i["path"] for i in to_remove}
        save_tracked([i for i in tracked if i["path"] not in remove_paths])

    return {"quick": quick_result, "deep_deleted": count, "deep_freed": freed}


# ---------------------------------------------------------------------------
# Status
# ---------------------------------------------------------------------------

def status() -> Dict[str, Any]:
    """Return per-category breakdown and top 10 largest tracked files.

    Category totals count every tracked entry; the top-10 list only
    includes entries whose path still exists.
    """
    tracked = load_tracked()
    cats: Dict[str, Dict] = {}
    for item in tracked:
        c = item["category"]
        cats.setdefault(c, {"count": 0, "size": 0})
        cats[c]["count"] += 1
        cats[c]["size"] += item["size"]

    existing = [
        (i["path"], i["size"], i["category"])
        for i in tracked if Path(i["path"]).exists()
    ]
    existing.sort(key=lambda x: x[1], reverse=True)

    return {
        "categories": cats,
        "top10": existing[:10],
        "total_tracked": len(tracked),
    }


def format_status(s: Dict[str, Any]) -> str:
    """Human-readable status string (for slash command output)."""
    lines = [f"{'Category':<20} {'Files':>6}  {'Size':>10}", "-" * 40]
    cats = s["categories"]
    for cat, d in sorted(cats.items(), key=lambda x: x[1]["size"], reverse=True):
        lines.append(f"{cat:<20} {d['count']:>6}  {fmt_size(d['size']):>10}")

    if not cats:
        lines.append("(nothing tracked yet)")

    lines.append("")
    lines.append("Top 10 largest tracked files:")
    if not s["top10"]:
        lines.append("  (none)")
    else:
        for rank, (path, size, cat) in enumerate(s["top10"], 1):
            lines.append(f"  {rank:>2}. {fmt_size(size):>8}  [{cat}]  {path}")
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# Auto-categorisation from tool-call inspection
# ---------------------------------------------------------------------------

_TEST_PATTERNS = ("test_", "tmp_")
_TEST_SUFFIXES = (".test.py", ".test.js", ".test.ts", ".test.md")


def guess_category(path: Path) -> Optional[str]:
    """Return a category label for *path*, or None if we shouldn't track it.

    Used by the ``post_tool_call`` hook to auto-track ephemeral files.
    """
    if not is_safe_path(path):
        return None

    # Skip the state dir itself, logs, memory files, sessions, config.
    hermes_home = get_hermes_home()
    try:
        rel = path.resolve().relative_to(hermes_home)
    except ValueError:
        # Path isn't under HERMES_HOME (e.g. /tmp/hermes-*) — fall through
        # to the filename rules below.
        pass
    except (OSError, RuntimeError):
        # Unresolvable path — nothing sensible to track.
        return None
    else:
        top = rel.parts[0] if rel.parts else ""
        if top in {
            "disk-cleanup", "logs", "memories", "sessions", "config.yaml",
            "skills", "plugins", ".env", "USER.md", "MEMORY.md", "SOUL.md",
            "auth.json", "hermes-agent",
        }:
            return None
        if top == "cron" or top == "cronjobs":
            return "cron-output"
        if top == "cache":
            return "temp"

    name = path.name
    if name.startswith(_TEST_PATTERNS) or name.endswith(_TEST_SUFFIXES):
        return "test"
    return None