"""Base class for all Hermes execution environment backends.

Unified spawn-per-call model: every command spawns a fresh ``bash -c`` process.
A session snapshot (env vars, functions, aliases) is captured once at init and
re-sourced before each command. CWD persists via in-band stdout markers (remote)
or a temp file (local).
"""

import codecs
import json
import logging
import os
import select
import shlex
import subprocess
import threading
import time
import uuid
from abc import ABC, abstractmethod
from pathlib import Path
from typing import IO, Callable, Protocol

from hermes_constants import get_hermes_home
from tools.interrupt import is_interrupted

logger = logging.getLogger(__name__)

# Opt-in debug tracing for the interrupt/activity/poll machinery. Set
# HERMES_DEBUG_INTERRUPT=1 to log loop entry/exit, periodic heartbeats, and
# every is_interrupted() state change from _wait_for_process. Off by default
# to avoid flooding production gateway logs.
_DEBUG_INTERRUPT = bool(os.getenv("HERMES_DEBUG_INTERRUPT"))

if _DEBUG_INTERRUPT:
    # AIAgent's quiet_mode path (run_agent.py) forces the `tools` logger to
    # ERROR on CLI startup, which would silently swallow every trace we emit.
    # Force this module's own logger back to INFO so the trace is visible in
    # agent.log regardless of quiet-mode. Scoped to the opt-in case only.
    logger.setLevel(logging.INFO)

# Thread-local activity callback. The agent sets this before a tool call so
# long-running _wait_for_process loops can report liveness to the gateway.
# Thread-local so concurrent tool calls on different executor threads each
# report to their own gateway session rather than clobbering one another.
_activity_callback_local = threading.local()


def set_activity_callback(cb: Callable[[str], None] | None) -> None:
    """Register a callback that _wait_for_process fires periodically.

    Pass ``None`` to clear the current thread's callback.
    """
    _activity_callback_local.callback = cb


def _get_activity_callback() -> Callable[[str], None] | None:
    """Return the current thread's activity callback, or ``None`` if unset."""
    return getattr(_activity_callback_local, "callback", None)


def touch_activity_if_due(
    state: dict,
    label: str,
) -> None:
    """Fire the activity callback at most once every ``state['interval']`` seconds.

    *state* must contain ``last_touch`` (monotonic timestamp) and ``start``
    (monotonic timestamp of the operation start). An optional ``interval``
    key overrides the default 10 s cadence.

    Swallows all exceptions so callers don't need their own try/except.
    """
    now = time.monotonic()
    interval = state.get("interval", 10.0)
    if now - state["last_touch"] < interval:
        return
    # Update the timestamp BEFORE invoking the callback so a raising callback
    # still respects the cadence instead of retrying every poll cycle.
    state["last_touch"] = now
    try:
        cb = _get_activity_callback()
        if cb:
            elapsed = int(now - state["start"])
            cb(f"{label} ({elapsed}s elapsed)")
    except Exception:
        pass


def get_sandbox_dir() -> Path:
    """Return the host-side root for all sandbox storage (Docker workspaces,
    Singularity overlays/SIF cache, etc.).

    Configurable via TERMINAL_SANDBOX_DIR. Defaults to {HERMES_HOME}/sandboxes/.
    The directory is created on first call if it does not exist.
    """
    custom = os.getenv("TERMINAL_SANDBOX_DIR")
    if custom:
        p = Path(custom)
    else:
        p = get_hermes_home() / "sandboxes"
    p.mkdir(parents=True, exist_ok=True)
    return p


# ---------------------------------------------------------------------------
# Shared constants and utilities
# ---------------------------------------------------------------------------


def _pipe_stdin(proc: subprocess.Popen, data: str) -> None:
    """Write *data* to proc.stdin on a daemon thread to avoid pipe-buffer deadlocks.

    Closing stdin after the write signals EOF to the child. BrokenPipeError
    (child exited before reading) is intentionally swallowed: best-effort.
    """

    def _write():
        try:
            proc.stdin.write(data)
            proc.stdin.close()
        except (BrokenPipeError, OSError):
            pass

    threading.Thread(target=_write, daemon=True).start()


