memory_tool.py
1 #!/usr/bin/env python3 2 """ 3 Memory Tool Module - Persistent Curated Memory 4 5 Provides bounded, file-backed memory that persists across sessions. Two stores: 6 - MEMORY.md: agent's personal notes and observations (environment facts, project 7 conventions, tool quirks, things learned) 8 - USER.md: what the agent knows about the user (preferences, communication style, 9 expectations, workflow habits) 10 11 Both are injected into the system prompt as a frozen snapshot at session start. 12 Mid-session writes update files on disk immediately (durable) but do NOT change 13 the system prompt -- this preserves the prefix cache for the entire session. 14 The snapshot refreshes on the next session start. 15 16 Entry delimiter: § (section sign). Entries can be multiline. 17 Character limits (not tokens) because char counts are model-independent. 18 19 Design: 20 - Single `memory` tool with action parameter: add, replace, remove, read 21 - replace/remove use short unique substring matching (not full text or IDs) 22 - Behavioral guidance lives in the tool schema description 23 - Frozen snapshot pattern: system prompt is stable, tool responses show live state 24 """ 25 26 import json 27 import logging 28 import os 29 import re 30 import tempfile 31 from contextlib import contextmanager 32 from pathlib import Path 33 from hermes_constants import get_hermes_home 34 from typing import Dict, Any, List, Optional 35 36 from utils import atomic_replace 37 38 # fcntl is Unix-only; on Windows use msvcrt for file locking 39 msvcrt = None 40 try: 41 import fcntl 42 except ImportError: 43 fcntl = None 44 try: 45 import msvcrt 46 except ImportError: 47 pass 48 49 logger = logging.getLogger(__name__) 50 51 # Where memory files live — resolved dynamically so profile overrides 52 # (HERMES_HOME env var changes) are always respected. The old module-level 53 # constant was cached at import time and could go stale if a profile switch 54 # happened after the first import. 55 def get_memory_dir() -> Path: 56 """Return the profile-scoped memories directory.""" 57 return get_hermes_home() / "memories" 58 59 ENTRY_DELIMITER = "\n§\n" 60 61 62 # --------------------------------------------------------------------------- 63 # Memory content scanning — lightweight check for injection/exfiltration 64 # in content that gets injected into the system prompt. 65 # --------------------------------------------------------------------------- 66 67 _MEMORY_THREAT_PATTERNS = [ 68 # Prompt injection 69 (r'ignore\s+(previous|all|above|prior)\s+instructions', "prompt_injection"), 70 (r'you\s+are\s+now\s+', "role_hijack"), 71 (r'do\s+not\s+tell\s+the\s+user', "deception_hide"), 72 (r'system\s+prompt\s+override', "sys_prompt_override"), 73 (r'disregard\s+(your|all|any)\s+(instructions|rules|guidelines)', "disregard_rules"), 74 (r'act\s+as\s+(if|though)\s+you\s+(have\s+no|don\'t\s+have)\s+(restrictions|limits|rules)', "bypass_restrictions"), 75 # Exfiltration via curl/wget with secrets 76 (r'curl\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_curl"), 77 (r'wget\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_wget"), 78 (r'cat\s+[^\n]*(\.env|credentials|\.netrc|\.pgpass|\.npmrc|\.pypirc)', "read_secrets"), 79 # Persistence via shell rc 80 (r'authorized_keys', "ssh_backdoor"), 81 (r'\$HOME/\.ssh|\~/\.ssh', "ssh_access"), 82 (r'\$HOME/\.hermes/\.env|\~/\.hermes/\.env', "hermes_env"), 83 ] 84 85 # Subset of invisible chars for injection detection 86 _INVISIBLE_CHARS = { 87 '\u200b', '\u200c', '\u200d', '\u2060', '\ufeff', 88 '\u202a', '\u202b', '\u202c', '\u202d', '\u202e', 89 } 90 91 92 def _scan_memory_content(content: str) -> Optional[str]: 93 """Scan memory content for injection/exfil patterns. Returns error string if blocked.""" 94 # Check invisible unicode 95 for char in _INVISIBLE_CHARS: 96 if char in content: 97 return f"Blocked: content contains invisible unicode character U+{ord(char):04X} (possible injection)." 98 99 # Check threat patterns 100 for pattern, pid in _MEMORY_THREAT_PATTERNS: 101 if re.search(pattern, content, re.IGNORECASE): 102 return f"Blocked: content matches threat pattern '{pid}'. Memory entries are injected into the system prompt and must not contain injection or exfiltration payloads." 103 104 return None 105 106 107 class MemoryStore: 108 """ 109 Bounded curated memory with file persistence. One instance per AIAgent. 110 111 Maintains two parallel states: 112 - _system_prompt_snapshot: frozen at load time, used for system prompt injection. 113 Never mutated mid-session. Keeps prefix cache stable. 114 - memory_entries / user_entries: live state, mutated by tool calls, persisted to disk. 115 Tool responses always reflect this live state. 116 """ 117 118 def __init__(self, memory_char_limit: int = 2200, user_char_limit: int = 1375): 119 self.memory_entries: List[str] = [] 120 self.user_entries: List[str] = [] 121 self.memory_char_limit = memory_char_limit 122 self.user_char_limit = user_char_limit 123 # Frozen snapshot for system prompt -- set once at load_from_disk() 124 self._system_prompt_snapshot: Dict[str, str] = {"memory": "", "user": ""} 125 126 def load_from_disk(self): 127 """Load entries from MEMORY.md and USER.md, capture system prompt snapshot.""" 128 mem_dir = get_memory_dir() 129 mem_dir.mkdir(parents=True, exist_ok=True) 130 131 self.memory_entries = self._read_file(mem_dir / "MEMORY.md") 132 self.user_entries = self._read_file(mem_dir / "USER.md") 133 134 # Deduplicate entries (preserves order, keeps first occurrence) 135 self.memory_entries = list(dict.fromkeys(self.memory_entries)) 136 self.user_entries = list(dict.fromkeys(self.user_entries)) 137 138 # Capture frozen snapshot for system prompt injection 139 self._system_prompt_snapshot = { 140 "memory": self._render_block("memory", self.memory_entries), 141 "user": self._render_block("user", self.user_entries), 142 } 143 144 @staticmethod 145 @contextmanager 146 def _file_lock(path: Path): 147 """Acquire an exclusive file lock for read-modify-write safety. 148 149 Uses a separate .lock file so the memory file itself can still be 150 atomically replaced via os.replace(). 151 """ 152 lock_path = path.with_suffix(path.suffix + ".lock") 153 lock_path.parent.mkdir(parents=True, exist_ok=True) 154 155 if fcntl is None and msvcrt is None: 156 yield 157 return 158 159 if msvcrt and (not lock_path.exists() or lock_path.stat().st_size == 0): 160 lock_path.write_text(" ", encoding="utf-8") 161 162 fd = open(lock_path, "r+" if msvcrt else "a+") 163 try: 164 if fcntl: 165 fcntl.flock(fd, fcntl.LOCK_EX) 166 else: 167 fd.seek(0) 168 msvcrt.locking(fd.fileno(), msvcrt.LK_LOCK, 1) 169 yield 170 finally: 171 if fcntl: 172 fcntl.flock(fd, fcntl.LOCK_UN) 173 elif msvcrt: 174 try: 175 fd.seek(0) 176 msvcrt.locking(fd.fileno(), msvcrt.LK_UNLCK, 1) 177 except (OSError, IOError): 178 pass 179 fd.close() 180 181 @staticmethod 182 def _path_for(target: str) -> Path: 183 mem_dir = get_memory_dir() 184 if target == "user": 185 return mem_dir / "USER.md" 186 return mem_dir / "MEMORY.md" 187 188 def _reload_target(self, target: str): 189 """Re-read entries from disk into in-memory state. 190 191 Called under file lock to get the latest state before mutating. 192 """ 193 fresh = self._read_file(self._path_for(target)) 194 fresh = list(dict.fromkeys(fresh)) # deduplicate 195 self._set_entries(target, fresh) 196 197 def save_to_disk(self, target: str): 198 """Persist entries to the appropriate file. Called after every mutation.""" 199 get_memory_dir().mkdir(parents=True, exist_ok=True) 200 self._write_file(self._path_for(target), self._entries_for(target)) 201 202 def _entries_for(self, target: str) -> List[str]: 203 if target == "user": 204 return self.user_entries 205 return self.memory_entries 206 207 def _set_entries(self, target: str, entries: List[str]): 208 if target == "user": 209 self.user_entries = entries 210 else: 211 self.memory_entries = entries 212 213 def _char_count(self, target: str) -> int: 214 entries = self._entries_for(target) 215 if not entries: 216 return 0 217 return len(ENTRY_DELIMITER.join(entries)) 218 219 def _char_limit(self, target: str) -> int: 220 if target == "user": 221 return self.user_char_limit 222 return self.memory_char_limit 223 224 def add(self, target: str, content: str) -> Dict[str, Any]: 225 """Append a new entry. Returns error if it would exceed the char limit.""" 226 content = content.strip() 227 if not content: 228 return {"success": False, "error": "Content cannot be empty."} 229 230 # Scan for injection/exfiltration before accepting 231 scan_error = _scan_memory_content(content) 232 if scan_error: 233 return {"success": False, "error": scan_error} 234 235 with self._file_lock(self._path_for(target)): 236 # Re-read from disk under lock to pick up writes from other sessions 237 self._reload_target(target) 238 239 entries = self._entries_for(target) 240 limit = self._char_limit(target) 241 242 # Reject exact duplicates 243 if content in entries: 244 return self._success_response(target, "Entry already exists (no duplicate added).") 245 246 # Calculate what the new total would be 247 new_entries = entries + [content] 248 new_total = len(ENTRY_DELIMITER.join(new_entries)) 249 250 if new_total > limit: 251 current = self._char_count(target) 252 return { 253 "success": False, 254 "error": ( 255 f"Memory at {current:,}/{limit:,} chars. " 256 f"Adding this entry ({len(content)} chars) would exceed the limit. " 257 f"Replace or remove existing entries first." 258 ), 259 "current_entries": entries, 260 "usage": f"{current:,}/{limit:,}", 261 } 262 263 entries.append(content) 264 self._set_entries(target, entries) 265 self.save_to_disk(target) 266 267 return self._success_response(target, "Entry added.") 268 269 def replace(self, target: str, old_text: str, new_content: str) -> Dict[str, Any]: 270 """Find entry containing old_text substring, replace it with new_content.""" 271 old_text = old_text.strip() 272 new_content = new_content.strip() 273 if not old_text: 274 return {"success": False, "error": "old_text cannot be empty."} 275 if not new_content: 276 return {"success": False, "error": "new_content cannot be empty. Use 'remove' to delete entries."} 277 278 # Scan replacement content for injection/exfiltration 279 scan_error = _scan_memory_content(new_content) 280 if scan_error: 281 return {"success": False, "error": scan_error} 282 283 with self._file_lock(self._path_for(target)): 284 self._reload_target(target) 285 286 entries = self._entries_for(target) 287 matches = [(i, e) for i, e in enumerate(entries) if old_text in e] 288 289 if not matches: 290 return {"success": False, "error": f"No entry matched '{old_text}'."} 291 292 if len(matches) > 1: 293 # If all matches are identical (exact duplicates), operate on the first one 294 unique_texts = set(e for _, e in matches) 295 if len(unique_texts) > 1: 296 previews = [e[:80] + ("..." if len(e) > 80 else "") for _, e in matches] 297 return { 298 "success": False, 299 "error": f"Multiple entries matched '{old_text}'. Be more specific.", 300 "matches": previews, 301 } 302 # All identical -- safe to replace just the first 303 304 idx = matches[0][0] 305 limit = self._char_limit(target) 306 307 # Check that replacement doesn't blow the budget 308 test_entries = entries.copy() 309 test_entries[idx] = new_content 310 new_total = len(ENTRY_DELIMITER.join(test_entries)) 311 312 if new_total > limit: 313 return { 314 "success": False, 315 "error": ( 316 f"Replacement would put memory at {new_total:,}/{limit:,} chars. " 317 f"Shorten the new content or remove other entries first." 318 ), 319 } 320 321 entries[idx] = new_content 322 self._set_entries(target, entries) 323 self.save_to_disk(target) 324 325 return self._success_response(target, "Entry replaced.") 326 327 def remove(self, target: str, old_text: str) -> Dict[str, Any]: 328 """Remove the entry containing old_text substring.""" 329 old_text = old_text.strip() 330 if not old_text: 331 return {"success": False, "error": "old_text cannot be empty."} 332 333 with self._file_lock(self._path_for(target)): 334 self._reload_target(target) 335 336 entries = self._entries_for(target) 337 matches = [(i, e) for i, e in enumerate(entries) if old_text in e] 338 339 if not matches: 340 return {"success": False, "error": f"No entry matched '{old_text}'."} 341 342 if len(matches) > 1: 343 # If all matches are identical (exact duplicates), remove the first one 344 unique_texts = set(e for _, e in matches) 345 if len(unique_texts) > 1: 346 previews = [e[:80] + ("..." if len(e) > 80 else "") for _, e in matches] 347 return { 348 "success": False, 349 "error": f"Multiple entries matched '{old_text}'. Be more specific.", 350 "matches": previews, 351 } 352 # All identical -- safe to remove just the first 353 354 idx = matches[0][0] 355 entries.pop(idx) 356 self._set_entries(target, entries) 357 self.save_to_disk(target) 358 359 return self._success_response(target, "Entry removed.") 360 361 def format_for_system_prompt(self, target: str) -> Optional[str]: 362 """ 363 Return the frozen snapshot for system prompt injection. 364 365 This returns the state captured at load_from_disk() time, NOT the live 366 state. Mid-session writes do not affect this. This keeps the system 367 prompt stable across all turns, preserving the prefix cache. 368 369 Returns None if the snapshot is empty (no entries at load time). 370 """ 371 block = self._system_prompt_snapshot.get(target, "") 372 return block if block else None 373 374 # -- Internal helpers -- 375 376 def _success_response(self, target: str, message: str = None) -> Dict[str, Any]: 377 entries = self._entries_for(target) 378 current = self._char_count(target) 379 limit = self._char_limit(target) 380 pct = min(100, int((current / limit) * 100)) if limit > 0 else 0 381 382 resp = { 383 "success": True, 384 "target": target, 385 "entries": entries, 386 "usage": f"{pct}% — {current:,}/{limit:,} chars", 387 "entry_count": len(entries), 388 } 389 if message: 390 resp["message"] = message 391 return resp 392 393 def _render_block(self, target: str, entries: List[str]) -> str: 394 """Render a system prompt block with header and usage indicator.""" 395 if not entries: 396 return "" 397 398 limit = self._char_limit(target) 399 content = ENTRY_DELIMITER.join(entries) 400 current = len(content) 401 pct = min(100, int((current / limit) * 100)) if limit > 0 else 0 402 403 if target == "user": 404 header = f"USER PROFILE (who the user is) [{pct}% — {current:,}/{limit:,} chars]" 405 else: 406 header = f"MEMORY (your personal notes) [{pct}% — {current:,}/{limit:,} chars]" 407 408 separator = "═" * 46 409 return f"{separator}\n{header}\n{separator}\n{content}" 410 411 @staticmethod 412 def _read_file(path: Path) -> List[str]: 413 """Read a memory file and split into entries. 414 415 No file locking needed: _write_file uses atomic rename, so readers 416 always see either the previous complete file or the new complete file. 417 """ 418 if not path.exists(): 419 return [] 420 try: 421 raw = path.read_text(encoding="utf-8") 422 except (OSError, IOError): 423 return [] 424 425 if not raw.strip(): 426 return [] 427 428 # Use ENTRY_DELIMITER for consistency with _write_file. Splitting by "§" 429 # alone would incorrectly split entries that contain "§" in their content. 430 entries = [e.strip() for e in raw.split(ENTRY_DELIMITER)] 431 return [e for e in entries if e] 432 433 @staticmethod 434 def _write_file(path: Path, entries: List[str]): 435 """Write entries to a memory file using atomic temp-file + rename. 436 437 Previous implementation used open("w") + flock, but "w" truncates the 438 file *before* the lock is acquired, creating a race window where 439 concurrent readers see an empty file. Atomic rename avoids this: 440 readers always see either the old complete file or the new one. 441 """ 442 content = ENTRY_DELIMITER.join(entries) if entries else "" 443 try: 444 # Write to temp file in same directory (same filesystem for atomic rename) 445 fd, tmp_path = tempfile.mkstemp( 446 dir=str(path.parent), suffix=".tmp", prefix=".mem_" 447 ) 448 try: 449 with os.fdopen(fd, "w", encoding="utf-8") as f: 450 f.write(content) 451 f.flush() 452 os.fsync(f.fileno()) 453 atomic_replace(tmp_path, path) 454 except BaseException: 455 # Clean up temp file on any failure 456 try: 457 os.unlink(tmp_path) 458 except OSError: 459 pass 460 raise 461 except (OSError, IOError) as e: 462 raise RuntimeError(f"Failed to write memory file {path}: {e}") 463 464 465 def memory_tool( 466 action: str, 467 target: str = "memory", 468 content: str = None, 469 old_text: str = None, 470 store: Optional[MemoryStore] = None, 471 ) -> str: 472 """ 473 Single entry point for the memory tool. Dispatches to MemoryStore methods. 474 475 Returns JSON string with results. 476 """ 477 if store is None: 478 return tool_error("Memory is not available. It may be disabled in config or this environment.", success=False) 479 480 if target not in ("memory", "user"): 481 return tool_error(f"Invalid target '{target}'. Use 'memory' or 'user'.", success=False) 482 483 if action == "add": 484 if not content: 485 return tool_error("Content is required for 'add' action.", success=False) 486 result = store.add(target, content) 487 488 elif action == "replace": 489 if not old_text: 490 return tool_error("old_text is required for 'replace' action.", success=False) 491 if not content: 492 return tool_error("content is required for 'replace' action.", success=False) 493 result = store.replace(target, old_text, content) 494 495 elif action == "remove": 496 if not old_text: 497 return tool_error("old_text is required for 'remove' action.", success=False) 498 result = store.remove(target, old_text) 499 500 else: 501 return tool_error(f"Unknown action '{action}'. Use: add, replace, remove", success=False) 502 503 return json.dumps(result, ensure_ascii=False) 504 505 506 def check_memory_requirements() -> bool: 507 """Memory tool has no external requirements -- always available.""" 508 return True 509 510 511 # ============================================================================= 512 # OpenAI Function-Calling Schema 513 # ============================================================================= 514 515 MEMORY_SCHEMA = { 516 "name": "memory", 517 "description": ( 518 "Save durable information to persistent memory that survives across sessions. " 519 "Memory is injected into future turns, so keep it compact and focused on facts " 520 "that will still matter later.\n\n" 521 "WHEN TO SAVE (do this proactively, don't wait to be asked):\n" 522 "- User corrects you or says 'remember this' / 'don't do that again'\n" 523 "- User shares a preference, habit, or personal detail (name, role, timezone, coding style)\n" 524 "- You discover something about the environment (OS, installed tools, project structure)\n" 525 "- You learn a convention, API quirk, or workflow specific to this user's setup\n" 526 "- You identify a stable fact that will be useful again in future sessions\n\n" 527 "PRIORITY: User preferences and corrections > environment facts > procedural knowledge. " 528 "The most valuable memory prevents the user from having to repeat themselves.\n\n" 529 "Do NOT save task progress, session outcomes, completed-work logs, or temporary TODO " 530 "state to memory; use session_search to recall those from past transcripts.\n" 531 "If you've discovered a new way to do something, solved a problem that could be " 532 "necessary later, save it as a skill with the skill tool.\n\n" 533 "TWO TARGETS:\n" 534 "- 'user': who the user is -- name, role, preferences, communication style, pet peeves\n" 535 "- 'memory': your notes -- environment facts, project conventions, tool quirks, lessons learned\n\n" 536 "ACTIONS: add (new entry), replace (update existing -- old_text identifies it), " 537 "remove (delete -- old_text identifies it).\n\n" 538 "SKIP: trivial/obvious info, things easily re-discovered, raw data dumps, and temporary task state." 539 ), 540 "parameters": { 541 "type": "object", 542 "properties": { 543 "action": { 544 "type": "string", 545 "enum": ["add", "replace", "remove"], 546 "description": "The action to perform." 547 }, 548 "target": { 549 "type": "string", 550 "enum": ["memory", "user"], 551 "description": "Which memory store: 'memory' for personal notes, 'user' for user profile." 552 }, 553 "content": { 554 "type": "string", 555 "description": "The entry content. Required for 'add' and 'replace'." 556 }, 557 "old_text": { 558 "type": "string", 559 "description": "Short unique substring identifying the entry to replace or remove." 560 }, 561 }, 562 "required": ["action", "target"], 563 }, 564 } 565 566 567 # --- Registry --- 568 from tools.registry import registry, tool_error 569 570 registry.register( 571 name="memory", 572 toolset="memory", 573 schema=MEMORY_SCHEMA, 574 handler=lambda args, **kw: memory_tool( 575 action=args.get("action", ""), 576 target=args.get("target", "memory"), 577 content=args.get("content"), 578 old_text=args.get("old_text"), 579 store=kw.get("store")), 580 check_fn=check_memory_requirements, 581 emoji="🧠", 582 ) 583 584 585 586