   1  #!/usr/bin/env python3
   2  """File Tools Module - LLM agent file manipulation tools."""
   3  
   4  import errno
   5  import json
   6  import logging
   7  import os
   8  import threading
   9  from pathlib import Path
  10  
  11  from agent.file_safety import get_read_block_error
  12  from tools.binary_extensions import has_binary_extension
  13  from tools.file_operations import (
  14      ShellFileOperations,
  15      normalize_read_pagination,
  16      normalize_search_pagination,
  17  )
  18  from tools import file_state
  19  from agent.redact import redact_sensitive_text
  20  
  21  logger = logging.getLogger(__name__)
  22  
  23  
  24  _EXPECTED_WRITE_ERRNOS = {errno.EACCES, errno.EPERM, errno.EROFS}
  25  
  26  # ---------------------------------------------------------------------------
  27  # Read-size guard: cap the character count returned to the model.
  28  # We're model-agnostic so we can't count tokens; characters are a safe proxy.
  29  # 100K chars ≈ 25–35K tokens across typical tokenisers.  Files larger than
  30  # this in a single read are a context-window hazard — the model should use
  31  # offset+limit to read the relevant section.
  32  #
  33  # Configurable via config.yaml:  file_read_max_chars: 200000
  34  # ---------------------------------------------------------------------------
  35  _DEFAULT_MAX_READ_CHARS = 100_000
  36  _max_read_chars_cached: int | None = None
  37  
  38  
  39  def _get_max_read_chars() -> int:
  40      """Return the configured max characters per file read.
  41  
  42      Reads ``file_read_max_chars`` from config.yaml on the first call and
  43      caches the result for the lifetime of the process.  Falls back to the
  44      built-in default if the config is missing or invalid.
  45      """
  46      global _max_read_chars_cached
  47      if _max_read_chars_cached is not None:
  48          return _max_read_chars_cached
  49      try:
  50          from hermes_cli.config import load_config
  51          cfg = load_config()
  52          val = cfg.get("file_read_max_chars")
  53          if isinstance(val, (int, float)) and val > 0:
  54              _max_read_chars_cached = int(val)
  55              return _max_read_chars_cached
  56      except Exception:
  57          pass
  58      _max_read_chars_cached = _DEFAULT_MAX_READ_CHARS
  59      return _max_read_chars_cached
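
      # A minimal behaviour sketch (illustrative only; the 250000 value and the
      # config snippet are hypothetical, not taken from a real config.yaml):
      #
      #   # config.yaml
      #   #   file_read_max_chars: 250000
      #   _get_max_read_chars()   # -> 250000, cached for the rest of the process
      #   # Missing, non-numeric, or non-positive values fall back to 100_000.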
  60  
  61  # If the total file size exceeds this AND the caller didn't request a narrow
  62  # range (i.e. limit is above 200), we include a hint encouraging targeted reads.
  63  _LARGE_FILE_HINT_BYTES = 512_000  # 512 KB
  64  
  65  # ---------------------------------------------------------------------------
  66  # Device path blocklist — reading these hangs the process (infinite output
  67  # or blocking on input).  Checked by path only (no I/O).
  68  # ---------------------------------------------------------------------------
  69  _BLOCKED_DEVICE_PATHS = frozenset({
  70      # Infinite output — never reach EOF
  71      "/dev/zero", "/dev/random", "/dev/urandom", "/dev/full",
  72      # Blocks waiting for input
  73      "/dev/stdin", "/dev/tty", "/dev/console",
  74      # Nonsensical to read
  75      "/dev/stdout", "/dev/stderr",
  76      # fd aliases
  77      "/dev/fd/0", "/dev/fd/1", "/dev/fd/2",
  78  })
  79  
  80  
  81  def _resolve_path(filepath: str, task_id: str = "default") -> Path:
  82      """Resolve a path against the task's live terminal cwd when available,
  83      else TERMINAL_CWD (the worktree base directory), not the main repository root.
  84      """
  85      return _resolve_path_for_task(filepath, task_id)
  86  
  87  
  88  def _get_live_tracking_cwd(task_id: str = "default") -> str | None:
  89      """Return the task's live terminal cwd for bookkeeping when available."""
  90      try:
  91          from tools.terminal_tool import _resolve_container_task_id
  92          container_key = _resolve_container_task_id(task_id)
  93      except Exception:
  94          container_key = task_id
  95  
  96      with _file_ops_lock:
  97          cached = _file_ops_cache.get(container_key) or _file_ops_cache.get(task_id)
  98      if cached is not None:
  99          live_cwd = getattr(getattr(cached, "env", None), "cwd", None) or getattr(
 100              cached, "cwd", None
 101          )
 102          if live_cwd:
 103              return live_cwd
 104  
 105      try:
 106          from tools.terminal_tool import _active_environments, _env_lock
 107  
 108          with _env_lock:
 109              env = _active_environments.get(container_key) or _active_environments.get(task_id)
 110              live_cwd = getattr(env, "cwd", None) if env is not None else None
 111          if live_cwd:
 112              return live_cwd
 113      except Exception:
 114          pass
 115  
 116      return None
 117  
 118  
 119  def _resolve_path_for_task(filepath: str, task_id: str = "default") -> Path:
 120      """Resolve *filepath* against the task's live terminal cwd when possible."""
 121      p = Path(filepath).expanduser()
 122      if not p.is_absolute():
 123          base = _get_live_tracking_cwd(task_id) or os.environ.get(
 124              "TERMINAL_CWD", os.getcwd()
 125          )
 126          p = Path(base) / p
 127      return p.resolve()
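
      # Resolution sketch (paths are hypothetical; assumes no live terminal cwd,
      # TERMINAL_CWD=/work/repo, and no symlinks along the way):
      #
      #   _resolve_path_for_task("src/app.py")   # -> Path("/work/repo/src/app.py")
      #   _resolve_path_for_task("~/notes.txt")  # -> expanded absolute home path
      #   _resolve_path_for_task("/etc/hosts")   # -> Path("/etc/hosts")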
 128  
 129  
 130  def _is_blocked_device(filepath: str) -> bool:
 131      """Return True if the path would hang the process (infinite output or blocking input).
 132  
 133      Uses the *literal* path — no symlink resolution — because the model
 134      specifies paths directly and realpath follows symlinks all the way
 135      through (e.g. /dev/stdin → /proc/self/fd/0 → /dev/pts/0), defeating
 136      the check.
 137      """
 138      normalized = os.path.expanduser(filepath)
 139      if normalized in _BLOCKED_DEVICE_PATHS:
 140          return True
 141      # /proc/self/fd/0-2 and /proc/<pid>/fd/0-2 are Linux aliases for stdio
 142      if normalized.startswith("/proc/") and normalized.endswith(
 143          ("/fd/0", "/fd/1", "/fd/2")
 144      ):
 145          return True
 146      return False
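
      # Behaviour sketch for the guard above (pure path check, no I/O):
      #
      #   _is_blocked_device("/dev/urandom")     # -> True  (never reaches EOF)
      #   _is_blocked_device("/proc/self/fd/0")  # -> True  (stdio alias)
      #   _is_blocked_device("/dev/null")        # -> False (reads return EOF immediately)
      #   _is_blocked_device("notes.txt")        # -> False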
 147  
 148  
 149  # Paths that file tools should refuse to write to without going through the
 150  # terminal tool's approval system.  Matched as prefixes of the resolved and normalized paths.
 151  _SENSITIVE_PATH_PREFIXES = (
 152      "/etc/", "/boot/", "/usr/lib/systemd/",
 153      "/private/etc/", "/private/var/",
 154  )
 155  _SENSITIVE_EXACT_PATHS = {"/var/run/docker.sock", "/run/docker.sock"}
 156  
 157  
 158  def _check_sensitive_path(filepath: str, task_id: str = "default") -> str | None:
 159      """Return an error message if the path targets a sensitive system location."""
 160      try:
 161          resolved = str(_resolve_path_for_task(filepath, task_id))
 162      except (OSError, ValueError):
 163          resolved = filepath
 164      normalized = os.path.normpath(os.path.expanduser(filepath))
 165      _err = (
 166          f"Refusing to write to sensitive system path: {filepath}\n"
 167          "Use the terminal tool with sudo if you need to modify system files."
 168      )
 169      for prefix in _SENSITIVE_PATH_PREFIXES:
 170          if resolved.startswith(prefix) or normalized.startswith(prefix):
 171              return _err
 172      if resolved in _SENSITIVE_EXACT_PATHS or normalized in _SENSITIVE_EXACT_PATHS:
 173          return _err
 174      return None
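
      # Behaviour sketch (paths are hypothetical; assumes an ordinary project
      # checkout as the working directory):
      #
      #   _check_sensitive_path("/etc/hosts")           # -> refusal message
      #   _check_sensitive_path("/run/docker.sock")     # -> refusal message
      #   _check_sensitive_path("project/config.yaml")  # -> None (write may proceed)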
 175  
 176  
 177  def _is_expected_write_exception(exc: Exception) -> bool:
 178      """Return True for expected write denials that should not hit error logs."""
 179      if isinstance(exc, PermissionError):
 180          return True
 181      if isinstance(exc, OSError) and exc.errno in _EXPECTED_WRITE_ERRNOS:
 182          return True
 183      return False
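
      # Classification sketch:
      #
      #   _is_expected_write_exception(PermissionError("denied"))      # -> True
      #   _is_expected_write_exception(OSError(errno.EROFS, "ro fs"))  # -> True
      #   _is_expected_write_exception(ValueError("bad argument"))     # -> False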
 184  
 185  
 186  _file_ops_lock = threading.Lock()
 187  _file_ops_cache: dict = {}
 188  
 189  # Track files read per task to detect re-read loops and deduplicate reads.
 190  # Per task_id we store:
 191  #   "last_key":     the key of the most recent read/search call (or None)
 192  #   "consecutive":  how many times that exact call has been repeated in a row
 193  #   "read_history": set of (path, offset, limit) tuples for get_read_files_summary
 194  #   "dedup":        dict mapping (resolved_path, offset, limit) → mtime float
 195  #                   Used to skip re-reads of unchanged files.  Reset on
 196  #                   context compression (the original content is summarised
 197  #                   away so the model needs the full content again).
 198  #   "read_timestamps": dict mapping resolved_path → modification-time float
 199  #                      recorded when the file was last read (or written) by
 200  #                      this task.  Used by write_file and patch to detect
 201  #                      external changes between the agent's read and write.
 202  #                      Updated after successful writes so consecutive edits
 203  #                      by the same task don't trigger false warnings.
 204  _read_tracker_lock = threading.Lock()
 205  _read_tracker: dict = {}
 206  
 207  # Per-task bounds for the containers inside each _read_tracker[task_id].
 208  # A CLI session uses one stable task_id for its lifetime; without these
 209  # caps, a 10k-read session would accumulate ~1.5MB of dict/set state that
 210  # is never referenced again (only the most recent reads matter for dedup,
 211  # loop detection, and external-edit warnings).  Hard caps bound the
 212  # accretion to a few hundred KB regardless of session length.
 213  _READ_HISTORY_CAP = 500       # set; used only by get_read_files_summary
 214  _DEDUP_CAP = 1000             # dict; skip-identical-reread guard
 215  _READ_TIMESTAMPS_CAP = 1000   # dict; external-edit detection for write/patch
 216  _READ_DEDUP_STATUS_MESSAGE = (
 217      "File unchanged since last read. The content from "
 218      "the earlier read_file result in this conversation is "
 219      "still current — refer to that instead of re-reading."
 220  )
 221  
 222  
 223  def _cap_read_tracker_data(task_data: dict) -> None:
 224      """Enforce size caps on the per-task read-tracker sub-containers.
 225  
 226      Must be called with ``_read_tracker_lock`` held.  Eviction policy:
 227  
 228        * ``read_history`` (set): pop arbitrary entries on overflow.  This
 229          is fine because the set only feeds diagnostic summaries; losing
 230          old entries just trims the summary's tail.
 231        * ``dedup`` / ``dedup_hits`` / ``read_timestamps`` (dicts): pop oldest by insertion
 232          order (Python 3.7+ dicts).  Evicted entries lose their dedup
 233          skip on a future re-read (the file gets re-sent once) and
 234          external-edit mtime comparison (the write/patch falls back to
 235          a non-mtime check).  Both are graceful degradations, not bugs.
 236      """
 237      rh = task_data.get("read_history")
 238      if rh is not None and len(rh) > _READ_HISTORY_CAP:
 239          excess = len(rh) - _READ_HISTORY_CAP
 240          for _ in range(excess):
 241              try:
 242                  rh.pop()
 243              except KeyError:
 244                  break
 245  
 246      dedup = task_data.get("dedup")
 247      if dedup is not None and len(dedup) > _DEDUP_CAP:
 248          excess = len(dedup) - _DEDUP_CAP
 249          for _ in range(excess):
 250              try:
 251                  dedup.pop(next(iter(dedup)))
 252              except (StopIteration, KeyError):
 253                  break
 254  
 255      dedup_hits = task_data.get("dedup_hits")
 256      if dedup_hits is not None and len(dedup_hits) > _DEDUP_CAP:
 257          excess = len(dedup_hits) - _DEDUP_CAP
 258          for _ in range(excess):
 259              try:
 260                  dedup_hits.pop(next(iter(dedup_hits)))
 261              except (StopIteration, KeyError):
 262                  break
 263  
 264      ts = task_data.get("read_timestamps")
 265      if ts is not None and len(ts) > _READ_TIMESTAMPS_CAP:
 266          excess = len(ts) - _READ_TIMESTAMPS_CAP
 267          for _ in range(excess):
 268              try:
 269                  ts.pop(next(iter(ts)))
 270              except (StopIteration, KeyError):
 271                  break
 272  
 273  
 274  def _is_internal_file_status_text(content: str) -> bool:
 275      """Return True when content looks like an internal file-tool status, not real file bytes.
 276  
 277      The read_file dedup status message must never be persisted as file
 278      content.  The obvious shape is the model echoing the message verbatim,
 279      but in practice it also wraps it with small framing text (a leading
 280      "Note:", a trailing newline + short comment, etc.) before calling
 281      write_file.  We treat any short-ish write whose body is dominated by
 282      the status message as the same class of corruption.
 283  
 284      Heuristic:
 285        * Strict equality (after strip) — the verbatim shape.
 286        * OR the stripped content contains the full status message AND is
 287          short enough that the status dominates it (<=2x the message length).
 288          Short, status-dominated writes can't plausibly be real files —
 289          legitimate docs/notes that happen to quote this internal message
 290          are always dramatically longer.
 291      """
 292      if not isinstance(content, str):
 293          return False
 294      stripped = content.strip()
 295      if not stripped:
 296          return False
 297      if stripped == _READ_DEDUP_STATUS_MESSAGE:
 298          return True
 299      if _READ_DEDUP_STATUS_MESSAGE in stripped and \
 300              len(stripped) <= 2 * len(_READ_DEDUP_STATUS_MESSAGE):
 301          return True
 302      return False
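
      # Heuristic sketch ("long_report" stands for a hypothetical document far
      # longer than the status message):
      #
      #   _is_internal_file_status_text(_READ_DEDUP_STATUS_MESSAGE)             # -> True
      #   _is_internal_file_status_text("Note: " + _READ_DEDUP_STATUS_MESSAGE)  # -> True
      #   _is_internal_file_status_text(long_report)                            # -> False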
 303  
 304  
 305  def _get_file_ops(task_id: str = "default") -> ShellFileOperations:
 306      """Get or create ShellFileOperations for a terminal environment.
 307  
 308      Respects the TERMINAL_ENV setting -- if the task_id doesn't have an
 309      environment yet, creates one using the configured backend (local, docker,
 310      modal, etc.) rather than always defaulting to local.
 311  
 312      Thread-safe: uses the same per-task creation locks as terminal_tool to
 313      prevent duplicate sandbox creation from concurrent tool calls.
 314  
 315      Note: subagent task_ids are collapsed to "default" via
 316      ``_resolve_container_task_id`` so delegate_task children share the
 317      parent's container and its cached file_ops. RL/benchmark task_ids with
 318      a registered env override keep their isolation.
 319      """
 320      from tools.terminal_tool import (
 321          _active_environments, _env_lock, _create_environment,
 322          _get_env_config, _last_activity, _start_cleanup_thread,
 323          _creation_locks,
 324          _creation_locks_lock,
 325          _resolve_container_task_id,
 326      )
 327      import time
 328  
 329      task_id = _resolve_container_task_id(task_id)
 330  
 331      # Fast path: check cache -- but also verify the underlying environment
 332      # is still alive (it may have been killed by the cleanup thread).
 333      with _file_ops_lock:
 334          cached = _file_ops_cache.get(task_id)
 335      if cached is not None:
 336          with _env_lock:
 337              if task_id in _active_environments:
 338                  _last_activity[task_id] = time.time()
 339                  return cached
 340              else:
 341                  # Environment was cleaned up -- invalidate stale cache entry
 342                  with _file_ops_lock:
 343                      _file_ops_cache.pop(task_id, None)
 344  
 345      # Need to ensure the environment exists before building file_ops.
 346      # Acquire per-task lock so only one thread creates the sandbox.
 347      with _creation_locks_lock:
 348          if task_id not in _creation_locks:
 349              _creation_locks[task_id] = threading.Lock()
 350          task_lock = _creation_locks[task_id]
 351  
 352      with task_lock:
 353          # Double-check: another thread may have created it while we waited
 354          with _env_lock:
 355              if task_id in _active_environments:
 356                  _last_activity[task_id] = time.time()
 357                  terminal_env = _active_environments[task_id]
 358              else:
 359                  terminal_env = None
 360  
 361          if terminal_env is None:
 362              from tools.terminal_tool import _task_env_overrides
 363  
 364              config = _get_env_config()
 365              env_type = config["env_type"]
 366              overrides = _task_env_overrides.get(task_id, {})
 367  
 368              if env_type == "docker":
 369                  image = overrides.get("docker_image") or config["docker_image"]
 370              elif env_type == "singularity":
 371                  image = overrides.get("singularity_image") or config["singularity_image"]
 372              elif env_type == "modal":
 373                  image = overrides.get("modal_image") or config["modal_image"]
 374              elif env_type == "daytona":
 375                  image = overrides.get("daytona_image") or config["daytona_image"]
 376              else:
 377                  image = ""
 378  
 379              cwd = overrides.get("cwd") or config["cwd"]
 380              logger.info("Creating new %s environment for task %s...", env_type, task_id[:8])
 381  
 382              container_config = None
 383              if env_type in ("docker", "singularity", "modal", "daytona", "vercel_sandbox"):
 384                  container_config = {
 385                      "container_cpu": config.get("container_cpu", 1),
 386                      "container_memory": config.get("container_memory", 5120),
 387                      "container_disk": config.get("container_disk", 51200),
 388                      "container_persistent": config.get("container_persistent", True),
 389                      "vercel_runtime": config.get("vercel_runtime", ""),
 390                      "docker_volumes": config.get("docker_volumes", []),
 391                      "docker_mount_cwd_to_workspace": config.get("docker_mount_cwd_to_workspace", False),
 392                      "docker_forward_env": config.get("docker_forward_env", []),
 393                      "docker_run_as_host_user": config.get("docker_run_as_host_user", False),
 394                  }
 395  
 396              ssh_config = None
 397              if env_type == "ssh":
 398                  ssh_config = {
 399                      "host": config.get("ssh_host", ""),
 400                      "user": config.get("ssh_user", ""),
 401                      "port": config.get("ssh_port", 22),
 402                      "key": config.get("ssh_key", ""),
 403                      "persistent": config.get("ssh_persistent", False),
 404                  }
 405  
 406              local_config = None
 407              if env_type == "local":
 408                  local_config = {
 409                      "persistent": config.get("local_persistent", False),
 410                  }
 411  
 412              terminal_env = _create_environment(
 413                  env_type=env_type,
 414                  image=image,
 415                  cwd=cwd,
 416                  timeout=config["timeout"],
 417                  ssh_config=ssh_config,
 418                  container_config=container_config,
 419                  local_config=local_config,
 420                  task_id=task_id,
 421                  host_cwd=config.get("host_cwd"),
 422              )
 423  
 424              with _env_lock:
 425                  _active_environments[task_id] = terminal_env
 426                  _last_activity[task_id] = time.time()
 427  
 428              _start_cleanup_thread()
 429              logger.info("%s environment ready for task %s", env_type, task_id[:8])
 430  
 431      # Build file_ops from the (guaranteed live) environment and cache it
 432      file_ops = ShellFileOperations(terminal_env)
 433      with _file_ops_lock:
 434          _file_ops_cache[task_id] = file_ops
 435      return file_ops
 436  
 437  
 438  def clear_file_ops_cache(task_id: str | None = None):
 439      """Clear the file operations cache."""
 440      with _file_ops_lock:
 441          if task_id:
 442              _file_ops_cache.pop(task_id, None)
 443          else:
 444              _file_ops_cache.clear()
 445  
 446  
 447  def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = "default") -> str:
 448      """Read a file with pagination and line numbers."""
 449      try:
 450          offset, limit = normalize_read_pagination(offset, limit)
 451  
 452          # ── Device path guard ─────────────────────────────────────────
 453          # Block paths that would hang the process (infinite output,
 454          # blocking on input).  Pure path check — no I/O.
 455          if _is_blocked_device(path):
 456              return json.dumps({
 457                  "error": (
 458                      f"Cannot read '{path}': this is a device file that would "
 459                      "block or produce infinite output."
 460                  ),
 461              })
 462  
 463          _resolved = _resolve_path_for_task(path, task_id)
 464  
 465          # ── Binary file guard ─────────────────────────────────────────
 466          # Block binary files by extension (no I/O).
 467          if has_binary_extension(str(_resolved)):
 468              _ext = _resolved.suffix.lower()
 469              return json.dumps({
 470                  "error": (
 471                      f"Cannot read binary file '{path}' ({_ext}). "
 472                      "Use vision_analyze for images, or terminal to inspect binary files."
 473                  ),
 474              })
 475  
 476          # ── Hermes internal path guard ────────────────────────────────
 477          # Prevent prompt injection via catalog or hub metadata files.
 478          block_error = get_read_block_error(path)
 479          if block_error:
 480              return json.dumps({"error": block_error})
 481  
 482          # ── Dedup check ───────────────────────────────────────────────
 483          # If we already read this exact (path, offset, limit) and the
 484          # file hasn't been modified since, return a lightweight stub
 485          # instead of re-sending the same content.  Saves context tokens.
 486          resolved_str = str(_resolved)
 487          dedup_key = (resolved_str, offset, limit)
 488          with _read_tracker_lock:
 489              task_data = _read_tracker.setdefault(task_id, {
 490                  "last_key": None, "consecutive": 0,
 491                  "read_history": set(), "dedup": {},
 492                  "dedup_hits": {}, "read_timestamps": {},
 493              })
 494              # Backward-compat for pre-existing tracker entries that predate
 495              # dedup_hits/read_timestamps (long-lived task or crossed an
 496              # upgrade boundary).
 497              if "dedup_hits" not in task_data:
 498                  task_data["dedup_hits"] = {}
 499              if "read_timestamps" not in task_data:
 500                  task_data["read_timestamps"] = {}
 501              cached_mtime = task_data.get("dedup", {}).get(dedup_key)
 502  
 503          if cached_mtime is not None:
 504              try:
 505                  current_mtime = os.path.getmtime(resolved_str)
 506                  if current_mtime == cached_mtime:
 507                      # Count repeated stub returns so weak tool-followers that
 508                      # ignore the "refer to earlier result" hint don't burn
 509                      # their iteration budget in an infinite read loop.  After
 510                      # one stub for the same key, the next repeat escalates to
 511                      # a hard block, mirroring the count>=4 path on real reads.
 512                      with _read_tracker_lock:
 513                          hits = task_data["dedup_hits"].get(dedup_key, 0) + 1
 514                          task_data["dedup_hits"][dedup_key] = hits
 515                          _cap_read_tracker_data(task_data)
 516  
 517                      if hits >= 2:
 518                          return json.dumps({
 519                              "error": (
 520                                  f"BLOCKED: You have called read_file on this "
 521                                  f"exact region {hits + 1} times and the file "
 522                                  "has NOT changed. STOP calling read_file for "
 523                                  "this path — the content from your earlier "
 524                                  "read_file result in this conversation is "
 525                                  "still current. Proceed with your task using "
 526                                  "the information you already have."
 527                              ),
 528                              "path": path,
 529                              "already_read": hits + 1,
 530                          }, ensure_ascii=False)
 531  
 532                      return json.dumps({
 533                          "status": "unchanged",
 534                          "message": _READ_DEDUP_STATUS_MESSAGE,
 535                          "path": path,
 536                          "dedup": True,
 537                          "content_returned": False,
 538                      }, ensure_ascii=False)
 539              except OSError:
 540                  pass  # stat failed — fall through to full read
 541  
 542          # ── Perform the read ──────────────────────────────────────────
 543          file_ops = _get_file_ops(task_id)
 544          result = file_ops.read_file(path, offset, limit)
 545          result_dict = result.to_dict()
 546  
 547          # ── Character-count guard ─────────────────────────────────────
 548          # We're model-agnostic so we can't count tokens; characters are
 549          # the best proxy we have.  If the read produced an unreasonable
 550          # amount of content, reject it and tell the model to narrow down.
 551          # Note: we check the formatted content (with line-number prefixes),
 552          # not the raw file size, because that's what actually enters context.
 553          # Check BEFORE redaction to avoid expensive regex on huge content.
 554          content_len = len(result.content or "")
 555          file_size = result_dict.get("file_size", 0)
 556          max_chars = _get_max_read_chars()
 557          if content_len > max_chars:
 558              total_lines = result_dict.get("total_lines", "unknown")
 559              return json.dumps({
 560                  "error": (
 561                      f"Read produced {content_len:,} characters which exceeds "
 562                      f"the safety limit ({max_chars:,} chars). "
 563                      "Use offset and limit to read a smaller range. "
 564                      f"The file has {total_lines} lines total."
 565                  ),
 566                  "path": path,
 567                  "total_lines": total_lines,
 568                  "file_size": file_size,
 569              }, ensure_ascii=False)
 570  
 571          # ── Redact secrets (after guard check to skip oversized content) ──
 572          if result.content:
 573              result.content = redact_sensitive_text(result.content, code_file=True)
 574              result_dict["content"] = result.content
 575  
 576          # Large-file hint: if the file is big and the caller didn't ask
 577          # for a narrow window, nudge toward targeted reads.
 578          if (file_size and file_size > _LARGE_FILE_HINT_BYTES
 579                  and limit > 200
 580                  and result_dict.get("truncated")):
 581              result_dict.setdefault("_hint", (
 582                  f"This file is large ({file_size:,} bytes). "
 583                  "Consider reading only the section you need with offset and limit "
 584                  "to keep context usage efficient."
 585              ))
 586  
 587          # ── Track for consecutive-loop detection ──────────────────────
 588          read_key = ("read", path, offset, limit)
 589          with _read_tracker_lock:
 590              # Ensure "dedup" / "dedup_hits" keys exist (backward compat with
 591              # old tracker state from pre-dedup-guard sessions).
 592              if "dedup" not in task_data:
 593                  task_data["dedup"] = {}
 594              if "dedup_hits" not in task_data:
 595                  task_data["dedup_hits"] = {}
 596              # Real read succeeded — this key is no longer in a stub-loop, so
 597              # reset its hit counter.  (File either changed or stat failed
 598              # earlier and we fell through.)
 599              task_data["dedup_hits"].pop(dedup_key, None)
 600              task_data["read_history"].add((path, offset, limit))
 601              if task_data["last_key"] == read_key:
 602                  task_data["consecutive"] += 1
 603              else:
 604                  task_data["last_key"] = read_key
 605                  task_data["consecutive"] = 1
 606              count = task_data["consecutive"]
 607  
 608              # Store mtime at read time for two purposes:
 609              # 1. Dedup: skip identical re-reads of unchanged files.
 610              # 2. Staleness: warn on write/patch if the file changed since
 611              #    the agent last read it (external edit, concurrent agent, etc.).
 612              try:
 613                  _mtime_now = os.path.getmtime(resolved_str)
 614                  task_data["dedup"][dedup_key] = _mtime_now
 615                  task_data.setdefault("read_timestamps", {})[resolved_str] = _mtime_now
 616              except OSError:
 617                  pass  # Can't stat — skip tracking for this entry
 618  
 619              # Bound the per-task containers so a long CLI session doesn't
 620              # accumulate megabytes of dict/set state.  See _cap_read_tracker_data.
 621              _cap_read_tracker_data(task_data)
 622  
 623          # Cross-agent file-state registry (separate from per-task read
 624          # tracker above): records that THIS agent has read this path so
 625          # write/patch can detect sibling-subagent writes that happened
 626          # after our read.  Partial read when offset>1 or the read was
 627          # truncated (large file with more content than limit covered).
 628          # Outside the _read_tracker_lock so the registry's own locking
 629          # isn't nested under ours.
 630          try:
 631              _partial = (offset > 1) or bool(result_dict.get("truncated"))
 632              file_state.record_read(task_id, resolved_str, partial=_partial)
 633          except Exception:
 634              logger.debug("file_state.record_read failed", exc_info=True)
 635  
 636          if count >= 4:
 637              # Hard block: stop returning content to break the loop
 638              return json.dumps({
 639                  "error": (
 640                      f"BLOCKED: You have read this exact file region {count} times in a row. "
 641                      "The content has NOT changed. You already have this information. "
 642                      "STOP re-reading and proceed with your task."
 643                  ),
 644                  "path": path,
 645                  "already_read": count,
 646              }, ensure_ascii=False)
 647          elif count >= 3:
 648              result_dict["_warning"] = (
 649                  f"You have read this exact file region {count} times consecutively. "
 650                  "The content has not changed since your last read. Use the information you already have. "
 651                  "If you are stuck in a loop, stop reading and proceed with writing or responding."
 652              )
 653  
 654          return json.dumps(result_dict, ensure_ascii=False)
 655      except Exception as e:
 656          return tool_error(str(e))
 657  
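      # Return-shape sketch for read_file_tool (field values illustrative, not verbatim):
      #
      #   read_file_tool("src/app.py")     # JSON with "content" (line-numbered text),
      #                                    #   "total_lines", "file_size", ...
      #   read_file_tool("src/app.py")     # repeat on an unchanged file ->
      #                                    #   {"status": "unchanged", "dedup": true, ...}
      #   read_file_tool("/dev/urandom")   # -> {"error": "... device file ..."}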
 658  
 659  
 660  
 661  def reset_file_dedup(task_id: str | None = None):
 662      """Clear the deduplication cache for file reads.
 663  
 664      Called after context compression — the original read content has been
 665      summarised away, so the model needs the full content if it reads the
 666      same file again.  Without this, reads after compression would return
 667      a "file unchanged" stub pointing at content that no longer exists in
 668      context.
 669  
 670      Call with a task_id to clear just that task, or without to clear all.
 671      """
 672      with _read_tracker_lock:
 673          if task_id:
 674              task_data = _read_tracker.get(task_id)
 675              if task_data:
 676                  if "dedup" in task_data:
 677                      task_data["dedup"].clear()
 678                  if "dedup_hits" in task_data:
 679                      task_data["dedup_hits"].clear()
 680          else:
 681              for task_data in _read_tracker.values():
 682                  if "dedup" in task_data:
 683                      task_data["dedup"].clear()
 684                  if "dedup_hits" in task_data:
 685                      task_data["dedup_hits"].clear()
 686  
 687  
 688  def notify_other_tool_call(task_id: str = "default"):
 689      """Reset consecutive read/search counter for a task.
 690  
 691      Called by the tool dispatcher (model_tools.py) whenever a tool OTHER
 692      than read_file / search_files is executed.  This ensures we only warn
 693      or block on *truly consecutive* repeated reads — if the agent does
 694      anything else in between (write, patch, terminal, etc.) the counter
 695      resets and the next read is treated as fresh.
 696      """
 697      with _read_tracker_lock:
 698          task_data = _read_tracker.get(task_id)
 699          if task_data:
 700              task_data["last_key"] = None
 701              task_data["consecutive"] = 0
 702              # An intervening non-read tool call breaks any stub-loop in
 703              # progress, so clear per-key dedup hit counters too.
 704              if "dedup_hits" in task_data:
 705                  task_data["dedup_hits"].clear()
 706  
 707  
 708  def _invalidate_dedup_for_path(filepath: str, task_id: str) -> None:
 709      """Remove all dedup cache entries whose resolved path matches *filepath*.
 710  
 711      Called after write_file and patch so that a subsequent read_file on
 712      the same path always returns fresh content instead of a stale
 713      "File unchanged" stub.  The dedup cache keys are tuples of
 714      ``(resolved_path, offset, limit)``; we must evict **all** offset/limit
 715      combinations for the written path because any cached range could now
 716      be stale.
 717  
 718      Must be called with ``_read_tracker_lock`` **not** held — acquires it
 719      internally.
 720      """
 721      try:
 722          resolved = str(_resolve_path_for_task(filepath, task_id))  # same resolution as read_file_tool's keys
 723      except (OSError, ValueError):
 724          return
 725      with _read_tracker_lock:
 726          task_data = _read_tracker.get(task_id)
 727          if task_data is None:
 728              return
 729          dedup = task_data.get("dedup")
 730          if not dedup:
 731              return
 732          # Collect keys to remove (can't mutate dict during iteration).
 733          stale_keys = [k for k in dedup if k[0] == resolved]
 734          for k in stale_keys:
 735              del dedup[k]
 736  
 737  
 738  def _update_read_timestamp(filepath: str, task_id: str) -> None:
 739      """Record the file's current modification time after a successful write.
 740  
 741      Called after write_file and patch so that consecutive edits by the
 742      same task don't trigger false staleness warnings — each write
 743      refreshes the stored timestamp to match the file's new state.
 744  
 745      Also invalidates the dedup cache for the written path so that
 746      subsequent reads return fresh content (fixes #13144).
 747      """
 748      # Invalidate dedup first (before acquiring lock for timestamp update).
 749      _invalidate_dedup_for_path(filepath, task_id)
 750      try:
 751          resolved = str(_resolve_path_for_task(filepath, task_id))
 752          current_mtime = os.path.getmtime(resolved)
 753      except (OSError, ValueError):
 754          return
 755      with _read_tracker_lock:
 756          task_data = _read_tracker.get(task_id)
 757          if task_data is not None:
 758              task_data.setdefault("read_timestamps", {})[resolved] = current_mtime
 759              _cap_read_tracker_data(task_data)
 760  
 761  
 762  def _check_file_staleness(filepath: str, task_id: str) -> str | None:
 763      """Check whether a file was modified since the agent last read it.
 764  
 765      Returns a warning string if the file is stale (mtime changed since
 766      the last read_file call for this task), or None if the file is fresh
 767      or was never read.  Does not block — the write still proceeds.
 768      """
 769      try:
 770          resolved = str(_resolve_path_for_task(filepath, task_id))
 771      except (OSError, ValueError):
 772          return None
 773      with _read_tracker_lock:
 774          task_data = _read_tracker.get(task_id)
 775          if not task_data:
 776              return None
 777          read_mtime = task_data.get("read_timestamps", {}).get(resolved)
 778      if read_mtime is None:
 779          return None  # File was never read — nothing to compare against
 780      try:
 781          current_mtime = os.path.getmtime(resolved)
 782      except OSError:
 783          return None  # Can't stat — file may have been deleted, let write handle it
 784      if current_mtime != read_mtime:
 785          return (
 786              f"Warning: {filepath} was modified since you last read it "
 787              "(external edit or concurrent agent). The content you read may be "
 788              "stale. Consider re-reading the file to verify before writing."
 789          )
 790      return None
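
      # Timeline sketch (path and mtimes hypothetical): the warning fires only when
      # the mtime recorded at read time differs from the file's current mtime.
      #
      #   read_file_tool("notes.md", task_id="t1")   # records mtime 1000.0
      #   # ... another process edits notes.md; its mtime becomes 1005.0 ...
      #   _check_file_staleness("notes.md", "t1")    # -> "Warning: notes.md was modified ..."
      #   _check_file_staleness("other.md", "t1")    # -> None (never read by this task)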
 791  
 792  
 793  def write_file_tool(path: str, content: str, task_id: str = "default") -> str:
 794      """Write content to a file."""
 795      sensitive_err = _check_sensitive_path(path, task_id)
 796      if sensitive_err:
 797          return tool_error(sensitive_err)
 798      if _is_internal_file_status_text(content):
 799          return tool_error(
 800              "Refusing to write internal read_file status text as file content. "
 801              "Re-read the file or reconstruct the intended file contents before writing."
 802          )
 803      try:
 804          # Resolve once for the registry lock + stale check.  Failures here
 805          # fall back to the legacy path — write proceeds, per-task staleness
 806          # check below still runs.
 807          try:
 808              _resolved = str(_resolve_path_for_task(path, task_id))
 809          except Exception:
 810              _resolved = None
 811  
 812          if _resolved is None:
 813              stale_warning = _check_file_staleness(path, task_id)
 814              file_ops = _get_file_ops(task_id)
 815              result = file_ops.write_file(path, content)
 816              result_dict = result.to_dict()
 817              if stale_warning:
 818                  result_dict["_warning"] = stale_warning
 819              _update_read_timestamp(path, task_id)
 820              return json.dumps(result_dict, ensure_ascii=False)
 821  
 822          # Serialize the read→modify→write region per-path so concurrent
 823          # subagents can't interleave on the same file.  Different paths
 824          # remain fully parallel.
 825          with file_state.lock_path(_resolved):
 826              # Cross-agent staleness wins over per-task warning when both
 827              # fire — its message names the sibling subagent.
 828              cross_warning = file_state.check_stale(task_id, _resolved)
 829              stale_warning = _check_file_staleness(path, task_id)
 830              file_ops = _get_file_ops(task_id)
 831              result = file_ops.write_file(path, content)
 832              result_dict = result.to_dict()
 833              effective_warning = cross_warning or stale_warning
 834              if effective_warning:
 835                  result_dict["_warning"] = effective_warning
 836              # Refresh stamps after the successful write so consecutive
 837              # writes by this task don't trigger false staleness warnings.
 838              _update_read_timestamp(path, task_id)
 839              if not result_dict.get("error"):
 840                  file_state.note_write(task_id, _resolved)
 841          return json.dumps(result_dict, ensure_ascii=False)
 842      except Exception as e:
 843          if _is_expected_write_exception(e):
 844              logger.debug("write_file expected denial: %s: %s", type(e).__name__, e)
 845          else:
 846              logger.error("write_file error: %s: %s", type(e).__name__, e, exc_info=True)
 847          return tool_error(str(e))
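
      # Outcome sketch for write_file_tool (paths and content hypothetical):
      #
      #   write_file_tool("/etc/hosts", "...")   # -> tool_error: sensitive system path
      #   write_file_tool("notes.md", "hi\n")    # -> result dict as JSON; "_warning" is
      #                                          #    attached only when a staleness
      #                                          #    check fired before the write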
 848  
 849  
 850  def patch_tool(mode: str = "replace", path: str | None = None, old_string: str | None = None,
 851                 new_string: str | None = None, replace_all: bool = False, patch: str | None = None,
 852                 task_id: str = "default") -> str:
 853      """Patch a file using replace mode or V4A patch format."""
 854      # Check sensitive paths for both replace (explicit path) and V4A patch (extract paths)
 855      _paths_to_check = []
 856      if path:
 857          _paths_to_check.append(path)
 858      if mode == "patch" and patch:
 859          import re as _re
 860          for _m in _re.finditer(r'^\*\*\*\s+(?:Update|Add|Delete)\s+File:\s*(.+)$', patch, _re.MULTILINE):
 861              _paths_to_check.append(_m.group(1).strip())
 862      for _p in _paths_to_check:
 863          sensitive_err = _check_sensitive_path(_p, task_id)
 864          if sensitive_err:
 865              return tool_error(sensitive_err)
 866      try:
 867          # Resolve paths for locking.  Ordered + deduplicated so concurrent
 868          # callers lock in the same order — prevents deadlock on overlapping
 869          # multi-file V4A patches.
 870          _resolved_paths: list[str] = []
 871          _seen: set[str] = set()
 872          for _p in _paths_to_check:
 873              try:
 874                  _r = str(_resolve_path_for_task(_p, task_id))
 875              except Exception:
 876                  _r = None
 877              if _r and _r not in _seen:
 878                  _resolved_paths.append(_r)
 879                  _seen.add(_r)
 880          _resolved_paths.sort()
 881  
 882          # Acquire per-path locks in sorted order via ExitStack.  On single
 883          # path this degenerates to one lock; on empty list (unresolvable)
 884          # it's a no-op and execution falls through unchanged.
 885          from contextlib import ExitStack
 886          with ExitStack() as _locks:
 887              for _r in _resolved_paths:
 888                  _locks.enter_context(file_state.lock_path(_r))
 889  
 890              # Collect warnings — cross-agent registry first (names sibling),
 891              # then per-task tracker as a fallback.
 892              stale_warnings: list[str] = []
 893              _path_to_resolved: dict[str, str] = {}
 894              for _p in _paths_to_check:
 895                  try:
 896                      _r = str(_resolve_path_for_task(_p, task_id))
 897                  except Exception:
 898                      _r = None
 899                  _path_to_resolved[_p] = _r
 900                  _cross = file_state.check_stale(task_id, _r) if _r else None
 901                  _sw = _cross or _check_file_staleness(_p, task_id)
 902                  if _sw:
 903                      stale_warnings.append(_sw)
 904  
 905              file_ops = _get_file_ops(task_id)
 906  
 907              if mode == "replace":
 908                  if not path:
 909                      return tool_error("path required")
 910                  if old_string is None or new_string is None:
 911                      return tool_error("old_string and new_string required")
 912                  result = file_ops.patch_replace(path, old_string, new_string, replace_all)
 913              elif mode == "patch":
 914                  if not patch:
 915                      return tool_error("patch content required")
 916                  result = file_ops.patch_v4a(patch)
 917              else:
 918                  return tool_error(f"Unknown mode: {mode}")
 919  
 920              result_dict = result.to_dict()
 921              if stale_warnings:
 922                  result_dict["_warning"] = stale_warnings[0] if len(stale_warnings) == 1 else " | ".join(stale_warnings)
 923              # Refresh stored timestamps for all successfully-patched paths so
 924              # consecutive edits by this task don't trigger false warnings.
 925              if not result_dict.get("error"):
 926                  for _p in _paths_to_check:
 927                      _update_read_timestamp(_p, task_id)
 928                      _r = _path_to_resolved.get(_p)
 929                      if _r:
 930                          file_state.note_write(task_id, _r)
 931          # Hint when old_string not found — saves iterations where the agent
 932          # retries with stale content instead of re-reading the file.
 933          # Suppressed when patch_replace already attached a rich "Did you mean?"
 934          # snippet (which is strictly more useful than the generic hint).
 935          if result_dict.get("error") and "Could not find" in str(result_dict["error"]):
 936              if "Did you mean one of these sections?" not in str(result_dict["error"]):
 937                  result_dict["_hint"] = (
 938                      "old_string not found. Use read_file to verify the current "
 939                      "content, or search_files to locate the text."
 940                  )
 941          return json.dumps(result_dict, ensure_ascii=False)
 942      except Exception as e:
 943          return tool_error(str(e))
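
      # Call sketches for patch_tool (paths and strings hypothetical):
      #
      #   patch_tool(mode="replace", path="src/app.py",
      #              old_string="retries = 3", new_string="retries = 5")
      #   patch_tool(mode="patch", patch=(
      #       "*** Begin Patch\n"
      #       "*** Update File: src/app.py\n"
      #       "@@ def main @@\n"
      #       "-    retries = 3\n"
      #       "+    retries = 5\n"
      #       "*** End Patch"
      #   ))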
 944  
 945  
 946  def search_tool(pattern: str, target: str = "content", path: str = ".",
 947                  file_glob: str | None = None, limit: int = 50, offset: int = 0,
 948                  output_mode: str = "content", context: int = 0,
 949                  task_id: str = "default") -> str:
 950      """Search for content or files."""
 951      try:
 952          offset, limit = normalize_search_pagination(offset, limit)
 953  
 954          # Track searches to detect *consecutive* repeated search loops.
 955          # Include pagination args so users can page through truncated
 956          # results without tripping the repeated-search guard.
 957          search_key = (
 958              "search",
 959              pattern,
 960              target,
 961              str(path),
 962              file_glob or "",
 963              limit,
 964              offset,
 965          )
 966          with _read_tracker_lock:
 967              task_data = _read_tracker.setdefault(task_id, {
 968                  "last_key": None, "consecutive": 0, "read_history": set(),
 969              })
 970              if task_data["last_key"] == search_key:
 971                  task_data["consecutive"] += 1
 972              else:
 973                  task_data["last_key"] = search_key
 974                  task_data["consecutive"] = 1
 975              count = task_data["consecutive"]
 976  
 977          if count >= 4:
 978              return json.dumps({
 979                  "error": (
 980                      f"BLOCKED: You have run this exact search {count} times in a row. "
 981                      "The results have NOT changed. You already have this information. "
 982                      "STOP re-searching and proceed with your task."
 983                  ),
 984                  "pattern": pattern,
 985                  "already_searched": count,
 986              }, ensure_ascii=False)
 987  
 988          file_ops = _get_file_ops(task_id)
 989          result = file_ops.search(
 990              pattern=pattern, path=path, target=target, file_glob=file_glob,
 991              limit=limit, offset=offset, output_mode=output_mode, context=context
 992          )
 993          if hasattr(result, 'matches'):
 994              for m in result.matches:
 995                  if hasattr(m, 'content') and m.content:
 996                      m.content = redact_sensitive_text(m.content, code_file=True)
 997          result_dict = result.to_dict()
 998  
 999          if count >= 3:
1000              result_dict["_warning"] = (
1001                  f"You have run this exact search {count} times consecutively. "
1002                  "The results have not changed. Use the information you already have."
1003              )
1004  
1005          result_json = json.dumps(result_dict, ensure_ascii=False)
1006          # Hint when results were truncated — explicit next offset is clearer
1007          # than relying on the model to infer it from total_count vs match count.
1008          if result_dict.get("truncated"):
1009              next_offset = offset + limit
1010              result_json += f"\n\n[Hint: Results truncated. Use offset={next_offset} to see more, or narrow with a more specific pattern or file_glob.]"
1011          return result_json
1012      except Exception as e:
1013          return tool_error(str(e))
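
      # Call sketches for search_tool (patterns hypothetical):
      #
      #   search_tool("def main", target="content", path="src", file_glob="*.py")
      #   search_tool("*.toml", target="files")
      #   # Truncated result sets get a trailing "[Hint: ... offset=N ...]" line so
      #   # the model can page with offset = offset + limit.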
1014  
1015  
1016  
1017  
1018  # ---------------------------------------------------------------------------
1019  # Schemas + Registry
1020  # ---------------------------------------------------------------------------
1021  from tools.registry import registry, tool_error
1022  
1023  
1024  def _check_file_reqs():
1025      """Lazy wrapper to avoid circular import with tools/__init__.py."""
1026      from tools import check_file_requirements
1027      return check_file_requirements()
1028  
1029  READ_FILE_SCHEMA = {
1030      "name": "read_file",
1031      "description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. NOTE: Cannot read images or binary files — use vision_analyze for images.",
1032      "parameters": {
1033          "type": "object",
1034          "properties": {
1035              "path": {"type": "string", "description": "Path to the file to read (absolute, relative, or ~/path)"},
1036              "offset": {"type": "integer", "description": "Line number to start reading from (1-indexed, default: 1)", "default": 1, "minimum": 1},
1037              "limit": {"type": "integer", "description": "Maximum number of lines to read (default: 500, max: 2000)", "default": 500, "maximum": 2000}
1038          },
1039          "required": ["path"]
1040      }
1041  }
1042  
1043  WRITE_FILE_SCHEMA = {
1044      "name": "write_file",
1045      "description": "Write content to a file, completely replacing existing content. Use this instead of echo/cat heredoc in terminal. Creates parent directories automatically. OVERWRITES the entire file — use 'patch' for targeted edits.",
1046      "parameters": {
1047          "type": "object",
1048          "properties": {
1049              "path": {"type": "string", "description": "Path to the file to write (will be created if it doesn't exist, overwritten if it does)"},
1050              "content": {"type": "string", "description": "Complete content to write to the file"}
1051          },
1052          "required": ["path", "content"]
1053      }
1054  }
1055  
1056  PATCH_SCHEMA = {
1057      "name": "patch",
1058      "description": "Targeted find-and-replace edits in files. Use this instead of sed/awk in terminal. Uses fuzzy matching (9 strategies) so minor whitespace/indentation differences won't break it. Returns a unified diff. Auto-runs syntax checks after editing.\n\nReplace mode (default): find a unique string and replace it.\nPatch mode: apply V4A multi-file patches for bulk changes.",
1059      "parameters": {
1060          "type": "object",
1061          "properties": {
1062              "mode": {"type": "string", "enum": ["replace", "patch"], "description": "Edit mode: 'replace' for targeted find-and-replace, 'patch' for V4A multi-file patches", "default": "replace"},
1063              "path": {"type": "string", "description": "File path to edit (required for 'replace' mode)"},
1064              "old_string": {"type": "string", "description": "Text to find in the file (required for 'replace' mode). Must be unique in the file unless replace_all=true. Include enough surrounding context to ensure uniqueness."},
1065              "new_string": {"type": "string", "description": "Replacement text (required for 'replace' mode). Can be empty string to delete the matched text."},
1066              "replace_all": {"type": "boolean", "description": "Replace all occurrences instead of requiring a unique match (default: false)", "default": False},
1067              "patch": {"type": "string", "description": "V4A format patch content (required for 'patch' mode). Format:\n*** Begin Patch\n*** Update File: path/to/file\n@@ context hint @@\n context line\n-removed line\n+added line\n*** End Patch"}
1068          },
1069          "required": ["mode"]
1070      }
1071  }
1072  
1073  SEARCH_FILES_SCHEMA = {
1074      "name": "search_files",
1075      "description": "Search file contents or find files by name. Use this instead of grep/rg/find/ls in terminal. Ripgrep-backed, faster than shell equivalents.\n\nContent search (target='content'): Regex search inside files. Output modes: full matches with line numbers, file paths only, or match counts.\n\nFile search (target='files'): Find files by glob pattern (e.g., '*.py', '*config*'). Also use this instead of ls — results sorted by modification time.",
1076      "parameters": {
1077          "type": "object",
1078          "properties": {
1079              "pattern": {"type": "string", "description": "Regex pattern for content search, or glob pattern (e.g., '*.py') for file search"},
1080              "target": {"type": "string", "enum": ["content", "files"], "description": "'content' searches inside file contents, 'files' searches for files by name", "default": "content"},
1081              "path": {"type": "string", "description": "Directory or file to search in (default: current working directory)", "default": "."},
1082              "file_glob": {"type": "string", "description": "Filter files by pattern in grep mode (e.g., '*.py' to only search Python files)"},
1083              "limit": {"type": "integer", "description": "Maximum number of results to return (default: 50)", "default": 50},
1084              "offset": {"type": "integer", "description": "Skip first N results for pagination (default: 0)", "default": 0},
1085              "output_mode": {"type": "string", "enum": ["content", "files_only", "count"], "description": "Output format for grep mode: 'content' shows matching lines with line numbers, 'files_only' lists file paths, 'count' shows match counts per file", "default": "content"},
1086              "context": {"type": "integer", "description": "Number of context lines before and after each match (grep mode only)", "default": 0}
1087          },
1088          "required": ["pattern"]
1089      }
1090  }
1091  
1092  
1093  def _handle_read_file(args, **kw):
1094      tid = kw.get("task_id") or "default"
1095      return read_file_tool(path=args.get("path", ""), offset=args.get("offset", 1), limit=args.get("limit", 500), task_id=tid)
1096  
1097  
1098  def _handle_write_file(args, **kw):
1099      tid = kw.get("task_id") or "default"
1100      if not args.get("path") or not isinstance(args.get("path"), str):
1101          return tool_error(
1102              "write_file: missing required field 'path'. Re-emit the tool call with "
1103              "both 'path' and 'content' set."
1104          )
1105      if "content" not in args:
1106          return tool_error(
1107              "write_file: missing required field 'content'. The tool call included a "
1108              "path but no content argument — this is almost always a dropped-arg bug "
1109              "under context pressure. Re-emit the tool call with the full content "
1110              "payload, or use execute_code with hermes_tools.write_file() for very "
1111              "large files."
1112          )
1113      if not isinstance(args["content"], str):
1114          return tool_error(
1115              f"write_file: 'content' must be a string, got "
1116              f"{type(args['content']).__name__}."
1117          )
1118      return write_file_tool(path=args["path"], content=args["content"], task_id=tid)
1119  
1120  
1121  def _handle_patch(args, **kw):
1122      tid = kw.get("task_id") or "default"
1123      return patch_tool(
1124          mode=args.get("mode", "replace"), path=args.get("path"),
1125          old_string=args.get("old_string"), new_string=args.get("new_string"),
1126          replace_all=args.get("replace_all", False), patch=args.get("patch"), task_id=tid)
1127  
1128  
1129  def _handle_search_files(args, **kw):
1130      tid = kw.get("task_id") or "default"
1131      target_map = {"grep": "content", "find": "files"}
1132      raw_target = args.get("target", "content")
1133      target = target_map.get(raw_target, raw_target)
1134      return search_tool(
1135          pattern=args.get("pattern", ""), target=target, path=args.get("path", "."),
1136          file_glob=args.get("file_glob"), limit=args.get("limit", 50), offset=args.get("offset", 0),
1137          output_mode=args.get("output_mode", "content"), context=args.get("context", 0), task_id=tid)
1138  
1139  
1140  registry.register(name="read_file", toolset="file", schema=READ_FILE_SCHEMA, handler=_handle_read_file, check_fn=_check_file_reqs, emoji="📖", max_result_size_chars=100_000)
1141  registry.register(name="write_file", toolset="file", schema=WRITE_FILE_SCHEMA, handler=_handle_write_file, check_fn=_check_file_reqs, emoji="✍️", max_result_size_chars=100_000)
1142  registry.register(name="patch", toolset="file", schema=PATCH_SCHEMA, handler=_handle_patch, check_fn=_check_file_reqs, emoji="🔧", max_result_size_chars=100_000)
1143  registry.register(name="search_files", toolset="file", schema=SEARCH_FILES_SCHEMA, handler=_handle_search_files, check_fn=_check_file_reqs, emoji="🔎", max_result_size_chars=100_000)
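
      # Dispatch sketch (illustrative): registered handlers receive the raw tool-call
      # arguments plus keyword context such as task_id and forward to the tool
      # functions above.
      #
      #   _handle_read_file({"path": "README.md", "limit": 100}, task_id="t1")
      #   # -> the same JSON string as read_file_tool("README.md", 1, 100, "t1")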