/ tools / process_registry.py
process_registry.py
   1  """
   2  Process Registry -- In-memory registry for managed background processes.
   3  
   4  Tracks processes spawned via terminal(background=true), providing:
   5    - Output buffering (rolling 200KB window)
   6    - Status polling and log retrieval
   7    - Blocking wait with interrupt support
   8    - Process killing
   9    - Crash recovery via JSON checkpoint file
  10    - Session-scoped tracking for gateway reset protection
  11  
  12  Background processes execute THROUGH the environment interface -- nothing
  13  runs on the host machine unless TERMINAL_ENV=local. For Docker, Singularity,
  14  Modal, Daytona, and SSH backends, the command runs inside the sandbox.
  15  
  16  Usage:
  17      from tools.process_registry import process_registry
  18  
  19      # Spawn a background process (called from terminal_tool)
  20      session = process_registry.spawn(env, "pytest -v", task_id="task_123")
  21  
  22      # Poll for status
  23      result = process_registry.poll(session.id)
  24  
  25      # Block until done
  26      result = process_registry.wait(session.id, timeout=300)
  27  
  28      # Kill it
  29      process_registry.kill(session.id)
  30  """
  31  
  32  import json
  33  import logging
  34  import os
  35  import platform
  36  import shlex
  37  import signal
  38  import subprocess
  39  import threading
  40  import time
  41  import uuid
  42  
  43  _IS_WINDOWS = platform.system() == "Windows"
  44  from tools.environments.local import _find_shell, _sanitize_subprocess_env
  45  from dataclasses import dataclass, field
  46  from typing import Any, Dict, List, Optional
  47  
  48  from hermes_cli.config import get_hermes_home
  49  
  50  logger = logging.getLogger(__name__)
  51  
  52  
  53  # Checkpoint file for crash recovery (gateway only)
  54  CHECKPOINT_PATH = get_hermes_home() / "processes.json"
  55  
  56  # Limits
  57  MAX_OUTPUT_CHARS = 200_000      # 200KB rolling output buffer
  58  FINISHED_TTL_SECONDS = 1800     # Keep finished processes for 30 minutes
  59  MAX_PROCESSES = 64              # Max concurrent tracked processes (LRU pruning)
  60  
  61  # Watch pattern rate limiting — PER SESSION.
  62  # Hard rule: at most ONE watch-match notification every WATCH_MIN_INTERVAL_SECONDS.
  63  # Any match arriving inside that cooldown window is dropped and counted as a strike.
  64  # After WATCH_STRIKE_LIMIT consecutive strike windows, watch_patterns for that
  65  # session is permanently disabled and the session falls back to notify_on_complete
  66  # semantics (one notification when the process actually exits).
  67  WATCH_MIN_INTERVAL_SECONDS = 15   # Minimum spacing between consecutive watch matches
  68  WATCH_STRIKE_LIMIT = 3            # Strikes in a row → disable watch + promote to notify_on_complete
  69  
  70  # Global circuit breaker — across all sessions. Secondary safety net so concurrent
  71  # siblings can't collectively flood the user even when each is under its own cap.
  72  WATCH_GLOBAL_MAX_PER_WINDOW = 15
  73  WATCH_GLOBAL_WINDOW_SECONDS = 10
  74  WATCH_GLOBAL_COOLDOWN_SECONDS = 30
  75  
  76  
  77  def format_uptime_short(seconds: int) -> str:
  78      s = max(0, int(seconds))
  79      if s < 60:
  80          return f"{s}s"
  81      mins, secs = divmod(s, 60)
  82      if mins < 60:
  83          return f"{mins}m {secs}s"
  84      hours, mins = divmod(mins, 60)
  85      return f"{hours}h {mins}m"
  86  
  87  
  88  @dataclass
  89  class ProcessSession:
  90      """A tracked background process with output buffering."""
  91      id: str                                     # Unique session ID ("proc_xxxxxxxxxxxx")
  92      command: str                                 # Original command string
  93      task_id: str = ""                           # Task/sandbox isolation key
  94      session_key: str = ""                       # Gateway session key (for reset protection)
  95      pid: Optional[int] = None                   # OS process ID
  96      process: Optional[subprocess.Popen] = None  # Popen handle (local only)
  97      env_ref: Any = None                         # Reference to the environment object
  98      cwd: Optional[str] = None                   # Working directory
  99      started_at: float = 0.0                     # time.time() of spawn
 100      exited: bool = False                        # Whether the process has finished
 101      exit_code: Optional[int] = None             # Exit code (None if still running)
 102      output_buffer: str = ""                     # Rolling output (last MAX_OUTPUT_CHARS)
 103      max_output_chars: int = MAX_OUTPUT_CHARS
 104      detached: bool = False                      # True if recovered from crash (no pipe)
 105      pid_scope: str = "host"                     # "host" for local/PTY PIDs, "sandbox" for env-local PIDs
 106      # Watcher/notification metadata (persisted for crash recovery)
 107      watcher_platform: str = ""
 108      watcher_chat_id: str = ""
 109      watcher_user_id: str = ""
 110      watcher_user_name: str = ""
 111      watcher_thread_id: str = ""
 112      watcher_interval: int = 0                   # 0 = no watcher configured
 113      notify_on_complete: bool = False             # Queue agent notification on exit
 114      # Watch patterns — trigger agent notification when output matches any pattern
 115      watch_patterns: List[str] = field(default_factory=list)
 116      _watch_hits: int = field(default=0, repr=False)          # total matches delivered
 117      _watch_suppressed: int = field(default=0, repr=False)    # matches dropped by rate limit
 118      _watch_disabled: bool = field(default=False, repr=False) # permanently killed after strike limit
 119      # Per-session rate limit state: at most one match every WATCH_MIN_INTERVAL_SECONDS.
 120      # When an emission happens, _watch_cooldown_until is set to now + interval and
 121      # _watch_strike_candidate becomes True. The next match to arrive before that
 122      # deadline counts as one strike (regardless of how many matches were dropped in
 123      # between — a strike is a window, not a match). After WATCH_STRIKE_LIMIT strikes
 124      # in a row, watch_patterns is disabled and the session promotes to
 125      # notify_on_complete.
 126      _watch_last_emit_at: float = field(default=0.0, repr=False)
 127      _watch_cooldown_until: float = field(default=0.0, repr=False)
 128      _watch_strike_candidate: bool = field(default=False, repr=False)
 129      _watch_consecutive_strikes: int = field(default=0, repr=False)
 130      _lock: threading.Lock = field(default_factory=threading.Lock)
 131      _reader_thread: Optional[threading.Thread] = field(default=None, repr=False)
 132      _pty: Any = field(default=None, repr=False)  # ptyprocess handle (when use_pty=True)
 133  
 134  
 135  class ProcessRegistry:
 136      """
 137      In-memory registry of running and finished background processes.
 138  
 139      Thread-safe. Accessed from:
 140        - Executor threads (terminal_tool, process tool handlers)
 141        - Gateway asyncio loop (watcher tasks, session reset checks)
 142        - Cleanup thread (sandbox reaping coordination)
 143      """
 144  
 145      _SHELL_NOISE_SUBSTRINGS = (
 146          "bash: cannot set terminal process group",
 147          "bash: no job control in this shell",
 148          "no job control in this shell",
 149          "cannot set terminal process group",
 150          "tcsetattr: Inappropriate ioctl for device",
 151      )
 152  
 153      def __init__(self):
 154          self._running: Dict[str, ProcessSession] = {}
 155          self._finished: Dict[str, ProcessSession] = {}
 156          self._lock = threading.Lock()
 157  
 158          # Side-channel for check_interval watchers (gateway reads after agent run)
 159          self.pending_watchers: List[Dict[str, Any]] = []
 160  
 161          # Notification queue — unified queue for all background process events.
 162          # Completion notifications (notify_on_complete) and watch pattern matches
 163          # both land here, distinguished by "type" field.  CLI process_loop and
 164          # gateway drain this after each agent turn to auto-trigger new turns.
 165          import queue as _queue_mod
 166          self.completion_queue: _queue_mod.Queue = _queue_mod.Queue()
 167  
 168          # Track sessions whose completion was already consumed by the agent
 169          # via wait/poll/log.  Drain loops skip notifications for these.
 170          self._completion_consumed: set = set()
 171  
 172          # Global watch-match circuit breaker — across all sessions.
 173          # Prevents sibling processes from collectively flooding the user even
 174          # when each stays under its own per-session cap.
 175          self._global_watch_lock = threading.Lock()
 176          self._global_watch_window_start: float = 0.0
 177          self._global_watch_window_hits: int = 0
 178          self._global_watch_tripped_until: float = 0.0
 179          self._global_watch_suppressed_during_trip: int = 0
 180  
 181      @staticmethod
 182      def _clean_shell_noise(text: str) -> str:
 183          """Strip shell startup warnings from the beginning of output."""
 184          lines = text.split("\n")
 185          while lines and any(noise in lines[0] for noise in ProcessRegistry._SHELL_NOISE_SUBSTRINGS):
 186              lines.pop(0)
 187          return "\n".join(lines)
 188  
 189      def _check_watch_patterns(self, session: ProcessSession, new_text: str) -> None:
 190          """Scan new output for watch patterns and queue notifications.
 191  
 192          Called from reader threads with new_text being the freshly-read chunk.
 193  
 194          Per-session rate limit: at most ONE watch-match notification per
 195          WATCH_MIN_INTERVAL_SECONDS. Any match arriving inside the cooldown
 196          window is dropped and counts as ONE strike for that window. After
 197          WATCH_STRIKE_LIMIT consecutive strike windows, watch_patterns is
 198          disabled for this session and the session is promoted to
 199          notify_on_complete semantics — one notification when the process
 200          actually exits, no more mid-process spam.
 201          """
 202          if not session.watch_patterns or session._watch_disabled:
 203              return
 204          # Suppress-after-exit: once the reader loop has declared the process
 205          # exited, any late chunk we still see is post-exit noise. Dropping these
 206          # prevents the "stale notifications delivered minutes after the process
 207          # ended" spam when completion_queue consumers run async.
 208          if session.exited:
 209              return
 210  
 211          # Scan new text line-by-line for pattern matches
 212          matched_lines = []
 213          matched_pattern = None
 214          for line in new_text.splitlines():
 215              for pat in session.watch_patterns:
 216                  if pat in line:
 217                      matched_lines.append(line.rstrip())
 218                      if matched_pattern is None:
 219                          matched_pattern = pat
 220                      break  # one match per line is enough
 221  
 222          if not matched_lines:
 223              return
 224  
 225          now = time.time()
 226          should_disable = False
 227          with session._lock:
 228              # Case 1: still inside the cooldown from the last emission.
 229              # Count this as a strike for the current window (only once per window)
 230              # and drop the event. If we've hit the strike limit, disable watch
 231              # and promote to notify_on_complete.
 232              if session._watch_cooldown_until and now < session._watch_cooldown_until:
 233                  session._watch_suppressed += len(matched_lines)
 234                  if not session._watch_strike_candidate:
 235                      # First drop in this window — count one strike.
 236                      session._watch_strike_candidate = True
 237                      session._watch_consecutive_strikes += 1
 238                      if session._watch_consecutive_strikes >= WATCH_STRIKE_LIMIT:
 239                          session._watch_disabled = True
 240                          # Promote to notify_on_complete so the agent still gets
 241                          # exactly one notification when the process actually ends.
 242                          session.notify_on_complete = True
 243                          should_disable = True
 244                  return_early = True
 245              else:
 246                  # Case 2: cooldown has expired.
 247                  # Decide whether this window was a "clean" one (no drops) or a
 248                  # strike window. If no strike candidate was set during the prior
 249                  # cooldown, reset the consecutive-strike counter — we're back to
 250                  # healthy emission cadence.
 251                  if (
 252                      session._watch_cooldown_until
 253                      and not session._watch_strike_candidate
 254                  ):
 255                      session._watch_consecutive_strikes = 0
 256                  session._watch_strike_candidate = False
 257  
 258                  # Emit the notification and start a new cooldown window.
 259                  session._watch_last_emit_at = now
 260                  session._watch_cooldown_until = now + WATCH_MIN_INTERVAL_SECONDS
 261                  session._watch_hits += 1
 262                  suppressed = session._watch_suppressed
 263                  session._watch_suppressed = 0
 264                  return_early = False
 265  
 266          if return_early:
 267              if should_disable:
 268                  # Emit exactly one "watch disabled, falling back to notify_on_complete"
 269                  # summary event so the agent/user sees why things went quiet.
 270                  self.completion_queue.put({
 271                      "session_id": session.id,
 272                      "session_key": session.session_key,
 273                      "command": session.command,
 274                      "type": "watch_disabled",
 275                      "suppressed": session._watch_suppressed,
 276                      "platform": session.watcher_platform,
 277                      "chat_id": session.watcher_chat_id,
 278                      "user_id": session.watcher_user_id,
 279                      "user_name": session.watcher_user_name,
 280                      "thread_id": session.watcher_thread_id,
 281                      "message": (
 282                          f"Watch patterns disabled for process {session.id} — "
 283                          f"{WATCH_STRIKE_LIMIT} consecutive rate-limit windows triggered "
 284                          f"(min spacing {WATCH_MIN_INTERVAL_SECONDS}s). "
 285                          f"Falling back to notify_on_complete semantics; you'll get "
 286                          f"exactly one notification when the process exits."
 287                      ),
 288                  })
 289              return
 290  
 291          # Trim matched output to a reasonable size
 292          output = "\n".join(matched_lines[:20])
 293          if len(output) > 2000:
 294              output = output[:2000] + "\n...(truncated)"
 295  
 296          # Global circuit breaker — across all sessions (secondary safety net).
 297          if not self._global_watch_admit(now):
 298              return
 299  
 300          self.completion_queue.put({
 301              "session_id": session.id,
 302              "session_key": session.session_key,
 303              "command": session.command,
 304              "type": "watch_match",
 305              "pattern": matched_pattern,
 306              "output": output,
 307              "suppressed": suppressed,
 308              "platform": session.watcher_platform,
 309              "chat_id": session.watcher_chat_id,
 310              "user_id": session.watcher_user_id,
 311              "user_name": session.watcher_user_name,
 312              "thread_id": session.watcher_thread_id,
 313          })
 314  
 315      def _global_watch_admit(self, now: float) -> bool:
 316          """Return True if this watch_match event is allowed through the global breaker.
 317  
 318          Semantics:
 319          - If we're currently in a cooldown period, drop the event and count it.
 320          - Otherwise, slide the rolling window and check the global cap.
 321          - If the cap is exceeded, trip the breaker for WATCH_GLOBAL_COOLDOWN_SECONDS
 322            and emit ONE summary event so the agent/user sees "N notifications were
 323            suppressed" instead of getting them individually.
 324          - When the cooldown ends, emit a release summary and reset counters.
 325          """
 326          with self._global_watch_lock:
 327              # Handle cooldown expiry first so we can emit the release summary.
 328              if self._global_watch_tripped_until and now >= self._global_watch_tripped_until:
 329                  suppressed = self._global_watch_suppressed_during_trip
 330                  self._global_watch_tripped_until = 0.0
 331                  self._global_watch_suppressed_during_trip = 0
 332                  self._global_watch_window_start = now
 333                  self._global_watch_window_hits = 0
 334                  if suppressed > 0:
 335                      # Queue a summary event outside the lock (below).
 336                      release_msg = {
 337                          "session_id": "",
 338                          "session_key": "",
 339                          "command": "",
 340                          "type": "watch_overflow_released",
 341                          "suppressed": suppressed,
 342                          "message": (
 343                              f"Watch-pattern notifications resumed. "
 344                              f"{suppressed} match event(s) were suppressed during the flood."
 345                          ),
 346                          "platform": "",
 347                          "chat_id": "",
 348                          "user_id": "",
 349                          "user_name": "",
 350                          "thread_id": "",
 351                      }
 352                  else:
 353                      release_msg = None
 354              else:
 355                  release_msg = None
 356  
 357              # Still in cooldown — drop and count.
 358              if self._global_watch_tripped_until and now < self._global_watch_tripped_until:
 359                  self._global_watch_suppressed_during_trip += 1
 360                  admit = False
 361                  trip_now = None
 362              else:
 363                  # Slide the window.
 364                  if now - self._global_watch_window_start >= WATCH_GLOBAL_WINDOW_SECONDS:
 365                      self._global_watch_window_start = now
 366                      self._global_watch_window_hits = 0
 367  
 368                  if self._global_watch_window_hits >= WATCH_GLOBAL_MAX_PER_WINDOW:
 369                      # Trip the breaker.
 370                      self._global_watch_tripped_until = now + WATCH_GLOBAL_COOLDOWN_SECONDS
 371                      self._global_watch_suppressed_during_trip += 1
 372                      trip_now = now
 373                      admit = False
 374                  else:
 375                      self._global_watch_window_hits += 1
 376                      trip_now = None
 377                      admit = True
 378  
 379          # Queue summary events outside the lock.
 380          if release_msg is not None:
 381              self.completion_queue.put(release_msg)
 382          if trip_now is not None:
 383              self.completion_queue.put({
 384                  "session_id": "",
 385                  "session_key": "",
 386                  "command": "",
 387                  "type": "watch_overflow_tripped",
 388                  "message": (
 389                      f"Watch-pattern overflow: >{WATCH_GLOBAL_MAX_PER_WINDOW} "
 390                      f"notifications in {WATCH_GLOBAL_WINDOW_SECONDS}s across all processes. "
 391                      f"Suppressing further watch_match events for "
 392                      f"{WATCH_GLOBAL_COOLDOWN_SECONDS}s."
 393                  ),
 394                  "platform": "",
 395                  "chat_id": "",
 396                  "user_id": "",
 397                  "user_name": "",
 398                  "thread_id": "",
 399              })
 400          return admit
 401  
 402      @staticmethod
 403      def _is_host_pid_alive(pid: Optional[int]) -> bool:
 404          """Best-effort liveness check for host-visible PIDs."""
 405          if not pid:
 406              return False
 407          try:
 408              os.kill(pid, 0)
 409              return True
 410          except (ProcessLookupError, PermissionError):
 411              return False
 412  
 413      def _refresh_detached_session(self, session: Optional[ProcessSession]) -> Optional[ProcessSession]:
 414          """Update recovered host-PID sessions when the underlying process has exited."""
 415          if session is None or session.exited or not session.detached or session.pid_scope != "host":
 416              return session
 417  
 418          if self._is_host_pid_alive(session.pid):
 419              return session
 420  
 421          with session._lock:
 422              if session.exited:
 423                  return session
 424              session.exited = True
 425              # Recovered sessions no longer have a waitable handle, so the real
 426              # exit code is unavailable once the original process object is gone.
 427              session.exit_code = None
 428  
 429          self._move_to_finished(session)
 430          return session
 431  
 432      @staticmethod
 433      def _terminate_host_pid(pid: int) -> None:
 434          """Terminate a host-visible PID without requiring the original process handle."""
 435          if _IS_WINDOWS:
 436              os.kill(pid, signal.SIGTERM)
 437              return
 438  
 439          try:
 440              os.killpg(os.getpgid(pid), signal.SIGTERM)
 441          except (OSError, ProcessLookupError, PermissionError):
 442              os.kill(pid, signal.SIGTERM)
 443  
 444      # ----- Spawn -----
 445  
 446      @staticmethod
 447      def _env_temp_dir(env: Any) -> str:
 448          """Return the writable sandbox temp dir for env-backed background tasks."""
 449          get_temp_dir = getattr(env, "get_temp_dir", None)
 450          if callable(get_temp_dir):
 451              try:
 452                  temp_dir = get_temp_dir()
 453                  if isinstance(temp_dir, str) and temp_dir.startswith("/"):
 454                      return temp_dir.rstrip("/") or "/"
 455              except Exception as exc:
 456                  logger.debug("Could not resolve environment temp dir: %s", exc)
 457          return "/tmp"
 458  
 459      def spawn_local(
 460          self,
 461          command: str,
 462          cwd: str = None,
 463          task_id: str = "",
 464          session_key: str = "",
 465          env_vars: dict = None,
 466          use_pty: bool = False,
 467      ) -> ProcessSession:
 468          """
 469          Spawn a background process locally.
 470  
 471          Only for TERMINAL_ENV=local. Other backends use spawn_via_env().
 472  
 473          Args:
 474              use_pty: If True, use a pseudo-terminal via ptyprocess for interactive
 475                       CLI tools (Codex, Claude Code, Python REPL). Falls back to
 476                       subprocess.Popen if ptyprocess is not installed.
 477          """
 478          session = ProcessSession(
 479              id=f"proc_{uuid.uuid4().hex[:12]}",
 480              command=command,
 481              task_id=task_id,
 482              session_key=session_key,
 483              cwd=cwd or os.getcwd(),
 484              started_at=time.time(),
 485          )
 486  
 487          if use_pty:
 488              # Try PTY mode for interactive CLI tools
 489              try:
 490                  if _IS_WINDOWS:
 491                      from winpty import PtyProcess as _PtyProcessCls
 492                  else:
 493                      from ptyprocess import PtyProcess as _PtyProcessCls
 494                  user_shell = _find_shell()
 495                  pty_env = _sanitize_subprocess_env(os.environ, env_vars)
 496                  pty_env["PYTHONUNBUFFERED"] = "1"
 497                  pty_proc = _PtyProcessCls.spawn(
 498                      [user_shell, "-lic", f"set +m; {command}"],
 499                      cwd=session.cwd,
 500                      env=pty_env,
 501                      dimensions=(30, 120),
 502                  )
 503                  session.pid = pty_proc.pid
 504                  # Store the pty handle on the session for read/write
 505                  session._pty = pty_proc
 506  
 507                  # PTY reader thread
 508                  reader = threading.Thread(
 509                      target=self._pty_reader_loop,
 510                      args=(session,),
 511                      daemon=True,
 512                      name=f"proc-pty-reader-{session.id}",
 513                  )
 514                  session._reader_thread = reader
 515                  reader.start()
 516  
 517                  with self._lock:
 518                      self._prune_if_needed()
 519                      self._running[session.id] = session
 520  
 521                  self._write_checkpoint()
 522                  return session
 523  
 524              except ImportError:
 525                  logger.warning("ptyprocess not installed, falling back to pipe mode")
 526              except Exception as e:
 527                  logger.warning("PTY spawn failed (%s), falling back to pipe mode", e)
 528  
 529          # Standard Popen path (non-PTY or PTY fallback)
 530          # Use the user's login shell for consistency with LocalEnvironment --
 531          # ensures rc files are sourced and user tools are available.
 532          user_shell = _find_shell()
 533          # Force unbuffered output for Python scripts so progress is visible
 534          # during background execution (libraries like tqdm/datasets buffer when
 535          # stdout is a pipe, hiding output from process(action="poll")).
 536          bg_env = _sanitize_subprocess_env(os.environ, env_vars)
 537          bg_env["PYTHONUNBUFFERED"] = "1"
 538          proc = subprocess.Popen(
 539              [user_shell, "-lic", f"set +m; {command}"],
 540              text=True,
 541              cwd=session.cwd,
 542              env=bg_env,
 543              encoding="utf-8",
 544              errors="replace",
 545              stdout=subprocess.PIPE,
 546              stderr=subprocess.STDOUT,
 547              stdin=subprocess.PIPE,
 548              preexec_fn=None if _IS_WINDOWS else os.setsid,
 549          )
 550  
 551          session.process = proc
 552          session.pid = proc.pid
 553  
 554          # Start output reader thread
 555          reader = threading.Thread(
 556              target=self._reader_loop,
 557              args=(session,),
 558              daemon=True,
 559              name=f"proc-reader-{session.id}",
 560          )
 561          session._reader_thread = reader
 562          reader.start()
 563  
 564          with self._lock:
 565              self._prune_if_needed()
 566              self._running[session.id] = session
 567  
 568          self._write_checkpoint()
 569          return session
 570  
 571      def spawn_via_env(
 572          self,
 573          env: Any,
 574          command: str,
 575          cwd: str = None,
 576          task_id: str = "",
 577          session_key: str = "",
 578          timeout: int = 10,
 579      ) -> ProcessSession:
 580          """
 581          Spawn a background process through a non-local environment backend.
 582  
 583          For Docker/Singularity/Modal/Daytona/SSH: runs the command inside the sandbox
 584          using the environment's execute() interface. We wrap the command to
 585          capture the in-sandbox PID and redirect output to a log file inside
 586          the sandbox, then poll the log via subsequent execute() calls.
 587  
 588          This is less capable than local spawn (no live stdout pipe, no stdin),
 589          but it ensures the command runs in the correct sandbox context.
 590          """
 591          session = ProcessSession(
 592              id=f"proc_{uuid.uuid4().hex[:12]}",
 593              command=command,
 594              task_id=task_id,
 595              session_key=session_key,
 596              cwd=cwd,
 597              started_at=time.time(),
 598              env_ref=env,
 599              pid_scope="sandbox",
 600          )
 601  
 602          # Run the command in the sandbox with output capture
 603          temp_dir = self._env_temp_dir(env)
 604          log_path = f"{temp_dir}/hermes_bg_{session.id}.log"
 605          pid_path = f"{temp_dir}/hermes_bg_{session.id}.pid"
 606          exit_path = f"{temp_dir}/hermes_bg_{session.id}.exit"
 607          quoted_command = shlex.quote(command)
 608          quoted_temp_dir = shlex.quote(temp_dir)
 609          quoted_log_path = shlex.quote(log_path)
 610          quoted_pid_path = shlex.quote(pid_path)
 611          quoted_exit_path = shlex.quote(exit_path)
 612          bg_command = (
 613              f"mkdir -p {quoted_temp_dir} && "
 614              f"( nohup bash -lc {quoted_command} > {quoted_log_path} 2>&1; "
 615              f"rc=$?; printf '%s\\n' \"$rc\" > {quoted_exit_path} ) & "
 616              f"echo $! > {quoted_pid_path} && cat {quoted_pid_path}"
 617          )
 618  
 619          try:
 620              result = env.execute(bg_command, timeout=timeout)
 621              output = result.get("output", "").strip()
 622              # Try to extract the PID from the output
 623              for line in output.splitlines():
 624                  line = line.strip()
 625                  if line.isdigit():
 626                      session.pid = int(line)
 627                      break
 628          except Exception as e:
 629              session.exited = True
 630              session.exit_code = -1
 631              session.output_buffer = f"Failed to start: {e}"
 632  
 633          if not session.exited:
 634              # Start a poller thread that periodically reads the log file
 635              reader = threading.Thread(
 636                  target=self._env_poller_loop,
 637                  args=(session, env, log_path, pid_path, exit_path),
 638                  daemon=True,
 639                  name=f"proc-poller-{session.id}",
 640              )
 641              session._reader_thread = reader
 642              reader.start()
 643  
 644          with self._lock:
 645              self._prune_if_needed()
 646              self._running[session.id] = session
 647  
 648          self._write_checkpoint()
 649          return session
 650  
 651      # ----- Reader / Poller Threads -----
 652  
 653      def _reader_loop(self, session: ProcessSession):
 654          """Background thread: read stdout from a local Popen process."""
 655          first_chunk = True
 656          try:
 657              while True:
 658                  chunk = session.process.stdout.read(4096)
 659                  if not chunk:
 660                      break
 661                  if first_chunk:
 662                      chunk = self._clean_shell_noise(chunk)
 663                      first_chunk = False
 664                  with session._lock:
 665                      session.output_buffer += chunk
 666                      if len(session.output_buffer) > session.max_output_chars:
 667                          session.output_buffer = session.output_buffer[-session.max_output_chars:]
 668                  self._check_watch_patterns(session, chunk)
 669          except Exception as e:
 670              logger.debug("Process stdout reader ended: %s", e)
 671          finally:
 672              # Always reap the child to prevent zombie processes.
 673              try:
 674                  session.process.wait(timeout=5)
 675              except Exception as e:
 676                  logger.debug("Process wait timed out or failed: %s", e)
 677              session.exited = True
 678              session.exit_code = session.process.returncode
 679              self._move_to_finished(session)
 680  
 681      def _env_poller_loop(
 682          self, session: ProcessSession, env: Any, log_path: str, pid_path: str, exit_path: str
 683      ):
 684          """Background thread: poll a sandbox log file for non-local backends."""
 685          quoted_log_path = shlex.quote(log_path)
 686          quoted_pid_path = shlex.quote(pid_path)
 687          quoted_exit_path = shlex.quote(exit_path)
 688          prev_output_len = 0  # track delta for watch pattern scanning
 689          while not session.exited:
 690              time.sleep(2)  # Poll every 2 seconds
 691              try:
 692                  # Read new output from the log file
 693                  result = env.execute(f"cat {quoted_log_path} 2>/dev/null", timeout=10)
 694                  new_output = result.get("output", "")
 695                  if new_output:
 696                      # Compute delta for watch pattern scanning
 697                      delta = new_output[prev_output_len:] if len(new_output) > prev_output_len else ""
 698                      prev_output_len = len(new_output)
 699                      with session._lock:
 700                          session.output_buffer = new_output
 701                          if len(session.output_buffer) > session.max_output_chars:
 702                              session.output_buffer = session.output_buffer[-session.max_output_chars:]
 703                      if delta:
 704                          self._check_watch_patterns(session, delta)
 705  
 706                  # Check if process is still running
 707                  check = env.execute(
 708                      f"kill -0 \"$(cat {quoted_pid_path} 2>/dev/null)\" 2>/dev/null; echo $?",
 709                      timeout=5,
 710                  )
 711                  check_output = check.get("output", "").strip()
 712                  if check_output and check_output.splitlines()[-1].strip() != "0":
 713                      # Process has exited -- get exit code captured by the wrapper shell.
 714                      exit_result = env.execute(
 715                          f"cat {quoted_exit_path} 2>/dev/null",
 716                          timeout=5,
 717                      )
 718                      exit_str = exit_result.get("output", "").strip()
 719                      try:
 720                          session.exit_code = int(exit_str.splitlines()[-1].strip())
 721                      except (ValueError, IndexError):
 722                          session.exit_code = -1
 723                      session.exited = True
 724                      self._move_to_finished(session)
 725                      return
 726  
 727              except Exception:
 728                  # Environment might be gone (sandbox reaped, etc.)
 729                  session.exited = True
 730                  session.exit_code = -1
 731                  self._move_to_finished(session)
 732                  return
 733  
 734      def _pty_reader_loop(self, session: ProcessSession):
 735          """Background thread: read output from a PTY process."""
 736          pty = session._pty
 737          try:
 738              while pty.isalive():
 739                  try:
 740                      chunk = pty.read(4096)
 741                      if chunk:
 742                          # ptyprocess returns bytes
 743                          text = chunk if isinstance(chunk, str) else chunk.decode("utf-8", errors="replace")
 744                          with session._lock:
 745                              session.output_buffer += text
 746                              if len(session.output_buffer) > session.max_output_chars:
 747                                  session.output_buffer = session.output_buffer[-session.max_output_chars:]
 748                          self._check_watch_patterns(session, text)
 749                  except EOFError:
 750                      break
 751                  except Exception:
 752                      break
 753          except Exception as e:
 754              logger.debug("PTY stdout reader ended: %s", e)
 755  
 756          # Process exited
 757          try:
 758              pty.wait()
 759          except Exception as e:
 760              logger.debug("PTY wait timed out or failed: %s", e)
 761          session.exited = True
 762          session.exit_code = pty.exitstatus if hasattr(pty, 'exitstatus') else -1
 763          self._move_to_finished(session)
 764  
 765      def _move_to_finished(self, session: ProcessSession):
 766          """Move a session from running to finished.
 767  
 768          Idempotent: if the session was already moved (e.g. kill_process raced
 769          with the reader thread), the second call is a no-op — no duplicate
 770          completion notification is enqueued.
 771          """
 772          with self._lock:
 773              was_running = self._running.pop(session.id, None) is not None
 774              self._finished[session.id] = session
 775          self._write_checkpoint()
 776  
 777          # Only enqueue completion notification on the FIRST move.  Without
 778          # this guard, kill_process() and the reader thread can both call
 779          # _move_to_finished(), producing duplicate [IMPORTANT: ...] messages.
 780          if was_running and session.notify_on_complete:
 781              from tools.ansi_strip import strip_ansi
 782              output_tail = strip_ansi(session.output_buffer[-2000:]) if session.output_buffer else ""
 783              self.completion_queue.put({
 784                  "type": "completion",
 785                  "session_id": session.id,
 786                  "command": session.command,
 787                  "exit_code": session.exit_code,
 788                  "output": output_tail,
 789              })
 790  
 791      # ----- Query Methods -----
 792  
 793      def is_completion_consumed(self, session_id: str) -> bool:
 794          """Check if a completion notification was already consumed via wait/poll/log."""
 795          return session_id in self._completion_consumed
 796  
 797      def get(self, session_id: str) -> Optional[ProcessSession]:
 798          """Get a session by ID (running or finished)."""
 799          with self._lock:
 800              session = self._running.get(session_id) or self._finished.get(session_id)
 801          return self._refresh_detached_session(session)
 802  
 803      def _reconcile_local_exit(self, session: "ProcessSession") -> None:
 804          """Reconcile session.exited against the real child process state.
 805  
 806          The reader thread (`_reader_loop`) sets `session.exited = True` only
 807          in its `finally` block, which runs when `stdout.read()` returns EOF.
 808          If the direct `Popen` child has exited but a descendant process (e.g.
 809          a daemon spawned by `hermes update` restarting the gateway) is still
 810          holding the stdout pipe open, the reader blocks forever and poll()
 811          keeps returning "running" indefinitely (issue #17327 — 74 polls over
 812          7 minutes on Feishu).
 813  
 814          This helper closes that window: when `session.exited` is still False
 815          but the direct child's `Popen.poll()` reports an exit code, drain any
 816          readable bytes non-blocking and flip `session.exited`. The orphaned
 817          reader thread remains stuck on its blocking `read()` but is a daemon
 818          thread and will be reaped with the process.
 819  
 820          Safe no-op on sessions without a local `Popen` (env/PTY), already-
 821          exited sessions, and detached-recovered sessions.
 822          """
 823          if session is None or session.exited:
 824              return
 825          proc = getattr(session, "process", None)
 826          if proc is None:
 827              return
 828          try:
 829              rc = proc.poll()
 830          except Exception:
 831              return
 832          if rc is None:
 833              return  # Direct child still running — reader block is legitimate.
 834  
 835          # Direct child exited. Try to drain any bytes the reader hasn't
 836          # consumed yet. This is best-effort: if the pipe is held open by a
 837          # descendant, the non-blocking read returns what's immediately
 838          # available and we stop.
 839          drained = ""
 840          stdout = getattr(proc, "stdout", None)
 841          if stdout is not None and not _IS_WINDOWS:
 842              try:
 843                  import fcntl
 844                  fd = stdout.fileno()
 845                  flags = fcntl.fcntl(fd, fcntl.F_GETFL)
 846                  fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
 847                  try:
 848                      chunk = stdout.read()
 849                      if chunk:
 850                          drained = chunk if isinstance(chunk, str) else chunk.decode("utf-8", errors="replace")
 851                  except (BlockingIOError, OSError, ValueError):
 852                      pass
 853                  finally:
 854                      try:
 855                          fcntl.fcntl(fd, fcntl.F_SETFL, flags)
 856                      except Exception:
 857                          pass
 858              except Exception as e:
 859                  logger.debug("Non-blocking drain failed for %s: %s", session.id, e)
 860  
 861          with session._lock:
 862              if drained:
 863                  session.output_buffer += drained
 864                  if len(session.output_buffer) > session.max_output_chars:
 865                      session.output_buffer = session.output_buffer[-session.max_output_chars:]
 866              session.exited = True
 867              session.exit_code = rc
 868          logger.info(
 869              "Reconciled session %s: direct child exited with code %s but reader "
 870              "was still blocked (orphaned pipe). Flipped to exited.",
 871              session.id, rc,
 872          )
 873          self._move_to_finished(session)
 874  
 875      def poll(self, session_id: str) -> dict:
 876          """Check status and get new output for a background process."""
 877          from tools.ansi_strip import strip_ansi
 878  
 879          session = self.get(session_id)
 880          if session is None:
 881              return {"status": "not_found", "error": f"No process with ID {session_id}"}
 882  
 883          # Reconcile against real child state before reading session.exited.
 884          # Guards against orphaned-pipe reader hangs (issue #17327).
 885          self._reconcile_local_exit(session)
 886  
 887          with session._lock:
 888              output_preview = strip_ansi(session.output_buffer[-1000:]) if session.output_buffer else ""
 889  
 890          result = {
 891              "session_id": session.id,
 892              "command": session.command,
 893              "status": "exited" if session.exited else "running",
 894              "pid": session.pid,
 895              "uptime_seconds": int(time.time() - session.started_at),
 896              "output_preview": output_preview,
 897          }
 898          if session.exited:
 899              result["exit_code"] = session.exit_code
 900              self._completion_consumed.add(session_id)
 901          if session.detached:
 902              result["detached"] = True
 903              result["note"] = "Process recovered after restart -- output history unavailable"
 904          return result
 905  
 906      def read_log(self, session_id: str, offset: int = 0, limit: int = 200) -> dict:
 907          """Read the full output log with optional pagination by lines."""
 908          from tools.ansi_strip import strip_ansi
 909  
 910          session = self.get(session_id)
 911          if session is None:
 912              return {"status": "not_found", "error": f"No process with ID {session_id}"}
 913  
 914          with session._lock:
 915              full_output = strip_ansi(session.output_buffer)
 916  
 917          lines = full_output.splitlines()
 918          total_lines = len(lines)
 919  
 920          # Default: last N lines
 921          if offset == 0 and limit > 0:
 922              selected = lines[-limit:]
 923          else:
 924              selected = lines[offset:offset + limit]
 925  
 926          result = {
 927              "session_id": session.id,
 928              "status": "exited" if session.exited else "running",
 929              "output": "\n".join(selected),
 930              "total_lines": total_lines,
 931              "showing": f"{len(selected)} lines",
 932          }
 933          if session.exited:
 934              self._completion_consumed.add(session_id)
 935          return result
 936  
 937      def wait(self, session_id: str, timeout: int = None) -> dict:
 938          """
 939          Block until a process exits, timeout, or interrupt.
 940  
 941          Args:
 942              session_id: The process to wait for.
 943              timeout: Max seconds to block. Falls back to TERMINAL_TIMEOUT config.
 944  
 945          Returns:
 946              dict with status ("exited", "timeout", "interrupted", "not_found")
 947              and output snapshot.
 948          """
 949          from tools.ansi_strip import strip_ansi
 950          from tools.interrupt import is_interrupted as _is_interrupted
 951  
 952          try:
 953              default_timeout = int(os.getenv("TERMINAL_TIMEOUT", "180"))
 954          except (ValueError, TypeError):
 955              default_timeout = 180
 956          max_timeout = default_timeout
 957          requested_timeout = timeout
 958          timeout_note = None
 959  
 960          if requested_timeout and requested_timeout > max_timeout:
 961              effective_timeout = max_timeout
 962              timeout_note = (
 963                  f"Requested wait of {requested_timeout}s was clamped "
 964                  f"to configured limit of {max_timeout}s"
 965              )
 966          else:
 967              effective_timeout = requested_timeout or max_timeout
 968  
 969          session = self.get(session_id)
 970          if session is None:
 971              return {"status": "not_found", "error": f"No process with ID {session_id}"}
 972  
 973          deadline = time.monotonic() + effective_timeout
 974  
 975          while time.monotonic() < deadline:
 976              session = self._refresh_detached_session(session)
 977              # Reconcile against real child state — guards against orphaned-
 978              # pipe reader hangs where the reader is blocked but the direct
 979              # child has already exited (issue #17327).
 980              self._reconcile_local_exit(session)
 981              if session.exited:
 982                  self._completion_consumed.add(session_id)
 983                  result = {
 984                      "status": "exited",
 985                      "exit_code": session.exit_code,
 986                      "output": strip_ansi(session.output_buffer[-2000:]),
 987                  }
 988                  if timeout_note:
 989                      result["timeout_note"] = timeout_note
 990                  return result
 991  
 992              if _is_interrupted():
 993                  result = {
 994                      "status": "interrupted",
 995                      "output": strip_ansi(session.output_buffer[-1000:]),
 996                      "note": "User sent a new message -- wait interrupted",
 997                  }
 998                  if timeout_note:
 999                      result["timeout_note"] = timeout_note