def _popen_bash(
    cmd: list[str], stdin_data: str | None = None, **kwargs
) -> subprocess.Popen:
    """Spawn a subprocess with standard stdout/stderr/stdin setup.

    stderr is merged into stdout so the drain loop sees a single stream.
    If *stdin_data* is provided, writes it asynchronously via :func:`_pipe_stdin`.
    Backends with special Popen needs (e.g. local's ``preexec_fn``) can bypass
    this and call :func:`_pipe_stdin` directly.
    """
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        stdin=subprocess.PIPE if stdin_data is not None else subprocess.DEVNULL,
        text=True,
        **kwargs,
    )
    if stdin_data is not None:
        _pipe_stdin(proc, stdin_data)
    return proc


def _load_json_store(path: Path) -> dict:
    """Load a JSON file as a dict, returning ``{}`` on any error."""
    if path.exists():
        try:
            return json.loads(path.read_text())
        except Exception:
            # Deliberate best-effort: a corrupt store resets to empty.
            pass
    return {}


def _save_json_store(path: Path, data: dict) -> None:
    """Write *data* as pretty-printed JSON to *path*."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(data, indent=2))


def _file_mtime_key(host_path: str) -> tuple[float, int] | None:
    """Return ``(mtime, size)`` for cache comparison, or ``None`` if unreadable."""
    try:
        st = Path(host_path).stat()
        return (st.st_mtime, st.st_size)
    except OSError:
        return None


# ---------------------------------------------------------------------------
# ProcessHandle protocol
# ---------------------------------------------------------------------------


class ProcessHandle(Protocol):
    """Duck type that every backend's _run_bash() must return.

    subprocess.Popen satisfies this natively. SDK backends (Modal, Daytona)
    return _ThreadedProcessHandle which adapts their blocking calls.
    """

    def poll(self) -> int | None: ...
    def kill(self) -> None: ...
    def wait(self, timeout: float | None = None) -> int: ...

    @property
    def stdout(self) -> IO[str] | None: ...

    @property
    def returncode(self) -> int | None: ...


class _ThreadedProcessHandle:
    """Adapter for SDK backends (Modal, Daytona) that have no real subprocess.

    Wraps a blocking ``exec_fn() -> (output_str, exit_code)`` in a background
    thread and exposes a ProcessHandle-compatible interface. An optional
    ``cancel_fn`` is invoked on ``kill()`` for backend-specific cancellation
    (e.g. Modal sandbox.terminate, Daytona sandbox.stop).
    """

    def __init__(
        self,
        exec_fn: Callable[[], tuple[str, int]],
        cancel_fn: Callable[[], None] | None = None,
    ):
        self._cancel_fn = cancel_fn
        self._done = threading.Event()
        self._returncode: int | None = None
        self._error: Exception | None = None

        # Pipe for stdout — drain thread in _wait_for_process reads the read end.
        read_fd, write_fd = os.pipe()
        self._stdout = os.fdopen(read_fd, "r", encoding="utf-8", errors="replace")
        self._write_fd = write_fd

        def _worker():
            try:
                output, exit_code = exec_fn()
                # Set returncode BEFORE writing so poll() never reports done
                # while the code is still None.
                self._returncode = exit_code
                # Write output into the pipe so drain thread picks it up.
                try:
                    os.write(self._write_fd, output.encode("utf-8", errors="replace"))
                except OSError:
                    pass
            except Exception as exc:
                # NOTE(review): the exception is captured in self._error but
                # never surfaced to callers — only the generic exit code 1 is.
                self._error = exc
                self._returncode = 1
            finally:
                # Closing the write end is what lets the reader hit EOF;
                # _done is set last so waiters see a fully-finished handle.
                try:
                    os.close(self._write_fd)
                except OSError:
                    pass
                self._done.set()

        t = threading.Thread(target=_worker, daemon=True)
        t.start()

    @property
    def stdout(self):
        # Text-mode read end of the pipe (utf-8, errors="replace").
        return self._stdout

    @property
    def returncode(self) -> int | None:
        return self._returncode

    def poll(self) -> int | None:
        """Return the exit code if finished, else ``None`` (non-blocking)."""
        return self._returncode if self._done.is_set() else None

    def kill(self):
        """Best-effort cancellation via the backend-specific cancel_fn."""
        if self._cancel_fn:
            try:
                self._cancel_fn()
            except Exception:
                pass

    def wait(self, timeout: float | None = None) -> int:
        # NOTE(review): if *timeout* elapses before exec_fn finishes this
        # returns None, despite the ``-> int`` annotation (Popen.wait raises
        # TimeoutExpired instead) — confirm callers handle that.
        self._done.wait(timeout=timeout)
        return self._returncode
# ---------------------------------------------------------------------------
# CWD marker for remote backends
# ---------------------------------------------------------------------------


def _cwd_marker(session_id: str) -> str:
    """Return the per-session sentinel used to delimit the CWD in stdout."""
    return f"__HERMES_CWD_{session_id}__"


# ---------------------------------------------------------------------------
# BaseEnvironment
# ---------------------------------------------------------------------------


class BaseEnvironment(ABC):
    """Common interface and unified execution flow for all Hermes backends.

    Subclasses implement ``_run_bash()`` and ``cleanup()``. The base class
    provides ``execute()`` with session snapshot sourcing, CWD tracking,
    interrupt handling, and timeout enforcement.
    """

    # Subclasses that embed stdin as a heredoc (Modal, Daytona) set this.
    _stdin_mode: str = "pipe"  # "pipe" or "heredoc"

    # Snapshot creation timeout (override for slow cold-starts).
    _snapshot_timeout: int = 30

    def get_temp_dir(self) -> str:
        """Return the backend temp directory used for session artifacts.

        Most sandboxed backends use ``/tmp`` inside the target environment.
        LocalEnvironment overrides this on platforms like Termux where ``/tmp``
        may be missing and ``TMPDIR`` is the portable writable location.
        """
        return "/tmp"

    def __init__(self, cwd: str, timeout: int, env: dict | None = None):
        self.cwd = cwd
        self.timeout = timeout
        self.env = env or {}

        # Per-session artifacts live under the backend's temp dir, keyed by a
        # random session id so concurrent sessions never collide.
        self._session_id = uuid.uuid4().hex[:12]
        # rstrip guards against a trailing-slash override; ``or "/"`` keeps a
        # bare "/" temp dir valid after the strip.
        temp_dir = self.get_temp_dir().rstrip("/") or "/"
        self._snapshot_path = f"{temp_dir}/hermes-snap-{self._session_id}.sh"
        self._cwd_file = f"{temp_dir}/hermes-cwd-{self._session_id}.txt"
        self._cwd_marker = _cwd_marker(self._session_id)
        self._snapshot_ready = False

    # ------------------------------------------------------------------
    # Abstract methods
    # ------------------------------------------------------------------

    # NOTE(review): not @abstractmethod (unlike cleanup) — presumably
    # deliberate, but confirm subclasses can't be instantiated without it.
    def _run_bash(
        self,
        cmd_string: str,
        *,
        login: bool = False,
        timeout: int = 120,
        stdin_data: str | None = None,
    ) -> ProcessHandle:
        """Spawn a bash process to run *cmd_string*.

        Returns a ProcessHandle (subprocess.Popen or _ThreadedProcessHandle).
        Must be overridden by every backend.
        """
        raise NotImplementedError(f"{type(self).__name__} must implement _run_bash()")

    @abstractmethod
    def cleanup(self):
        """Release backend resources (container, instance, connection)."""
        ...

    # ------------------------------------------------------------------
    # Session snapshot (init_session)
    # ------------------------------------------------------------------

    def init_session(self):
        """Capture login shell environment into a snapshot file.

        Called once after backend construction. On success, sets
        ``_snapshot_ready = True`` so subsequent commands source the snapshot
        instead of running with ``bash -l``.
        """
        # Full capture: env vars, functions (filtered), aliases, shell options.
        # Restore configured cwd after login shell profile scripts, which may
        # change the working directory (e.g. bashrc `cd ~`). Without this,
        # pwd -P captures the profile's directory, not terminal.cwd.
        _quoted_cwd = shlex.quote(self.cwd)
        bootstrap = (
            f"export -p > {self._snapshot_path}\n"
            f"declare -f | grep -vE '^_[^_]' >> {self._snapshot_path}\n"
            f"alias -p >> {self._snapshot_path}\n"
            f"echo 'shopt -s expand_aliases' >> {self._snapshot_path}\n"
            f"echo 'set +e' >> {self._snapshot_path}\n"
            f"echo 'set +u' >> {self._snapshot_path}\n"
            f"builtin cd {_quoted_cwd} 2>/dev/null || true\n"
            f"pwd -P > {self._cwd_file} 2>/dev/null || true\n"
            f"printf '\\n{self._cwd_marker}%s{self._cwd_marker}\\n' \"$(pwd -P)\"\n"
        )
        try:
            proc = self._run_bash(bootstrap, login=True, timeout=self._snapshot_timeout)
            result = self._wait_for_process(proc, timeout=self._snapshot_timeout)
            self._snapshot_ready = True
            self._update_cwd(result)
            logger.info(
                "Session snapshot created (session=%s, cwd=%s)",
                self._session_id,
                self.cwd,
            )
        except Exception as exc:
            # Non-fatal: commands still work, just slower (bash -l each time).
            logger.warning(
                "init_session failed (session=%s): %s — "
                "falling back to bash -l per command",
                self._session_id,
                exc,
            )
            self._snapshot_ready = False

    # ------------------------------------------------------------------
    # Command wrapping
    # ------------------------------------------------------------------

    @staticmethod
    def _quote_cwd_for_cd(cwd: str) -> str:
        """Quote a ``cd`` target while preserving ``~`` expansion."""
        if cwd == "~":
            return cwd
        if cwd == "~/":
            return "$HOME"
        if cwd.startswith("~/"):
            # Quote only the suffix so $HOME still expands but spaces in the
            # remainder stay a single shell word.
            return f"$HOME/{shlex.quote(cwd[2:])}"
        return shlex.quote(cwd)

    def _wrap_command(self, command: str, cwd: str) -> str:
        """Build the full bash script that sources snapshot, cd's, runs command,
        re-dumps env vars, and emits CWD markers."""
        # Escape single quotes so the command survives the eval '<...>' below.
        escaped = command.replace("'", "'\\''")

        parts = []

        # Source snapshot (env vars from previous commands).
        # Redirect stdout to /dev/null: on macOS (bash 3.2 and certain
        # Homebrew bash builds) sourcing a file containing ``declare -x``
        # can emit the declarations to stdout, leaking ~60 lines of env
        # vars into every tool response (issue #15459). Linux bash is
        # silent here, but the redirect is harmless.
        if self._snapshot_ready:
            parts.append(
                f"source {self._snapshot_path} >/dev/null 2>&1 || true"
            )

        # Preserve bare ``~`` expansion, but rewrite ``~/...`` through
        # ``$HOME`` so suffixes with spaces remain a single shell word.
        quoted_cwd = self._quote_cwd_for_cd(cwd)
        # ``--`` keeps hyphen-prefixed directory names from being parsed as options.
        parts.append(f"builtin cd -- {quoted_cwd} || exit 126")

        # Run the actual command, capturing its exit code before the
        # bookkeeping steps below can overwrite $?.
        parts.append(f"eval '{escaped}'")
        parts.append("__hermes_ec=$?")

        # Re-dump env vars to snapshot (last-writer-wins for concurrent calls)
        if self._snapshot_ready:
            parts.append(f"export -p > {self._snapshot_path} 2>/dev/null || true")

        # Write CWD to file (local reads this) and stdout marker (remote parses this)
        parts.append(f"pwd -P > {self._cwd_file} 2>/dev/null || true")
        # Use a distinct line for the marker. The leading \n ensures
        # the marker starts on its own line even if the command doesn't
        # end with a newline (e.g. printf 'exact'). We'll strip this
        # injected newline in _extract_cwd_from_output.
        parts.append(
            f"printf '\\n{self._cwd_marker}%s{self._cwd_marker}\\n' \"$(pwd -P)\""
        )
        parts.append("exit $__hermes_ec")

        return "\n".join(parts)

    # ------------------------------------------------------------------
    # Stdin heredoc embedding (for SDK backends)
    # ------------------------------------------------------------------

    @staticmethod
    def _embed_stdin_heredoc(command: str, stdin_data: str) -> str:
        """Append stdin_data as a shell heredoc to the command string.

        The delimiter is randomized (and single-quoted, so the heredoc body
        is taken literally) to avoid collisions with the payload.
        """
        delimiter = f"HERMES_STDIN_{uuid.uuid4().hex[:12]}"
        return f"{command} << '{delimiter}'\n{stdin_data}\n{delimiter}"

    # ------------------------------------------------------------------
    # Process lifecycle
    # ------------------------------------------------------------------

    def _wait_for_process(self, proc: ProcessHandle, timeout: int = 120) -> dict:
        """Poll-based wait with interrupt checking and stdout draining.

        Shared across all backends — not overridden.

        Returns ``{"output": str, "returncode": int}``; returncode is 130 on
        interrupt and 124 on timeout (shell conventions).

        Fires the ``activity_callback`` (if set on this instance) every 10s
        while the process is running so the gateway's inactivity timeout
        doesn't kill long-running commands.

        Also wraps the poll loop in a ``try/finally`` that guarantees we
        call ``self._kill_process(proc)`` if we exit via ``KeyboardInterrupt``
        or ``SystemExit``. Without this, the local backend (which spawns
        subprocesses with ``os.setsid`` into their own process group) leaves
        an orphan with ``PPID=1`` when python is shut down mid-tool — the
        ``sleep 300``-survives-30-min bug Physikal and I both hit.
        """
        output_chunks: list[str] = []

        # Non-blocking drain via select().
        #
        # The old pattern — ``for line in proc.stdout`` — blocks on
        # ``readline()`` until the pipe reaches EOF. When the user's command
        # backgrounds a process (``cmd &``, ``setsid cmd & disown``, etc.),
        # that backgrounded grandchild inherits the write-end of our stdout
        # pipe via ``fork()``. Even after ``bash`` itself exits, the pipe
        # stays open because the grandchild still holds it — so the drain
        # thread never returns and the tool hangs for the full lifetime of
        # the grandchild (issue #8340: users reported indefinite hangs when
        # restarting uvicorn with ``setsid ... & disown``).
        #
        # The fix: select() with a short poll interval, and stop draining
        # shortly after ``bash`` exits even if the pipe hasn't EOF'd yet.
        # Any output the grandchild writes after that point goes to an
        # orphaned pipe (harmless — the kernel reaps it when our end closes).
        #
        # Decoding: we ``os.read()`` raw bytes in fixed-size chunks (4096)
        # so a single multibyte UTF-8 character can split across reads. An
        # incremental decoder buffers partial sequences across chunks, and
        # ``errors="replace"`` mirrors the baseline ``TextIOWrapper`` (which
        # was constructed with ``encoding="utf-8", errors="replace"`` on
        # ``Popen``) so binary or mis-encoded output is preserved with
        # U+FFFD substitution rather than clobbering the whole buffer.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

        def _drain():
            fd = proc.stdout.fileno()
            idle_after_exit = 0
            try:
                while True:
                    try:
                        ready, _, _ = select.select([fd], [], [], 0.1)
                    except (ValueError, OSError):
                        break  # fd already closed
                    if ready:
                        try:
                            chunk = os.read(fd, 4096)
                        except (ValueError, OSError):
                            break
                        if not chunk:
                            break  # true EOF — all writers closed
                        output_chunks.append(decoder.decode(chunk))
                        idle_after_exit = 0
                    elif proc.poll() is not None:
                        # bash is gone and the pipe was idle for ~100ms. Give
                        # it two more cycles to catch any buffered tail, then
                        # stop — otherwise we wait forever on a grandchild pipe.
                        idle_after_exit += 1
                        if idle_after_exit >= 3:
                            break
            finally:
                # Flush any bytes buffered mid-sequence. With ``errors="replace"``
                # this emits U+FFFD for any final incomplete sequence rather than
                # raising.
                try:
                    tail = decoder.decode(b"", final=True)
                    if tail:
                        output_chunks.append(tail)
                except Exception:
                    pass

        drain_thread = threading.Thread(target=_drain, daemon=True)
        drain_thread.start()
        deadline = time.monotonic() + timeout
        _now = time.monotonic()
        _activity_state = {
            "last_touch": _now,
            "start": _now,
        }

        # --- Debug tracing (opt-in via HERMES_DEBUG_INTERRUPT=1) -------------
        # Captures loop entry/exit, interrupt state changes, and periodic
        # heartbeats so we can diagnose "agent never sees the interrupt"
        # reports without reproducing locally.
        _tid = threading.current_thread().ident
        _pid = getattr(proc, "pid", None)
        _iter_count = 0
        _last_heartbeat = _now
        # NOTE(review): assigned but never read below — dead state?
        _last_interrupt_state = False
        _cb_was_none = _get_activity_callback() is None
        if _DEBUG_INTERRUPT:
            logger.info(
                "[interrupt-debug] _wait_for_process ENTER tid=%s pid=%s "
                "timeout=%ss activity_cb=%s initial_interrupt=%s",
                _tid, _pid, timeout,
                "set" if not _cb_was_none else "MISSING",
                is_interrupted(),
            )

        try:
            while proc.poll() is None:
                _iter_count += 1
                if is_interrupted():
                    if _DEBUG_INTERRUPT:
                        logger.info(
                            "[interrupt-debug] _wait_for_process INTERRUPT DETECTED "
                            "tid=%s pid=%s iter=%d elapsed=%.1fs — killing process group",
                            _tid, _pid, _iter_count, time.monotonic() - _activity_state["start"],
                        )
                    self._kill_process(proc)
                    drain_thread.join(timeout=2)
                    return {
                        "output": "".join(output_chunks) + "\n[Command interrupted]",
                        "returncode": 130,
                    }
                if time.monotonic() > deadline:
                    if _DEBUG_INTERRUPT:
                        logger.info(
                            "[interrupt-debug] _wait_for_process TIMEOUT "
                            "tid=%s pid=%s iter=%d timeout=%ss",
                            _tid, _pid, _iter_count, timeout,
                        )
                    self._kill_process(proc)
                    drain_thread.join(timeout=2)
                    partial = "".join(output_chunks)
                    timeout_msg = f"\n[Command timed out after {timeout}s]"
                    return {
                        "output": partial + timeout_msg
                        if partial
                        else timeout_msg.lstrip(),
                        "returncode": 124,
                    }
                # Periodic activity touch so the gateway knows we're alive
                touch_activity_if_due(_activity_state, "terminal command running")

                # Heartbeat every ~30s: proves the loop is alive and reports
                # the activity-callback state (thread-local, can get clobbered
                # by nested tool calls or executor thread reuse).
                if _DEBUG_INTERRUPT and time.monotonic() - _last_heartbeat >= 30.0:
                    _cb_now_none = _get_activity_callback() is None
                    logger.info(
                        "[interrupt-debug] _wait_for_process HEARTBEAT "
                        "tid=%s pid=%s iter=%d elapsed=%.0fs "
                        "interrupt=%s activity_cb=%s%s",
                        _tid, _pid, _iter_count,
                        time.monotonic() - _activity_state["start"],
                        is_interrupted(),
                        "set" if not _cb_now_none else "MISSING",
                        " (LOST during run)" if _cb_now_none and not _cb_was_none else "",
                    )
                    _last_heartbeat = time.monotonic()
                    _cb_was_none = _cb_now_none

                time.sleep(0.2)
        except (KeyboardInterrupt, SystemExit):
            # Signal arrived (SIGTERM/SIGHUP/SIGINT) or sys.exit() was called
            # while we were polling. The local backend spawns subprocesses
            # with os.setsid, which puts them in their own process group — so
            # if we let the interrupt propagate without killing the child,
            # python exits and the child is reparented to init (PPID=1) and
            # keeps running as an orphan. Killing the process group here
            # guarantees the tool's side effects stop when the agent stops.
            if _DEBUG_INTERRUPT:
                logger.info(
                    "[interrupt-debug] _wait_for_process EXCEPTION_EXIT "
                    "tid=%s pid=%s iter=%d elapsed=%.1fs — killing subprocess group before re-raise",
                    _tid, _pid, _iter_count,
                    time.monotonic() - _activity_state["start"],
                )
            try:
                self._kill_process(proc)
                drain_thread.join(timeout=2)
            except Exception:
                pass  # cleanup is best-effort
            raise

        # Drain thread now exits promptly after bash does (~300ms idle
        # check). A short join is enough; a long one would be a bug since
        # it means the non-blocking loop itself stopped cooperating.
        drain_thread.join(timeout=2)

        try:
            proc.stdout.close()
        except Exception:
            pass

        if _DEBUG_INTERRUPT:
            logger.info(
                "[interrupt-debug] _wait_for_process EXIT (natural) "
                "tid=%s pid=%s iter=%d elapsed=%.1fs returncode=%s",
                _tid, _pid, _iter_count,
                time.monotonic() - _activity_state["start"],
                proc.returncode,
            )

        return {"output": "".join(output_chunks), "returncode": proc.returncode}

    def _kill_process(self, proc: ProcessHandle):
        """Terminate a process. Subclasses may override for process-group kill."""
        try:
            proc.kill()
        except (ProcessLookupError, PermissionError, OSError):
            pass

    # ------------------------------------------------------------------
    # CWD extraction
    # ------------------------------------------------------------------

    def _update_cwd(self, result: dict):
        """Extract CWD from command output. Override for local file-based read."""
        self._extract_cwd_from_output(result)

    def _extract_cwd_from_output(self, result: dict):
        """Parse the __HERMES_CWD_{session}__ marker from stdout output.

        Updates self.cwd and strips the marker from result["output"].
        Used by remote backends (Docker, SSH, Modal, Daytona, Singularity).
        No-op when the marker pair is absent (e.g. the command was killed
        before the wrapper's trailing printf ran).
        """
        output = result.get("output", "")
        marker = self._cwd_marker
        last = output.rfind(marker)
        if last == -1:
            return

        # Find the opening marker before this closing one
        search_start = max(0, last - 4096)  # CWD path won't be >4KB
        first = output.rfind(marker, search_start, last)
        if first == -1 or first == last:
            return

        cwd_path = output[first + len(marker) : last].strip()
        if cwd_path:
            self.cwd = cwd_path

        # Strip the marker line AND the \n we injected before it.
        # The wrapper emits: printf '\n__MARKER__%s__MARKER__\n'
        # So the output looks like: <cmd output>\n__MARKER__path__MARKER__\n
        # We want to remove everything from the injected \n onwards.
        line_start = output.rfind("\n", 0, first)
        if line_start == -1:
            line_start = first
        line_end = output.find("\n", last + len(marker))
        line_end = line_end + 1 if line_end != -1 else len(output)

        result["output"] = output[:line_start] + output[line_end:]

    # ------------------------------------------------------------------
    # Hooks
    # ------------------------------------------------------------------

    def _before_execute(self) -> None:
        """Hook called before each command execution.

        Remote backends (SSH, Modal, Daytona) override this to trigger
        their FileSyncManager. Bind-mount backends (Docker, Singularity)
        and Local don't need file sync — the host filesystem is directly
        visible inside the container/process.
        """
        pass

    # ------------------------------------------------------------------
    # Unified execute()
    # ------------------------------------------------------------------

    def execute(
        self,
        command: str,
        cwd: str = "",
        *,
        timeout: int | None = None,
        stdin_data: str | None = None,
    ) -> dict:
        """Execute a command, return {"output": str, "returncode": int}."""
        self._before_execute()

        exec_command, sudo_stdin = self._prepare_command(command)
        # Guard against the `A && B &` subshell-wait trap: bash forks a
        # subshell for the compound that then waits for an infinite B (a
        # server, `yes > /dev/null`, etc.), leaking the subshell forever.
        # Rewriting to `A && { B & }` runs B as a plain background in the
        # current shell — no subshell wait.
        # Imported lazily to avoid a module-level import cycle with
        # tools.terminal_tool.
        from tools.terminal_tool import _rewrite_compound_background
        exec_command = _rewrite_compound_background(exec_command)
        effective_timeout = timeout or self.timeout
        effective_cwd = cwd or self.cwd

        # Merge sudo stdin with caller stdin (sudo password must come first).
        if sudo_stdin is not None and stdin_data is not None:
            effective_stdin = sudo_stdin + stdin_data
        elif sudo_stdin is not None:
            effective_stdin = sudo_stdin
        else:
            effective_stdin = stdin_data

        # Embed stdin as heredoc for backends that need it
        if effective_stdin and self._stdin_mode == "heredoc":
            exec_command = self._embed_stdin_heredoc(exec_command, effective_stdin)
            effective_stdin = None

        wrapped = self._wrap_command(exec_command, effective_cwd)

        # Use login shell if snapshot failed (so user's profile still loads)
        login = not self._snapshot_ready

        proc = self._run_bash(
            wrapped, login=login, timeout=effective_timeout, stdin_data=effective_stdin
        )
        result = self._wait_for_process(proc, timeout=effective_timeout)
        self._update_cwd(result)

        return result

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def stop(self):
        """Alias for cleanup (compat with older callers)."""
        self.cleanup()

    def __del__(self):
        # Best-effort safety net; cleanup() may already have run.
        try:
            self.cleanup()
        except Exception:
            pass

    def _prepare_command(self, command: str) -> tuple[str, str | None]:
        """Transform sudo commands if SUDO_PASSWORD is available.

        Returns ``(command, sudo_stdin)`` where sudo_stdin is the password
        payload to pipe in, or ``None`` when no transformation applied.
        """
        # Lazy import to avoid a module-level cycle with tools.terminal_tool.
        from tools.terminal_tool import _transform_sudo_command

        return _transform_sudo_command(command)