/ tools / environments / base.py
base.py
  1  """Base class for all Hermes execution environment backends.
  2  
  3  Unified spawn-per-call model: every command spawns a fresh ``bash -c`` process.
  4  A session snapshot (env vars, functions, aliases) is captured once at init and
  5  re-sourced before each command. CWD persists via in-band stdout markers (remote)
  6  or a temp file (local).
  7  """
  8  
  9  import codecs
 10  import json
 11  import logging
 12  import os
 13  import select
 14  import shlex
 15  import subprocess
 16  import threading
 17  import time
 18  import uuid
 19  from abc import ABC, abstractmethod
 20  from pathlib import Path
 21  from typing import IO, Callable, Protocol
 22  
 23  from hermes_constants import get_hermes_home
 24  from tools.interrupt import is_interrupted
 25  
 26  logger = logging.getLogger(__name__)
 27  
 28  # Opt-in debug tracing for the interrupt/activity/poll machinery.  Set
 29  # HERMES_DEBUG_INTERRUPT=1 to log loop entry/exit, periodic heartbeats, and
 30  # every is_interrupted() state change from _wait_for_process.  Off by default
 31  # to avoid flooding production gateway logs.
 32  _DEBUG_INTERRUPT = bool(os.getenv("HERMES_DEBUG_INTERRUPT"))
 33  
 34  if _DEBUG_INTERRUPT:
 35      # AIAgent's quiet_mode path (run_agent.py) forces the `tools` logger to
 36      # ERROR on CLI startup, which would silently swallow every trace we emit.
 37      # Force this module's own logger back to INFO so the trace is visible in
 38      # agent.log regardless of quiet-mode.  Scoped to the opt-in case only.
 39      logger.setLevel(logging.INFO)
 40  
 41  # Thread-local activity callback.  The agent sets this before a tool call so
 42  # long-running _wait_for_process loops can report liveness to the gateway.
 43  _activity_callback_local = threading.local()
 44  
 45  
 46  def set_activity_callback(cb: Callable[[str], None] | None) -> None:
 47      """Register a callback that _wait_for_process fires periodically."""
 48      _activity_callback_local.callback = cb
 49  
 50  
 51  def _get_activity_callback() -> Callable[[str], None] | None:
 52      return getattr(_activity_callback_local, "callback", None)
 53  
 54  
 55  def touch_activity_if_due(
 56      state: dict,
 57      label: str,
 58  ) -> None:
 59      """Fire the activity callback at most once every ``state['interval']`` seconds.
 60  
 61      *state* must contain ``last_touch`` (monotonic timestamp) and ``start``
 62      (monotonic timestamp of the operation start).  An optional ``interval``
 63      key overrides the default 10 s cadence.
 64  
 65      Swallows all exceptions so callers don't need their own try/except.
 66      """
 67      now = time.monotonic()
 68      interval = state.get("interval", 10.0)
 69      if now - state["last_touch"] < interval:
 70          return
 71      state["last_touch"] = now
 72      try:
 73          cb = _get_activity_callback()
 74          if cb:
 75              elapsed = int(now - state["start"])
 76              cb(f"{label} ({elapsed}s elapsed)")
 77      except Exception:
 78          pass
 79  
 80  
 81  def get_sandbox_dir() -> Path:
 82      """Return the host-side root for all sandbox storage (Docker workspaces,
 83      Singularity overlays/SIF cache, etc.).
 84  
 85      Configurable via TERMINAL_SANDBOX_DIR. Defaults to {HERMES_HOME}/sandboxes/.
 86      """
 87      custom = os.getenv("TERMINAL_SANDBOX_DIR")
 88      if custom:
 89          p = Path(custom)
 90      else:
 91          p = get_hermes_home() / "sandboxes"
 92      p.mkdir(parents=True, exist_ok=True)
 93      return p
 94  
 95  
 96  # ---------------------------------------------------------------------------
 97  # Shared constants and utilities
 98  # ---------------------------------------------------------------------------
 99  