1000                  return result
1001  
1002              time.sleep(1)
1003  
1004          result = {
1005              "status": "timeout",
1006              "output": strip_ansi(session.output_buffer[-1000:]),
1007          }
1008          if timeout_note:
1009              result["timeout_note"] = timeout_note
1010          else:
1011              result["timeout_note"] = f"Waited {effective_timeout}s, process still running"
1012          return result
1013  
1014      def kill_process(self, session_id: str) -> dict:
1015          """Kill a background process."""
1016          session = self.get(session_id)
1017          if session is None:
1018              return {"status": "not_found", "error": f"No process with ID {session_id}"}
1019  
1020          if session.exited:
1021              return {
1022                  "status": "already_exited",
1023                  "exit_code": session.exit_code,
1024              }
1025  
1026          # Kill via PTY, Popen (local), or env execute (non-local)
1027          try:
1028              if session._pty:
1029                  # PTY process -- terminate via ptyprocess
1030                  try:
1031                      session._pty.terminate(force=True)
1032                  except Exception:
1033                      if session.pid:
1034                          os.kill(session.pid, signal.SIGTERM)
1035              elif session.process:
1036                  # Local process -- kill the process group
1037                  try:
1038                      if _IS_WINDOWS:
1039                          session.process.terminate()
1040                      else:
1041                          os.killpg(os.getpgid(session.process.pid), signal.SIGTERM)
1042                  except (ProcessLookupError, PermissionError):
1043                      session.process.kill()
1044              elif session.env_ref and session.pid:
1045                  # Non-local -- kill inside sandbox
1046                  session.env_ref.execute(f"kill {session.pid} 2>/dev/null", timeout=5)
1047              elif session.detached and session.pid_scope == "host" and session.pid:
1048                  if not self._is_host_pid_alive(session.pid):
1049                      with session._lock:
1050                          session.exited = True
1051                          session.exit_code = None
1052                      self._move_to_finished(session)
1053                      return {
1054                          "status": "already_exited",
1055                          "exit_code": session.exit_code,
1056                      }
1057                  self._terminate_host_pid(session.pid)
1058              else:
1059                  return {
1060                      "status": "error",
1061                      "error": (
1062                          "Recovered process cannot be killed after restart because "
1063                          "its original runtime handle is no longer available"
1064                      ),
1065                  }
1066              session.exited = True
1067              session.exit_code = -15  # SIGTERM
1068              self._move_to_finished(session)
1069              self._write_checkpoint()
1070              return {"status": "killed", "session_id": session.id}
1071          except Exception as e:
1072              return {"status": "error", "error": str(e)}
1073  
1074      def write_stdin(self, session_id: str, data: str) -> dict:
1075          """Send raw data to a running process's stdin (no newline appended)."""
1076          session = self.get(session_id)
1077          if session is None:
1078              return {"status": "not_found", "error": f"No process with ID {session_id}"}
1079          if session.exited:
1080              return {"status": "already_exited", "error": "Process has already finished"}
1081  
1082          # PTY mode -- write through pty handle (expects bytes)
1083          if hasattr(session, '_pty') and session._pty:
1084              try:
1085                  pty_data = data.encode("utf-8") if isinstance(data, str) else data
1086                  session._pty.write(pty_data)
1087                  return {"status": "ok", "bytes_written": len(data)}
1088              except Exception as e:
1089                  return {"status": "error", "error": str(e)}
1090  
1091          # Popen mode -- write through stdin pipe
1092          if not session.process or not session.process.stdin:
1093              return {"status": "error", "error": "Process stdin not available (non-local backend or stdin closed)"}
1094          try:
1095              session.process.stdin.write(data)
1096              session.process.stdin.flush()
1097              return {"status": "ok", "bytes_written": len(data)}
1098          except Exception as e:
1099              return {"status": "error", "error": str(e)}
1100  
1101      def submit_stdin(self, session_id: str, data: str = "") -> dict:
1102          """Send data + newline to a running process's stdin (like pressing Enter)."""
1103          return self.write_stdin(session_id, data + "\n")
1104  
1105      def close_stdin(self, session_id: str) -> dict:
1106          """Close a running process's stdin / send EOF without killing the process."""
1107          session = self.get(session_id)
1108          if session is None:
1109              return {"status": "not_found", "error": f"No process with ID {session_id}"}
1110          if session.exited:
1111              return {"status": "already_exited", "error": "Process has already finished"}
1112  
1113          if hasattr(session, '_pty') and session._pty:
1114              try:
1115                  session._pty.sendeof()
1116                  return {"status": "ok", "message": "EOF sent"}
1117              except Exception as e:
1118                  return {"status": "error", "error": str(e)}
1119  
1120          if not session.process or not session.process.stdin:
1121              return {"status": "error", "error": "Process stdin not available (non-local backend or stdin closed)"}
1122          try:
1123              session.process.stdin.close()
1124              return {"status": "ok", "message": "stdin closed"}
1125          except Exception as e:
1126              return {"status": "error", "error": str(e)}
1127  
1128      def list_sessions(self, task_id: str = None) -> list:
1129          """List all running and recently-finished processes."""
1130          with self._lock:
1131              all_sessions = list(self._running.values()) + list(self._finished.values())
1132  
1133          all_sessions = [self._refresh_detached_session(s) for s in all_sessions]
1134  
1135          if task_id:
1136              all_sessions = [s for s in all_sessions if s.task_id == task_id]
1137  
1138          result = []
1139          for s in all_sessions:
1140              entry = {
1141                  "session_id": s.id,
1142                  "command": s.command[:200],
1143                  "cwd": s.cwd,
1144                  "pid": s.pid,
1145                  "started_at": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(s.started_at)),
1146                  "uptime_seconds": int(time.time() - s.started_at),
1147                  "status": "exited" if s.exited else "running",
1148                  "output_preview": s.output_buffer[-200:] if s.output_buffer else "",
1149              }
1150              if s.exited:
1151                  entry["exit_code"] = s.exit_code
1152              if s.detached:
1153                  entry["detached"] = True
1154              result.append(entry)
1155          return result
1156  
1157      # ----- Session/Task Queries (for gateway integration) -----
1158  
1159      def has_active_processes(self, task_id: str) -> bool:
1160          """Check if there are active (running) processes for a task_id."""
1161          with self._lock:
1162              sessions = list(self._running.values())
1163  
1164          for session in sessions:
1165              self._refresh_detached_session(session)
1166  
1167          with self._lock:
1168              return any(
1169                  s.task_id == task_id and not s.exited
1170                  for s in self._running.values()
1171              )
1172  
1173      def has_active_for_session(self, session_key: str) -> bool:
1174          """Check if there are active processes for a gateway session key."""
1175          with self._lock:
1176              sessions = list(self._running.values())
1177  
1178          for session in sessions:
1179              self._refresh_detached_session(session)
1180  
1181          with self._lock:
1182              return any(
1183                  s.session_key == session_key and not s.exited
1184                  for s in self._running.values()
1185              )
1186  
1187      def kill_all(self, task_id: str = None) -> int:
1188          """Kill all running processes, optionally filtered by task_id. Returns count killed."""
1189          with self._lock:
1190              targets = [
1191                  s for s in self._running.values()
1192                  if (task_id is None or s.task_id == task_id) and not s.exited
1193              ]
1194  
1195          killed = 0
1196          for session in targets:
1197              result = self.kill_process(session.id)
1198              if result.get("status") in ("killed", "already_exited"):
1199                  killed += 1
1200          return killed
1201  
1202      # ----- Cleanup / Pruning -----
1203  
1204      def _prune_if_needed(self):
1205          """Remove oldest finished sessions if over MAX_PROCESSES. Must hold _lock."""
1206          # First prune expired finished sessions
1207          now = time.time()
1208          expired = [
1209              sid for sid, s in self._finished.items()
1210              if (now - s.started_at) > FINISHED_TTL_SECONDS
1211          ]
1212          for sid in expired:
1213              del self._finished[sid]
1214              self._completion_consumed.discard(sid)
1215  
1216          # If still over limit, remove oldest finished
1217          total = len(self._running) + len(self._finished)
1218          if total >= MAX_PROCESSES and self._finished:
1219              oldest_id = min(self._finished, key=lambda sid: self._finished[sid].started_at)
1220              del self._finished[oldest_id]
1221              self._completion_consumed.discard(oldest_id)
1222  
1223          # Drop any _completion_consumed entries whose sessions are no longer
1224          # tracked at all — belt-and-suspenders against module-lifetime growth
1225          # on process-registry lookup paths that don't reach the dict prunes.
1226          tracked = self._running.keys() | self._finished.keys()
1227          stale = self._completion_consumed - tracked
1228          if stale:
1229              self._completion_consumed -= stale
1230  
1231      # ----- Checkpoint (crash recovery) -----
1232  
1233      def _write_checkpoint(self):
1234          """Write running process metadata to checkpoint file atomically."""
1235          try:
1236              with self._lock:
1237                  entries = []
1238                  for s in self._running.values():
1239                      if not s.exited:
1240                          entries.append({
1241                              "session_id": s.id,
1242                              "command": s.command,
1243                              "pid": s.pid,
1244                              "pid_scope": s.pid_scope,
1245                              "cwd": s.cwd,
1246                              "started_at": s.started_at,
1247                              "task_id": s.task_id,
1248                              "session_key": s.session_key,
1249                              "watcher_platform": s.watcher_platform,
1250                              "watcher_chat_id": s.watcher_chat_id,
1251                              "watcher_user_id": s.watcher_user_id,
1252                              "watcher_user_name": s.watcher_user_name,
1253                              "watcher_thread_id": s.watcher_thread_id,
1254                              "watcher_interval": s.watcher_interval,
1255                              "notify_on_complete": s.notify_on_complete,
1256                              "watch_patterns": s.watch_patterns,
1257                          })
1258              
1259              # Atomic write to avoid corruption on crash
1260              from utils import atomic_json_write
1261              atomic_json_write(CHECKPOINT_PATH, entries)
1262          except Exception as e:
1263              logger.debug("Failed to write checkpoint file: %s", e, exc_info=True)
1264  
1265      def recover_from_checkpoint(self) -> int:
1266          """
1267          On gateway startup, probe PIDs from checkpoint file.
1268  
1269          Returns the number of processes recovered as detached.
1270          """
1271          if not CHECKPOINT_PATH.exists():
1272              return 0
1273  
1274          try:
1275              entries = json.loads(CHECKPOINT_PATH.read_text(encoding="utf-8"))
1276          except Exception:
1277              return 0
1278  
1279          recovered = 0
1280          for entry in entries:
1281              pid = entry.get("pid")
1282              if not pid:
1283                  continue
1284  
1285              pid_scope = entry.get("pid_scope", "host")
1286              if pid_scope != "host":
1287                  # Sandbox-backed processes keep only in-sandbox PIDs in the
1288                  # checkpoint, which are not meaningful to the restarted host
1289                  # process once the original environment handle is gone.
1290                  logger.info(
1291                      "Skipping recovery for non-host process: %s (pid=%s, scope=%s)",
1292                      entry.get("command", "unknown")[:60],
1293                      pid,
1294                      pid_scope,
1295                  )
1296                  continue
1297  
1298              # Check if PID is still alive
1299              alive = self._is_host_pid_alive(pid)
1300  
1301              if alive:
1302                  session = ProcessSession(
1303                      id=entry["session_id"],
1304                      command=entry.get("command", "unknown"),
1305                      task_id=entry.get("task_id", ""),
1306                      session_key=entry.get("session_key", ""),
1307                      pid=pid,
1308                      pid_scope=pid_scope,
1309                      cwd=entry.get("cwd"),
1310                      started_at=entry.get("started_at", time.time()),
1311                      detached=True,  # Can't read output, but can report status + kill
1312                      watcher_platform=entry.get("watcher_platform", ""),
1313                      watcher_chat_id=entry.get("watcher_chat_id", ""),
1314                      watcher_user_id=entry.get("watcher_user_id", ""),
1315                      watcher_user_name=entry.get("watcher_user_name", ""),
1316                      watcher_thread_id=entry.get("watcher_thread_id", ""),
1317                      watcher_interval=entry.get("watcher_interval", 0),
1318                      notify_on_complete=entry.get("notify_on_complete", False),
1319                      watch_patterns=entry.get("watch_patterns", []),
1320                  )
1321                  with self._lock:
1322                      self._running[session.id] = session
1323                  recovered += 1
1324                  logger.info("Recovered detached process: %s (pid=%d)", session.command[:60], pid)
1325  
1326                  # Re-enqueue watcher so gateway can resume notifications
1327                  if session.watcher_interval > 0:
1328                      self.pending_watchers.append({
1329                          "session_id": session.id,
1330                          "check_interval": session.watcher_interval,
1331                          "session_key": session.session_key,
1332                          "platform": session.watcher_platform,
1333                          "chat_id": session.watcher_chat_id,
1334                          "user_id": session.watcher_user_id,
1335                          "user_name": session.watcher_user_name,
1336                          "thread_id": session.watcher_thread_id,
1337                          "notify_on_complete": session.notify_on_complete,
1338                      })
1339  
1340          self._write_checkpoint()
1341  
1342          return recovered
1343  
1344  
1345  # Module-level singleton
1346  process_registry = ProcessRegistry()
1347  
1348  
1349  # ---------------------------------------------------------------------------
1350  # Registry -- the "process" tool schema + handler
1351  # ---------------------------------------------------------------------------
1352  from tools.registry import registry, tool_error
1353  
1354  PROCESS_SCHEMA = {
1355      "name": "process",
1356      "description": (
1357          "Manage background processes started with terminal(background=true). "
1358          "Actions: 'list' (show all), 'poll' (check status + new output), "
1359          "'log' (full output with pagination), 'wait' (block until done or timeout), "
1360          "'kill' (terminate), 'write' (send raw stdin data without newline), "
1361          "'submit' (send data + Enter, for answering prompts), 'close' (close stdin/send EOF)."
1362      ),
1363      "parameters": {
1364          "type": "object",
1365          "properties": {
1366              "action": {
1367                  "type": "string",
1368                  "enum": ["list", "poll", "log", "wait", "kill", "write", "submit", "close"],
1369                  "description": "Action to perform on background processes"
1370              },
1371              "session_id": {
1372                  "type": "string",
1373                  "description": "Process session ID (from terminal background output). Required for all actions except 'list'."
1374              },
1375              "data": {
1376                  "type": "string",
1377                  "description": "Text to send to process stdin (for 'write' and 'submit' actions)"
1378              },
1379              "timeout": {
1380                  "type": "integer",
1381                  "description": "Max seconds to block for 'wait' action. Returns partial output on timeout.",
1382                  "minimum": 1
1383              },
1384              "offset": {
1385                  "type": "integer",
1386                  "description": "Line offset for 'log' action (default: last 200 lines)"
1387              },
1388              "limit": {
1389                  "type": "integer",
1390                  "description": "Max lines to return for 'log' action",
1391                  "minimum": 1
1392              }
1393          },
1394          "required": ["action"]
1395      }
1396  }
1397  
1398  
1399  def _handle_process(args, **kw):
1400      task_id = kw.get("task_id")
1401      action = args.get("action", "")
1402      # Coerce to string — some models send session_id as an integer
1403      session_id = str(args.get("session_id", "")) if args.get("session_id") is not None else ""
1404  
1405      if action == "list":
1406          return json.dumps({"processes": process_registry.list_sessions(task_id=task_id)}, ensure_ascii=False)
1407      elif action in ("poll", "log", "wait", "kill", "write", "submit", "close"):
1408          if not session_id:
1409              return tool_error(f"session_id is required for {action}")
1410          if action == "poll":
1411              return json.dumps(process_registry.poll(session_id), ensure_ascii=False)
1412          elif action == "log":
1413              return json.dumps(process_registry.read_log(
1414                  session_id, offset=args.get("offset", 0), limit=args.get("limit", 200)), ensure_ascii=False)
1415          elif action == "wait":
1416              return json.dumps(process_registry.wait(session_id, timeout=args.get("timeout")), ensure_ascii=False)
1417          elif action == "kill":
1418              return json.dumps(process_registry.kill_process(session_id), ensure_ascii=False)
1419          elif action == "write":
1420              return json.dumps(process_registry.write_stdin(session_id, str(args.get("data", ""))), ensure_ascii=False)
1421          elif action == "submit":
1422              return json.dumps(process_registry.submit_stdin(session_id, str(args.get("data", ""))), ensure_ascii=False)
1423          elif action == "close":
1424              return json.dumps(process_registry.close_stdin(session_id), ensure_ascii=False)
1425      return tool_error(f"Unknown process action: {action}. Use: list, poll, log, wait, kill, write, submit, close")
1426  
1427  
1428  registry.register(
1429      name="process",
1430      toolset="terminal",
1431      schema=PROCESS_SCHEMA,
1432      handler=_handle_process,
1433      emoji="⚙️",
1434  )