/ tools / memory_tool.py
memory_tool.py
  1  #!/usr/bin/env python3
  2  """
  3  Memory Tool Module - Persistent Curated Memory
  4  
  5  Provides bounded, file-backed memory that persists across sessions. Two stores:
  6    - MEMORY.md: agent's personal notes and observations (environment facts, project
  7      conventions, tool quirks, things learned)
  8    - USER.md: what the agent knows about the user (preferences, communication style,
  9      expectations, workflow habits)
 10  
 11  Both are injected into the system prompt as a frozen snapshot at session start.
 12  Mid-session writes update files on disk immediately (durable) but do NOT change
 13  the system prompt -- this preserves the prefix cache for the entire session.
 14  The snapshot refreshes on the next session start.
 15  
 16  Entry delimiter: § (section sign). Entries can be multiline.
 17  Character limits (not tokens) because char counts are model-independent.
 18  
 19  Design:
 20  - Single `memory` tool with action parameter: add, replace, remove
 21  - replace/remove use short unique substring matching (not full text or IDs)
 22  - Behavioral guidance lives in the tool schema description
 23  - Frozen snapshot pattern: system prompt is stable, tool responses show live state
 24  """
 25  
 26  import json
 27  import logging
 28  import os
 29  import re
 30  import tempfile
 31  from contextlib import contextmanager
 32  from pathlib import Path
 33  from hermes_constants import get_hermes_home
 34  from typing import Dict, Any, List, Optional
 35  
 36  from utils import atomic_replace
 37  
 38  # fcntl is Unix-only; on Windows use msvcrt for file locking
 39  msvcrt = None
 40  try:
 41      import fcntl
 42  except ImportError:
 43      fcntl = None
 44      try:
 45          import msvcrt
 46      except ImportError:
 47          pass
 48  
 49  logger = logging.getLogger(__name__)
 50  
 51  # Where memory files live — resolved dynamically so profile overrides
 52  # (HERMES_HOME env var changes) are always respected.  The old module-level
 53  # constant was cached at import time and could go stale if a profile switch
 54  # happened after the first import.
 55  def get_memory_dir() -> Path:
 56      """Return the profile-scoped memories directory."""
 57      return get_hermes_home() / "memories"
 58  