100  
101  def _pipe_stdin(proc: subprocess.Popen, data: str) -> None:
102      """Write *data* to proc.stdin on a daemon thread to avoid pipe-buffer deadlocks."""
103  
104      def _write():
105          try:
106              proc.stdin.write(data)
107              proc.stdin.close()
108          except (BrokenPipeError, OSError):
109              pass
110  
111      threading.Thread(target=_write, daemon=True).start()
112  
113  
114  def _popen_bash(
115      cmd: list[str], stdin_data: str | None = None, **kwargs
116  ) -> subprocess.Popen:
117      """Spawn a subprocess with standard stdout/stderr/stdin setup.
118  
119      If *stdin_data* is provided, writes it asynchronously via :func:`_pipe_stdin`.
120      Backends with special Popen needs (e.g. local's ``preexec_fn``) can bypass
121      this and call :func:`_pipe_stdin` directly.
122      """
123      proc = subprocess.Popen(
124          cmd,
125          stdout=subprocess.PIPE,
126          stderr=subprocess.STDOUT,
127          stdin=subprocess.PIPE if stdin_data is not None else subprocess.DEVNULL,
128          text=True,
129          **kwargs,
130      )
131      if stdin_data is not None:
132          _pipe_stdin(proc, stdin_data)
133      return proc
134  
135  
136  def _load_json_store(path: Path) -> dict:
137      """Load a JSON file as a dict, returning ``{}`` on any error."""
138      if path.exists():
139          try:
140              return json.loads(path.read_text())
141          except Exception:
142              pass
143      return {}
144  
145  
146  def _save_json_store(path: Path, data: dict) -> None:
147      """Write *data* as pretty-printed JSON to *path*."""
148      path.parent.mkdir(parents=True, exist_ok=True)
149      path.write_text(json.dumps(data, indent=2))
150  
151  
152  def _file_mtime_key(host_path: str) -> tuple[float, int] | None:
153      """Return ``(mtime, size)`` for cache comparison, or ``None`` if unreadable."""
154      try:
155          st = Path(host_path).stat()
156          return (st.st_mtime, st.st_size)
157      except OSError:
158          return None
159  
160  
161  # ---------------------------------------------------------------------------
162  # ProcessHandle protocol
163  # ---------------------------------------------------------------------------
164  
165  
166  class ProcessHandle(Protocol):
167      """Duck type that every backend's _run_bash() must return.
168  
169      subprocess.Popen satisfies this natively.  SDK backends (Modal, Daytona)
170      return _ThreadedProcessHandle which adapts their blocking calls.
171      """
172  
173      def poll(self) -> int | None: ...
174      def kill(self) -> None: ...
175      def wait(self, timeout: float | None = None) -> int: ...
176  
177      @property
178      def stdout(self) -> IO[str] | None: ...
179  
180      @property
181      def returncode(self) -> int | None: ...
182  
183  
184  class _ThreadedProcessHandle:
185      """Adapter for SDK backends (Modal, Daytona) that have no real subprocess.
186  
187      Wraps a blocking ``exec_fn() -> (output_str, exit_code)`` in a background
188      thread and exposes a ProcessHandle-compatible interface.  An optional
189      ``cancel_fn`` is invoked on ``kill()`` for backend-specific cancellation
190      (e.g. Modal sandbox.terminate, Daytona sandbox.stop).
191      """
192  
193      def __init__(
194          self,
195          exec_fn: Callable[[], tuple[str, int]],
196          cancel_fn: Callable[[], None] | None = None,
197      ):
198          self._cancel_fn = cancel_fn
199          self._done = threading.Event()
200          self._returncode: int | None = None
201          self._error: Exception | None = None
202  
203          # Pipe for stdout — drain thread in _wait_for_process reads the read end.
204          read_fd, write_fd = os.pipe()
205          self._stdout = os.fdopen(read_fd, "r", encoding="utf-8", errors="replace")
206          self._write_fd = write_fd
207  
208          def _worker():
209              try:
210                  output, exit_code = exec_fn()
211                  self._returncode = exit_code
212                  # Write output into the pipe so drain thread picks it up.
213                  try:
214                      os.write(self._write_fd, output.encode("utf-8", errors="replace"))
215                  except OSError:
216                      pass
217              except Exception as exc:
218                  self._error = exc
219                  self._returncode = 1
220              finally:
221                  try:
222                      os.close(self._write_fd)
223                  except OSError:
224                      pass
225                  self._done.set()
226  
227          t = threading.Thread(target=_worker, daemon=True)
228          t.start()
229  
230      @property
231      def stdout(self):
232          return self._stdout
233  
234      @property
235      def returncode(self) -> int | None:
236          return self._returncode
237  
238      def poll(self) -> int | None:
239          return self._returncode if self._done.is_set() else None
240  
241      def kill(self):
242          if self._cancel_fn:
243              try:
244                  self._cancel_fn()
245              except Exception:
246                  pass
247  
248      def wait(self, timeout: float | None = None) -> int:
249          self._done.wait(timeout=timeout)
250          return self._returncode
251  
252  
253  # ---------------------------------------------------------------------------
254  # CWD marker for remote backends
255  # ---------------------------------------------------------------------------
256  
257  
258  def _cwd_marker(session_id: str) -> str:
259      return f"__HERMES_CWD_{session_id}__"
260  
261  
262  # ---------------------------------------------------------------------------
263  # BaseEnvironment
264  # ---------------------------------------------------------------------------
265  
266  
267  class BaseEnvironment(ABC):
268      """Common interface and unified execution flow for all Hermes backends.
269  
270      Subclasses implement ``_run_bash()`` and ``cleanup()``.  The base class
271      provides ``execute()`` with session snapshot sourcing, CWD tracking,
272      interrupt handling, and timeout enforcement.
273      """
274  
275      # Subclasses that embed stdin as a heredoc (Modal, Daytona) set this.
276      _stdin_mode: str = "pipe"  # "pipe" or "heredoc"
277  
278      # Snapshot creation timeout (override for slow cold-starts).
279      _snapshot_timeout: int = 30
280  
281      def get_temp_dir(self) -> str:
282          """Return the backend temp directory used for session artifacts.
283  
284          Most sandboxed backends use ``/tmp`` inside the target environment.
285          LocalEnvironment overrides this on platforms like Termux where ``/tmp``
286          may be missing and ``TMPDIR`` is the portable writable location.
287          """
288          return "/tmp"
289  
290      def __init__(self, cwd: str, timeout: int, env: dict = None):
291          self.cwd = cwd
292          self.timeout = timeout
293          self.env = env or {}
294  
295          self._session_id = uuid.uuid4().hex[:12]
296          temp_dir = self.get_temp_dir().rstrip("/") or "/"
297          self._snapshot_path = f"{temp_dir}/hermes-snap-{self._session_id}.sh"
298          self._cwd_file = f"{temp_dir}/hermes-cwd-{self._session_id}.txt"
299          self._cwd_marker = _cwd_marker(self._session_id)
300          self._snapshot_ready = False
301  
302      # ------------------------------------------------------------------
303      # Abstract methods
304      # ------------------------------------------------------------------
305  
306      def _run_bash(
307          self,
308          cmd_string: str,
309          *,
310          login: bool = False,
311          timeout: int = 120,
312          stdin_data: str | None = None,
313      ) -> ProcessHandle:
314          """Spawn a bash process to run *cmd_string*.
315  
316          Returns a ProcessHandle (subprocess.Popen or _ThreadedProcessHandle).
317          Must be overridden by every backend.
318          """
319          raise NotImplementedError(f"{type(self).__name__} must implement _run_bash()")
320  
321      @abstractmethod
322      def cleanup(self):
323          """Release backend resources (container, instance, connection)."""
324          ...
325  
326      # ------------------------------------------------------------------
327      # Session snapshot (init_session)
328      # ------------------------------------------------------------------
329  
330      def init_session(self):
331          """Capture login shell environment into a snapshot file.
332  
333          Called once after backend construction.  On success, sets
334          ``_snapshot_ready = True`` so subsequent commands source the snapshot
335          instead of running with ``bash -l``.
336          """
337          # Full capture: env vars, functions (filtered), aliases, shell options.
338          # Restore configured cwd after login shell profile scripts, which may
339          # change the working directory (e.g. bashrc `cd ~`).  Without this,
340          # pwd -P captures the profile's directory, not terminal.cwd.
341          _quoted_cwd = shlex.quote(self.cwd)
342          bootstrap = (
343              f"export -p > {self._snapshot_path}\n"
344              f"declare -f | grep -vE '^_[^_]' >> {self._snapshot_path}\n"
345              f"alias -p >> {self._snapshot_path}\n"
346              f"echo 'shopt -s expand_aliases' >> {self._snapshot_path}\n"
347              f"echo 'set +e' >> {self._snapshot_path}\n"
348              f"echo 'set +u' >> {self._snapshot_path}\n"
349              f"builtin cd {_quoted_cwd} 2>/dev/null || true\n"
350              f"pwd -P > {self._cwd_file} 2>/dev/null || true\n"
351              f"printf '\\n{self._cwd_marker}%s{self._cwd_marker}\\n' \"$(pwd -P)\"\n"
352          )
353          try:
354              proc = self._run_bash(bootstrap, login=True, timeout=self._snapshot_timeout)
355              result = self._wait_for_process(proc, timeout=self._snapshot_timeout)
356              self._snapshot_ready = True
357              self._update_cwd(result)
358              logger.info(
359                  "Session snapshot created (session=%s, cwd=%s)",
360                  self._session_id,
361                  self.cwd,
362              )
363          except Exception as exc:
364              logger.warning(
365                  "init_session failed (session=%s): %s — "
366                  "falling back to bash -l per command",
367                  self._session_id,
368                  exc,
369              )
370              self._snapshot_ready = False
371  
372      # ------------------------------------------------------------------
373      # Command wrapping
374      # ------------------------------------------------------------------
375  
376      @staticmethod
377      def _quote_cwd_for_cd(cwd: str) -> str:
378          """Quote a ``cd`` target while preserving ``~`` expansion."""
379          if cwd == "~":
380              return cwd
381          if cwd == "~/":
382              return "$HOME"
383          if cwd.startswith("~/"):
384              return f"$HOME/{shlex.quote(cwd[2:])}"
385          return shlex.quote(cwd)
386  
387      def _wrap_command(self, command: str, cwd: str) -> str:
388          """Build the full bash script that sources snapshot, cd's, runs command,
389          re-dumps env vars, and emits CWD markers."""
390          escaped = command.replace("'", "'\\''")
391  
392          parts = []
393  
394          # Source snapshot (env vars from previous commands).
395          # Redirect stdout to /dev/null: on macOS (bash 3.2 and certain
396          # Homebrew bash builds) sourcing a file containing ``declare -x``
397          # can emit the declarations to stdout, leaking ~60 lines of env
398          # vars into every tool response (issue #15459).  Linux bash is
399          # silent here, but the redirect is harmless.
400          if self._snapshot_ready:
401              parts.append(
402                  f"source {self._snapshot_path} >/dev/null 2>&1 || true"
403              )
404  
405          # Preserve bare ``~`` expansion, but rewrite ``~/...`` through
406          # ``$HOME`` so suffixes with spaces remain a single shell word.
407          quoted_cwd = self._quote_cwd_for_cd(cwd)
408          # ``--`` keeps hyphen-prefixed directory names from being parsed as options.
409          parts.append(f"builtin cd -- {quoted_cwd} || exit 126")
410  
411          # Run the actual command
412          parts.append(f"eval '{escaped}'")
413          parts.append("__hermes_ec=$?")
414  
415          # Re-dump env vars to snapshot (last-writer-wins for concurrent calls)
416          if self._snapshot_ready:
417              parts.append(f"export -p > {self._snapshot_path} 2>/dev/null || true")
418  
419          # Write CWD to file (local reads this) and stdout marker (remote parses this)
420          parts.append(f"pwd -P > {self._cwd_file} 2>/dev/null || true")
421          # Use a distinct line for the marker. The leading \n ensures
422          # the marker starts on its own line even if the command doesn't
423          # end with a newline (e.g. printf 'exact'). We'll strip this
424          # injected newline in _extract_cwd_from_output.
425          parts.append(
426              f"printf '\\n{self._cwd_marker}%s{self._cwd_marker}\\n' \"$(pwd -P)\""
427          )
428          parts.append("exit $__hermes_ec")
429  
430          return "\n".join(parts)
431  
432      # ------------------------------------------------------------------
433      # Stdin heredoc embedding (for SDK backends)
434      # ------------------------------------------------------------------
435  
436      @staticmethod
437      def _embed_stdin_heredoc(command: str, stdin_data: str) -> str:
438          """Append stdin_data as a shell heredoc to the command string."""
439          delimiter = f"HERMES_STDIN_{uuid.uuid4().hex[:12]}"
440          return f"{command} << '{delimiter}'\n{stdin_data}\n{delimiter}"
441  
442      # ------------------------------------------------------------------
443      # Process lifecycle
444      # ------------------------------------------------------------------
445  
446      def _wait_for_process(self, proc: ProcessHandle, timeout: int = 120) -> dict:
447          """Poll-based wait with interrupt checking and stdout draining.
448  
449          Shared across all backends — not overridden.
450  
451          Fires the ``activity_callback`` (if set on this instance) every 10s
452          while the process is running so the gateway's inactivity timeout
453          doesn't kill long-running commands.
454  
455          Also wraps the poll loop in a ``try/finally`` that guarantees we
456          call ``self._kill_process(proc)`` if we exit via ``KeyboardInterrupt``
457          or ``SystemExit``.  Without this, the local backend (which spawns
458          subprocesses with ``os.setsid`` into their own process group) leaves
459          an orphan with ``PPID=1`` when python is shut down mid-tool — the
460          ``sleep 300``-survives-30-min bug Physikal and I both hit.
461          """
462          output_chunks: list[str] = []
463  
464          # Non-blocking drain via select().
465          #
466          # The old pattern — ``for line in proc.stdout`` — blocks on
467          # ``readline()`` until the pipe reaches EOF.  When the user's command
468          # backgrounds a process (``cmd &``, ``setsid cmd & disown``, etc.),
469          # that backgrounded grandchild inherits the write-end of our stdout
470          # pipe via ``fork()``.  Even after ``bash`` itself exits, the pipe
471          # stays open because the grandchild still holds it — so the drain
472          # thread never returns and the tool hangs for the full lifetime of
473          # the grandchild (issue #8340: users reported indefinite hangs when
474          # restarting uvicorn with ``setsid ... & disown``).
475          #
476          # The fix: select() with a short poll interval, and stop draining
477          # shortly after ``bash`` exits even if the pipe hasn't EOF'd yet.
478          # Any output the grandchild writes after that point goes to an
479          # orphaned pipe (harmless — the kernel reaps it when our end closes).
480          #
481          # Decoding: we ``os.read()`` raw bytes in fixed-size chunks (4096)
482          # so a single multibyte UTF-8 character can split across reads.  An
483          # incremental decoder buffers partial sequences across chunks, and
484          # ``errors="replace"`` mirrors the baseline ``TextIOWrapper`` (which
485          # was constructed with ``encoding="utf-8", errors="replace"`` on
486          # ``Popen``) so binary or mis-encoded output is preserved with
487          # U+FFFD substitution rather than clobbering the whole buffer.
488          decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
489  
490          def _drain():
491              fd = proc.stdout.fileno()
492              idle_after_exit = 0
493              try:
494                  while True:
495                      try:
496                          ready, _, _ = select.select([fd], [], [], 0.1)
497                      except (ValueError, OSError):
498                          break  # fd already closed
499                      if ready:
500                          try:
501                              chunk = os.read(fd, 4096)
502                          except (ValueError, OSError):
503                              break
504                          if not chunk:
505                              break  # true EOF — all writers closed
506                          output_chunks.append(decoder.decode(chunk))
507                          idle_after_exit = 0
508                      elif proc.poll() is not None:
509                          # bash is gone and the pipe was idle for ~100ms.  Give
510                          # it two more cycles to catch any buffered tail, then
511                          # stop — otherwise we wait forever on a grandchild pipe.
512                          idle_after_exit += 1
513                          if idle_after_exit >= 3:
514                              break
515              finally:
516                  # Flush any bytes buffered mid-sequence.  With ``errors="replace"``
517                  # this emits U+FFFD for any final incomplete sequence rather than
518                  # raising.
519                  try:
520                      tail = decoder.decode(b"", final=True)
521                      if tail:
522                          output_chunks.append(tail)
523                  except Exception:
524                      pass
525  
526          drain_thread = threading.Thread(target=_drain, daemon=True)
527          drain_thread.start()
528          deadline = time.monotonic() + timeout
529          _now = time.monotonic()
530          _activity_state = {
531              "last_touch": _now,
532              "start": _now,
533          }
534  
535          # --- Debug tracing (opt-in via HERMES_DEBUG_INTERRUPT=1) -------------
536          # Captures loop entry/exit, interrupt state changes, and periodic
537          # heartbeats so we can diagnose "agent never sees the interrupt"
538          # reports without reproducing locally.
539          _tid = threading.current_thread().ident
540          _pid = getattr(proc, "pid", None)
541          _iter_count = 0
542          _last_heartbeat = _now
543          _last_interrupt_state = False
544          _cb_was_none = _get_activity_callback() is None
545          if _DEBUG_INTERRUPT:
546              logger.info(
547                  "[interrupt-debug] _wait_for_process ENTER tid=%s pid=%s "
548                  "timeout=%ss activity_cb=%s initial_interrupt=%s",
549                  _tid, _pid, timeout,
550                  "set" if not _cb_was_none else "MISSING",
551                  is_interrupted(),
552              )
553  
554          try:
555              while proc.poll() is None:
556                  _iter_count += 1
557                  if is_interrupted():
558                      if _DEBUG_INTERRUPT:
559                          logger.info(
560                              "[interrupt-debug] _wait_for_process INTERRUPT DETECTED "
561                              "tid=%s pid=%s iter=%d elapsed=%.1fs — killing process group",
562                              _tid, _pid, _iter_count, time.monotonic() - _activity_state["start"],
563                          )
564                      self._kill_process(proc)
565                      drain_thread.join(timeout=2)
566                      return {
567                          "output": "".join(output_chunks) + "\n[Command interrupted]",
568                          "returncode": 130,
569                      }
570                  if time.monotonic() > deadline:
571                      if _DEBUG_INTERRUPT:
572                          logger.info(
573                              "[interrupt-debug] _wait_for_process TIMEOUT "
574                              "tid=%s pid=%s iter=%d timeout=%ss",
575                              _tid, _pid, _iter_count, timeout,
576                          )
577                      self._kill_process(proc)
578                      drain_thread.join(timeout=2)
579                      partial = "".join(output_chunks)
580                      timeout_msg = f"\n[Command timed out after {timeout}s]"
581                      return {
582                          "output": partial + timeout_msg
583                          if partial
584                          else timeout_msg.lstrip(),
585                          "returncode": 124,
586                      }
587                  # Periodic activity touch so the gateway knows we're alive
588                  touch_activity_if_due(_activity_state, "terminal command running")
589  
590                  # Heartbeat every ~30s: proves the loop is alive and reports
591                  # the activity-callback state (thread-local, can get clobbered
592                  # by nested tool calls or executor thread reuse).
593                  if _DEBUG_INTERRUPT and time.monotonic() - _last_heartbeat >= 30.0:
594                      _cb_now_none = _get_activity_callback() is None
595                      logger.info(
596                          "[interrupt-debug] _wait_for_process HEARTBEAT "
597                          "tid=%s pid=%s iter=%d elapsed=%.0fs "
598                          "interrupt=%s activity_cb=%s%s",
599                          _tid, _pid, _iter_count,
600                          time.monotonic() - _activity_state["start"],
601                          is_interrupted(),
602                          "set" if not _cb_now_none else "MISSING",
603                          " (LOST during run)" if _cb_now_none and not _cb_was_none else "",
604                      )
605                      _last_heartbeat = time.monotonic()
606                      _cb_was_none = _cb_now_none
607  
608                  time.sleep(0.2)
609          except (KeyboardInterrupt, SystemExit):
610              # Signal arrived (SIGTERM/SIGHUP/SIGINT) or sys.exit() was called
611              # while we were polling.  The local backend spawns subprocesses
612              # with os.setsid, which puts them in their own process group — so
613              # if we let the interrupt propagate without killing the child,
614              # python exits and the child is reparented to init (PPID=1) and
615              # keeps running as an orphan.  Killing the process group here
616              # guarantees the tool's side effects stop when the agent stops.
617              if _DEBUG_INTERRUPT:
618                  logger.info(
619                      "[interrupt-debug] _wait_for_process EXCEPTION_EXIT "
620                      "tid=%s pid=%s iter=%d elapsed=%.1fs — killing subprocess group before re-raise",
621                      _tid, _pid, _iter_count,
622                      time.monotonic() - _activity_state["start"],
623                  )
624              try:
625                  self._kill_process(proc)
626                  drain_thread.join(timeout=2)
627              except Exception:
628                  pass  # cleanup is best-effort
629              raise
630  
631          # Drain thread now exits promptly after bash does (~300ms idle
632          # check).  A short join is enough; a long one would be a bug since
633          # it means the non-blocking loop itself stopped cooperating.
634          drain_thread.join(timeout=2)
635  
636          try:
637              proc.stdout.close()
638          except Exception:
639              pass
640  
641          if _DEBUG_INTERRUPT:
642              logger.info(
643                  "[interrupt-debug] _wait_for_process EXIT (natural) "
644                  "tid=%s pid=%s iter=%d elapsed=%.1fs returncode=%s",
645                  _tid, _pid, _iter_count,
646                  time.monotonic() - _activity_state["start"],
647                  proc.returncode,
648              )
649  
650          return {"output": "".join(output_chunks), "returncode": proc.returncode}
651  
652      def _kill_process(self, proc: ProcessHandle):
653          """Terminate a process. Subclasses may override for process-group kill."""
654          try:
655              proc.kill()
656          except (ProcessLookupError, PermissionError, OSError):
657              pass
658  
659      # ------------------------------------------------------------------
660      # CWD extraction
661      # ------------------------------------------------------------------
662  
663      def _update_cwd(self, result: dict):
664          """Extract CWD from command output. Override for local file-based read."""
665          self._extract_cwd_from_output(result)
666  
667      def _extract_cwd_from_output(self, result: dict):
668          """Parse the __HERMES_CWD_{session}__ marker from stdout output.
669  
670          Updates self.cwd and strips the marker from result["output"].
671          Used by remote backends (Docker, SSH, Modal, Daytona, Singularity).
672          """
673          output = result.get("output", "")
674          marker = self._cwd_marker
675          last = output.rfind(marker)
676          if last == -1:
677              return
678  
679          # Find the opening marker before this closing one
680          search_start = max(0, last - 4096)  # CWD path won't be >4KB
681          first = output.rfind(marker, search_start, last)
682          if first == -1 or first == last:
683              return
684  
685          cwd_path = output[first + len(marker) : last].strip()
686          if cwd_path:
687              self.cwd = cwd_path
688  
689          # Strip the marker line AND the \n we injected before it.
690          # The wrapper emits: printf '\n__MARKER__%s__MARKER__\n'
691          # So the output looks like: <cmd output>\n__MARKER__path__MARKER__\n
692          # We want to remove everything from the injected \n onwards.
693          line_start = output.rfind("\n", 0, first)
694          if line_start == -1:
695              line_start = first
696          line_end = output.find("\n", last + len(marker))
697          line_end = line_end + 1 if line_end != -1 else len(output)
698  
699          result["output"] = output[:line_start] + output[line_end:]
700  
701      # ------------------------------------------------------------------
702      # Hooks
703      # ------------------------------------------------------------------
704  
705      def _before_execute(self) -> None:
706          """Hook called before each command execution.
707  
708          Remote backends (SSH, Modal, Daytona) override this to trigger
709          their FileSyncManager.  Bind-mount backends (Docker, Singularity)
710          and Local don't need file sync — the host filesystem is directly
711          visible inside the container/process.
712          """
713          pass
714  
715      # ------------------------------------------------------------------
716      # Unified execute()
717      # ------------------------------------------------------------------
718  
719      def execute(
720          self,
721          command: str,
722          cwd: str = "",
723          *,
724          timeout: int | None = None,
725          stdin_data: str | None = None,
726      ) -> dict:
727          """Execute a command, return {"output": str, "returncode": int}."""
728          self._before_execute()
729  
730          exec_command, sudo_stdin = self._prepare_command(command)
731          # Guard against the `A && B &` subshell-wait trap: bash forks a
732          # subshell for the compound that then waits for an infinite B (a
733          # server, `yes > /dev/null`, etc.), leaking the subshell forever.
734          # Rewriting to `A && { B & }` runs B as a plain background in the
735          # current shell — no subshell wait.
736          from tools.terminal_tool import _rewrite_compound_background
737          exec_command = _rewrite_compound_background(exec_command)
738          effective_timeout = timeout or self.timeout
739          effective_cwd = cwd or self.cwd
740  
741          # Merge sudo stdin with caller stdin
742          if sudo_stdin is not None and stdin_data is not None:
743              effective_stdin = sudo_stdin + stdin_data
744          elif sudo_stdin is not None:
745              effective_stdin = sudo_stdin
746          else:
747              effective_stdin = stdin_data
748  
749          # Embed stdin as heredoc for backends that need it
750          if effective_stdin and self._stdin_mode == "heredoc":
751              exec_command = self._embed_stdin_heredoc(exec_command, effective_stdin)
752              effective_stdin = None
753  
754          wrapped = self._wrap_command(exec_command, effective_cwd)
755  
756          # Use login shell if snapshot failed (so user's profile still loads)
757          login = not self._snapshot_ready
758  
759          proc = self._run_bash(
760              wrapped, login=login, timeout=effective_timeout, stdin_data=effective_stdin
761          )
762          result = self._wait_for_process(proc, timeout=effective_timeout)
763          self._update_cwd(result)
764  
765          return result
766  
767      # ------------------------------------------------------------------
768      # Shared helpers
769      # ------------------------------------------------------------------
770  
771      def stop(self):
772          """Alias for cleanup (compat with older callers)."""
773          self.cleanup()
774  
775      def __del__(self):
776          try:
777              self.cleanup()
778          except Exception:
779              pass
780  
781      def _prepare_command(self, command: str) -> tuple[str, str | None]:
782          """Transform sudo commands if SUDO_PASSWORD is available."""
783          from tools.terminal_tool import _transform_sudo_command
784  
785          return _transform_sudo_command(command)
786