# Separator placed between entries when they are joined for writing to disk,
# for rendering the system prompt block, and for character counting: a line
# containing only the section sign. Files are split on this exact string when
# read back.
ENTRY_DELIMITER = "\n§\n"
 60  
 61  
 62  # ---------------------------------------------------------------------------
 63  # Memory content scanning — lightweight check for injection/exfiltration
 64  # in content that gets injected into the system prompt.
 65  # ---------------------------------------------------------------------------
 66  
 67  _MEMORY_THREAT_PATTERNS = [
 68      # Prompt injection
 69      (r'ignore\s+(previous|all|above|prior)\s+instructions', "prompt_injection"),
 70      (r'you\s+are\s+now\s+', "role_hijack"),
 71      (r'do\s+not\s+tell\s+the\s+user', "deception_hide"),
 72      (r'system\s+prompt\s+override', "sys_prompt_override"),
 73      (r'disregard\s+(your|all|any)\s+(instructions|rules|guidelines)', "disregard_rules"),
 74      (r'act\s+as\s+(if|though)\s+you\s+(have\s+no|don\'t\s+have)\s+(restrictions|limits|rules)', "bypass_restrictions"),
 75      # Exfiltration via curl/wget with secrets
 76      (r'curl\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_curl"),
 77      (r'wget\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_wget"),
 78      (r'cat\s+[^\n]*(\.env|credentials|\.netrc|\.pgpass|\.npmrc|\.pypirc)', "read_secrets"),
 79      # Persistence via shell rc
 80      (r'authorized_keys', "ssh_backdoor"),
 81      (r'\$HOME/\.ssh|\~/\.ssh', "ssh_access"),
 82      (r'\$HOME/\.hermes/\.env|\~/\.hermes/\.env', "hermes_env"),
 83  ]
 84  
 85  # Subset of invisible chars for injection detection
 86  _INVISIBLE_CHARS = {
 87      '\u200b', '\u200c', '\u200d', '\u2060', '\ufeff',
 88      '\u202a', '\u202b', '\u202c', '\u202d', '\u202e',
 89  }
 90  
 91  
 92  def _scan_memory_content(content: str) -> Optional[str]:
 93      """Scan memory content for injection/exfil patterns. Returns error string if blocked."""
 94      # Check invisible unicode
 95      for char in _INVISIBLE_CHARS:
 96          if char in content:
 97              return f"Blocked: content contains invisible unicode character U+{ord(char):04X} (possible injection)."
 98  
 99      # Check threat patterns
100      for pattern, pid in _MEMORY_THREAT_PATTERNS:
101          if re.search(pattern, content, re.IGNORECASE):
102              return f"Blocked: content matches threat pattern '{pid}'. Memory entries are injected into the system prompt and must not contain injection or exfiltration payloads."
103  
104      return None
105  
106  
class MemoryStore:
    """
    Bounded curated memory with file persistence. One instance per AIAgent.

    Maintains two parallel states:
      - _system_prompt_snapshot: frozen at load time, used for system prompt injection.
        Never mutated mid-session. Keeps prefix cache stable.
      - memory_entries / user_entries: live state, mutated by tool calls, persisted to disk.
        Tool responses always reflect this live state.

    Every mutating operation (add / replace / remove) takes an exclusive lock
    on a sidecar ".lock" file, re-reads the target file, applies the change to
    the fresh entries, and writes the result back through a temp file that is
    atomically swapped into place.
    """

    def __init__(self, memory_char_limit: int = 2200, user_char_limit: int = 1375):
        # Live state: mutated by add/replace/remove and persisted after each change.
        self.memory_entries: List[str] = []
        self.user_entries: List[str] = []
        # Budgets are measured in characters of the delimiter-joined text.
        self.memory_char_limit = memory_char_limit
        self.user_char_limit = user_char_limit
        # Frozen snapshot for system prompt -- set once at load_from_disk()
        self._system_prompt_snapshot: Dict[str, str] = {"memory": "", "user": ""}

    def load_from_disk(self):
        """Load entries from MEMORY.md and USER.md, capture system prompt snapshot."""
        mem_dir = get_memory_dir()
        mem_dir.mkdir(parents=True, exist_ok=True)

        self.memory_entries = self._read_file(mem_dir / "MEMORY.md")
        self.user_entries = self._read_file(mem_dir / "USER.md")

        # Deduplicate entries (preserves order, keeps first occurrence)
        self.memory_entries = list(dict.fromkeys(self.memory_entries))
        self.user_entries = list(dict.fromkeys(self.user_entries))

        # Capture frozen snapshot for system prompt injection
        self._system_prompt_snapshot = {
            "memory": self._render_block("memory", self.memory_entries),
            "user": self._render_block("user", self.user_entries),
        }

    @staticmethod
    @contextmanager
    def _file_lock(path: Path):
        """Acquire an exclusive file lock for read-modify-write safety.

        Uses a separate .lock file so the memory file itself can still be
        atomically replaced via os.replace().

        When neither ``fcntl`` nor ``msvcrt`` could be imported, the body runs
        without any locking.
        """
        lock_path = path.with_suffix(path.suffix + ".lock")
        lock_path.parent.mkdir(parents=True, exist_ok=True)

        if fcntl is None and msvcrt is None:
            yield
            return

        # msvcrt locks a byte range; make sure the lock file holds at least one
        # byte first. NOTE(review): presumably required so the 1-byte region
        # locked below exists -- confirm.
        if msvcrt and (not lock_path.exists() or lock_path.stat().st_size == 0):
            lock_path.write_text(" ", encoding="utf-8")

        # "a+" creates the lock file on demand without truncating it.
        fd = open(lock_path, "r+" if msvcrt else "a+")
        try:
            if fcntl:
                fcntl.flock(fd, fcntl.LOCK_EX)
            else:
                fd.seek(0)
                msvcrt.locking(fd.fileno(), msvcrt.LK_LOCK, 1)
            yield
        finally:
            if fcntl:
                fcntl.flock(fd, fcntl.LOCK_UN)
            elif msvcrt:
                # Best-effort unlock: closing the handle below follows either way.
                try:
                    fd.seek(0)
                    msvcrt.locking(fd.fileno(), msvcrt.LK_UNLCK, 1)
                except (OSError, IOError):
                    pass
            fd.close()

    @staticmethod
    def _path_for(target: str) -> Path:
        """Return the backing file for *target* ("user" -> USER.md, else MEMORY.md)."""
        mem_dir = get_memory_dir()
        if target == "user":
            return mem_dir / "USER.md"
        return mem_dir / "MEMORY.md"

    def _reload_target(self, target: str):
        """Re-read entries from disk into in-memory state.

        Called under file lock to get the latest state before mutating.
        """
        fresh = self._read_file(self._path_for(target))
        fresh = list(dict.fromkeys(fresh))  # deduplicate
        self._set_entries(target, fresh)

    def save_to_disk(self, target: str):
        """Persist entries to the appropriate file. Called after every mutation."""
        get_memory_dir().mkdir(parents=True, exist_ok=True)
        self._write_file(self._path_for(target), self._entries_for(target))

    def _entries_for(self, target: str) -> List[str]:
        """Return the live entry list for *target* (the list itself, not a copy)."""
        if target == "user":
            return self.user_entries
        return self.memory_entries

    def _set_entries(self, target: str, entries: List[str]):
        """Rebind the live entry list for *target*."""
        if target == "user":
            self.user_entries = entries
        else:
            self.memory_entries = entries

    def _char_count(self, target: str) -> int:
        """Return the length of the delimiter-joined entries for *target*."""
        entries = self._entries_for(target)
        if not entries:
            return 0
        return len(ENTRY_DELIMITER.join(entries))

    def _char_limit(self, target: str) -> int:
        """Return the character budget configured for *target*."""
        if target == "user":
            return self.user_char_limit
        return self.memory_char_limit

    @staticmethod
    def _has_delimiter_line(content: str, marker: str) -> bool:
        """Return True if any line of *content* is exactly *marker*.

        Lines are split on "\\r\\n", "\\r" and "\\n" because the files are read
        back in text mode with universal newlines, which turns all three into
        "\\n" before the file is split into entries.
        """
        return any(line == marker for line in re.split(r"\r\n|\r|\n", content))

    def _delimiter_error(self, content: str) -> Optional[str]:
        """Return an error message if *content* would not survive a disk round trip.

        An entry holding a line that consists only of the delimiter symbol is
        indistinguishable from an entry boundary once entries are joined, so on
        the next read it would be split apart or would bleed into a neighbour.
        """
        marker = ENTRY_DELIMITER.strip()
        if self._has_delimiter_line(content, marker):
            return (
                f"Content must not contain a line consisting only of '{marker}': "
                "that line is the entry separator. Add separate entries with separate calls."
            )
        return None

    @staticmethod
    def _find_unique_match(entries: List[str], old_text: str) -> tuple:
        """Locate the single entry that contains *old_text*.

        Returns:
            ``(index, None)`` when exactly one distinct entry text matches (if
            several matches are exact duplicates of each other, the first index
            is used), or ``(None, error_response)`` when nothing matches or the
            substring is ambiguous.
        """
        hits = [(i, e) for i, e in enumerate(entries) if old_text in e]
        if not hits:
            return None, {"success": False, "error": f"No entry matched '{old_text}'."}

        if len({text for _, text in hits}) > 1:
            previews = [e[:80] + ("..." if len(e) > 80 else "") for _, e in hits]
            return None, {
                "success": False,
                "error": f"Multiple entries matched '{old_text}'. Be more specific.",
                "matches": previews,
            }

        return hits[0][0], None

    def add(self, target: str, content: str) -> Dict[str, Any]:
        """Append a new entry.

        Returns a success response (also when the exact entry already exists),
        or an error response when the content is empty, is blocked by the
        content scan, contains a delimiter line, or would exceed the char limit.
        """
        content = content.strip()
        if not content:
            return {"success": False, "error": "Content cannot be empty."}

        # Scan for injection/exfiltration before accepting
        scan_error = _scan_memory_content(content)
        if scan_error:
            return {"success": False, "error": scan_error}

        delimiter_error = self._delimiter_error(content)
        if delimiter_error:
            return {"success": False, "error": delimiter_error}

        with self._file_lock(self._path_for(target)):
            # Re-read from disk under lock to pick up writes from other sessions
            self._reload_target(target)

            entries = self._entries_for(target)
            limit = self._char_limit(target)

            # Reject exact duplicates
            if content in entries:
                return self._success_response(target, "Entry already exists (no duplicate added).")

            # Calculate what the new total would be
            new_total = len(ENTRY_DELIMITER.join(entries + [content]))

            if new_total > limit:
                current = self._char_count(target)
                return {
                    "success": False,
                    "error": (
                        f"Memory at {current:,}/{limit:,} chars. "
                        f"Adding this entry ({len(content)} chars) would exceed the limit. "
                        f"Replace or remove existing entries first."
                    ),
                    "current_entries": entries,
                    "usage": f"{current:,}/{limit:,}",
                }

            entries.append(content)
            self._set_entries(target, entries)
            self.save_to_disk(target)

        return self._success_response(target, "Entry added.")

    def replace(self, target: str, old_text: str, new_content: str) -> Dict[str, Any]:
        """Find the entry containing the old_text substring and replace it with new_content.

        Returns an error response when either argument is empty, the new
        content is blocked or contains a delimiter line, no entry or several
        different entries match, or the replacement would exceed the char limit.
        """
        old_text = old_text.strip()
        new_content = new_content.strip()
        if not old_text:
            return {"success": False, "error": "old_text cannot be empty."}
        if not new_content:
            return {"success": False, "error": "new_content cannot be empty. Use 'remove' to delete entries."}

        # Scan replacement content for injection/exfiltration
        scan_error = _scan_memory_content(new_content)
        if scan_error:
            return {"success": False, "error": scan_error}

        delimiter_error = self._delimiter_error(new_content)
        if delimiter_error:
            return {"success": False, "error": delimiter_error}

        with self._file_lock(self._path_for(target)):
            self._reload_target(target)

            entries = self._entries_for(target)
            idx, match_error = self._find_unique_match(entries, old_text)
            if match_error is not None:
                return match_error

            limit = self._char_limit(target)

            # Check that replacement doesn't blow the budget
            test_entries = entries.copy()
            test_entries[idx] = new_content
            new_total = len(ENTRY_DELIMITER.join(test_entries))

            if new_total > limit:
                return {
                    "success": False,
                    "error": (
                        f"Replacement would put memory at {new_total:,}/{limit:,} chars. "
                        f"Shorten the new content or remove other entries first."
                    ),
                }

            entries[idx] = new_content
            self._set_entries(target, entries)
            self.save_to_disk(target)

        return self._success_response(target, "Entry replaced.")

    def remove(self, target: str, old_text: str) -> Dict[str, Any]:
        """Remove the entry containing the old_text substring.

        Returns an error response when old_text is empty, matches nothing, or
        matches several different entries.
        """
        old_text = old_text.strip()
        if not old_text:
            return {"success": False, "error": "old_text cannot be empty."}

        with self._file_lock(self._path_for(target)):
            self._reload_target(target)

            entries = self._entries_for(target)
            idx, match_error = self._find_unique_match(entries, old_text)
            if match_error is not None:
                return match_error

            entries.pop(idx)
            self._set_entries(target, entries)
            self.save_to_disk(target)

        return self._success_response(target, "Entry removed.")

    def format_for_system_prompt(self, target: str) -> Optional[str]:
        """
        Return the frozen snapshot for system prompt injection.

        This returns the state captured at load_from_disk() time, NOT the live
        state. Mid-session writes do not affect this. This keeps the system
        prompt stable across all turns, preserving the prefix cache.

        Returns None if the snapshot is empty (no entries at load time).
        """
        block = self._system_prompt_snapshot.get(target, "")
        return block if block else None

    # -- Internal helpers --

    def _success_response(self, target: str, message: Optional[str] = None) -> Dict[str, Any]:
        """Build the success payload showing the live entries and usage for *target*."""
        entries = self._entries_for(target)
        current = self._char_count(target)
        limit = self._char_limit(target)
        # Clamp to 100 and guard against a zero/negative limit.
        pct = min(100, int((current / limit) * 100)) if limit > 0 else 0

        resp = {
            "success": True,
            "target": target,
            "entries": entries,
            "usage": f"{pct}% — {current:,}/{limit:,} chars",
            "entry_count": len(entries),
        }
        if message:
            resp["message"] = message
        return resp

    def _render_block(self, target: str, entries: List[str]) -> str:
        """Render a system prompt block with header and usage indicator.

        Returns an empty string when there are no entries.
        """
        if not entries:
            return ""

        limit = self._char_limit(target)
        content = ENTRY_DELIMITER.join(entries)
        current = len(content)
        pct = min(100, int((current / limit) * 100)) if limit > 0 else 0

        if target == "user":
            header = f"USER PROFILE (who the user is) [{pct}% — {current:,}/{limit:,} chars]"
        else:
            header = f"MEMORY (your personal notes) [{pct}% — {current:,}/{limit:,} chars]"

        separator = "═" * 46
        return f"{separator}\n{header}\n{separator}\n{content}"

    @staticmethod
    def _read_file(path: Path) -> List[str]:
        """Read a memory file and split into entries.

        No file locking needed: _write_file uses atomic rename, so readers
        always see either the previous complete file or the new complete file.

        Returns an empty list when the file is missing, unreadable (OSError),
        or blank. Entries are stripped and empty ones are dropped.
        """
        if not path.exists():
            return []
        try:
            raw = path.read_text(encoding="utf-8")
        except (OSError, IOError):
            return []

        if not raw.strip():
            return []

        # Use ENTRY_DELIMITER for consistency with _write_file. Splitting by "§"
        # alone would incorrectly split entries that contain "§" in their content.
        entries = [e.strip() for e in raw.split(ENTRY_DELIMITER)]
        return [e for e in entries if e]

    @staticmethod
    def _write_file(path: Path, entries: List[str]):
        """Write entries to a memory file using atomic temp-file + rename.

        Previous implementation used open("w") + flock, but "w" truncates the
        file *before* the lock is acquired, creating a race window where
        concurrent readers see an empty file. Atomic rename avoids this:
        readers always see either the old complete file or the new one.

        Raises:
            RuntimeError: if creating, writing, or swapping in the temp file
                fails with an OSError (the original error is chained).
        """
        content = ENTRY_DELIMITER.join(entries) if entries else ""
        try:
            # Write to temp file in same directory (same filesystem for atomic rename)
            fd, tmp_path = tempfile.mkstemp(
                dir=str(path.parent), suffix=".tmp", prefix=".mem_"
            )
            try:
                with os.fdopen(fd, "w", encoding="utf-8") as f:
                    f.write(content)
                    f.flush()
                    os.fsync(f.fileno())
                atomic_replace(tmp_path, path)
            except BaseException:
                # Clean up temp file on any failure
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass
                raise
        except (OSError, IOError) as e:
            raise RuntimeError(f"Failed to write memory file {path}: {e}") from e
463  
464  
465  def memory_tool(
466      action: str,
467      target: str = "memory",
468      content: str = None,
469      old_text: str = None,
470      store: Optional[MemoryStore] = None,
471  ) -> str:
472      """
473      Single entry point for the memory tool. Dispatches to MemoryStore methods.
474  
475      Returns JSON string with results.
476      """
477      if store is None:
478          return tool_error("Memory is not available. It may be disabled in config or this environment.", success=False)
479  
480      if target not in ("memory", "user"):
481          return tool_error(f"Invalid target '{target}'. Use 'memory' or 'user'.", success=False)
482  
483      if action == "add":
484          if not content:
485              return tool_error("Content is required for 'add' action.", success=False)
486          result = store.add(target, content)
487  
488      elif action == "replace":
489          if not old_text:
490              return tool_error("old_text is required for 'replace' action.", success=False)
491          if not content:
492              return tool_error("content is required for 'replace' action.", success=False)
493          result = store.replace(target, old_text, content)
494  
495      elif action == "remove":
496          if not old_text:
497              return tool_error("old_text is required for 'remove' action.", success=False)
498          result = store.remove(target, old_text)
499  
500      else:
501          return tool_error(f"Unknown action '{action}'. Use: add, replace, remove", success=False)
502  
503      return json.dumps(result, ensure_ascii=False)
504  
505  
def check_memory_requirements() -> bool:
    """Report whether the memory tool can be used; it needs nothing external, so always True."""
    return True
509  
510  
511  # =============================================================================
512  # OpenAI Function-Calling Schema
513  # =============================================================================
514  
# Function-calling schema for the `memory` tool. The description doubles as the
# behavioral guidance shown to the model (when to save, what to skip, which
# target to use). The parameters mirror memory_tool(): `action` and `target`
# are required by the schema; `content` and `old_text` are validated per action
# at call time.
MEMORY_SCHEMA = {
    "name": "memory",
    "description": (
        "Save durable information to persistent memory that survives across sessions. "
        "Memory is injected into future turns, so keep it compact and focused on facts "
        "that will still matter later.\n\n"
        "WHEN TO SAVE (do this proactively, don't wait to be asked):\n"
        "- User corrects you or says 'remember this' / 'don't do that again'\n"
        "- User shares a preference, habit, or personal detail (name, role, timezone, coding style)\n"
        "- You discover something about the environment (OS, installed tools, project structure)\n"
        "- You learn a convention, API quirk, or workflow specific to this user's setup\n"
        "- You identify a stable fact that will be useful again in future sessions\n\n"
        "PRIORITY: User preferences and corrections > environment facts > procedural knowledge. "
        "The most valuable memory prevents the user from having to repeat themselves.\n\n"
        "Do NOT save task progress, session outcomes, completed-work logs, or temporary TODO "
        "state to memory; use session_search to recall those from past transcripts.\n"
        "If you've discovered a new way to do something, solved a problem that could be "
        "necessary later, save it as a skill with the skill tool.\n\n"
        "TWO TARGETS:\n"
        "- 'user': who the user is -- name, role, preferences, communication style, pet peeves\n"
        "- 'memory': your notes -- environment facts, project conventions, tool quirks, lessons learned\n\n"
        "ACTIONS: add (new entry), replace (update existing -- old_text identifies it), "
        "remove (delete -- old_text identifies it).\n\n"
        "SKIP: trivial/obvious info, things easily re-discovered, raw data dumps, and temporary task state."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": ["add", "replace", "remove"],
                "description": "The action to perform."
            },
            "target": {
                "type": "string",
                "enum": ["memory", "user"],
                "description": "Which memory store: 'memory' for personal notes, 'user' for user profile."
            },
            "content": {
                "type": "string",
                "description": "The entry content. Required for 'add' and 'replace'."
            },
            "old_text": {
                "type": "string",
                "description": "Short unique substring identifying the entry to replace or remove."
            },
        },
        "required": ["action", "target"],
    },
}
565  
566  
# --- Registry ---
# Imported at the bottom of the module rather than with the other imports.
# NOTE(review): presumably to avoid a circular import with tools.registry --
# confirm. `tool_error` is the helper memory_tool() uses for error responses;
# it is resolved at call time, so defining it here is sufficient.
from tools.registry import registry, tool_error

# Register the tool as a side effect of importing this module. The handler
# unpacks the model-supplied `args` dict (defaulting `action` to "" and
# `target` to "memory") and takes the MemoryStore from the `store` keyword
# argument; when none is supplied, memory_tool() reports memory as unavailable.
registry.register(
    name="memory",
    toolset="memory",
    schema=MEMORY_SCHEMA,
    handler=lambda args, **kw: memory_tool(
        action=args.get("action", ""),
        target=args.get("target", "memory"),
        content=args.get("content"),
        old_text=args.get("old_text"),
        store=kw.get("store")),
    check_fn=check_memory_requirements,
    emoji="🧠",
)
583  
584  
585  
586