process_registry.py
1 """ 2 Process Registry -- In-memory registry for managed background processes. 3 4 Tracks processes spawned via terminal(background=true), providing: 5 - Output buffering (rolling 200KB window) 6 - Status polling and log retrieval 7 - Blocking wait with interrupt support 8 - Process killing 9 - Crash recovery via JSON checkpoint file 10 - Session-scoped tracking for gateway reset protection 11 12 Background processes execute THROUGH the environment interface -- nothing 13 runs on the host machine unless TERMINAL_ENV=local. For Docker, Singularity, 14 Modal, Daytona, and SSH backends, the command runs inside the sandbox. 15 16 Usage: 17 from tools.process_registry import process_registry 18 19 # Spawn a background process (called from terminal_tool) 20 session = process_registry.spawn(env, "pytest -v", task_id="task_123") 21 22 # Poll for status 23 result = process_registry.poll(session.id) 24 25 # Block until done 26 result = process_registry.wait(session.id, timeout=300) 27 28 # Kill it 29 process_registry.kill(session.id) 30 """ 31 32 import json 33 import logging 34 import os 35 import platform 36 import shlex 37 import signal 38 import subprocess 39 import threading 40 import time 41 import uuid 42 43 _IS_WINDOWS = platform.system() == "Windows" 44 from tools.environments.local import _find_shell, _sanitize_subprocess_env 45 from dataclasses import dataclass, field 46 from typing import Any, Dict, List, Optional 47 48 from hermes_cli.config import get_hermes_home 49 50 logger = logging.getLogger(__name__) 51 52 53 # Checkpoint file for crash recovery (gateway only) 54 CHECKPOINT_PATH = get_hermes_home() / "processes.json" 55 56 # Limits 57 MAX_OUTPUT_CHARS = 200_000 # 200KB rolling output buffer 58 FINISHED_TTL_SECONDS = 1800 # Keep finished processes for 30 minutes 59 MAX_PROCESSES = 64 # Max concurrent tracked processes (LRU pruning) 60 61 # Watch pattern rate limiting — PER SESSION. 
# Hard rule: at most ONE watch-match notification every WATCH_MIN_INTERVAL_SECONDS.
# Any match arriving inside that cooldown window is dropped and counted as a strike.
# After WATCH_STRIKE_LIMIT consecutive strike windows, watch_patterns for that
# session is permanently disabled and the session falls back to notify_on_complete
# semantics (one notification when the process actually exits).
WATCH_MIN_INTERVAL_SECONDS = 15  # Minimum spacing between consecutive watch matches
WATCH_STRIKE_LIMIT = 3  # Strikes in a row → disable watch + promote to notify_on_complete

# Global circuit breaker — across all sessions. Secondary safety net so concurrent
# siblings can't collectively flood the user even when each is under its own cap.
WATCH_GLOBAL_MAX_PER_WINDOW = 15
WATCH_GLOBAL_WINDOW_SECONDS = 10
WATCH_GLOBAL_COOLDOWN_SECONDS = 30


def format_uptime_short(seconds: int) -> str:
    """Format a duration in seconds as a compact human-readable string.

    Examples: 42 -> "42s", 125 -> "2m 5s", 3725 -> "1h 2m".
    Negative inputs are clamped to 0; seconds are dropped once the
    duration reaches one hour.
    """
    s = max(0, int(seconds))
    if s < 60:
        return f"{s}s"
    mins, secs = divmod(s, 60)
    if mins < 60:
        return f"{mins}m {secs}s"
    hours, mins = divmod(mins, 60)
    return f"{hours}h {mins}m"


@dataclass
class ProcessSession:
    """A tracked background process with output buffering."""
    id: str  # Unique session ID ("proc_xxxxxxxxxxxx")
    command: str  # Original command string
    task_id: str = ""  # Task/sandbox isolation key
    session_key: str = ""  # Gateway session key (for reset protection)
    pid: Optional[int] = None  # OS process ID
    process: Optional[subprocess.Popen] = None  # Popen handle (local only)
    env_ref: Any = None  # Reference to the environment object
    cwd: Optional[str] = None  # Working directory
    started_at: float = 0.0  # time.time() of spawn
    exited: bool = False  # Whether the process has finished
    exit_code: Optional[int] = None  # Exit code (None if still running)
    output_buffer: str = ""  # Rolling output (last MAX_OUTPUT_CHARS)
    max_output_chars: int = MAX_OUTPUT_CHARS  # Per-session cap on output_buffer
    detached: bool = False  # True if recovered from crash (no pipe)
    pid_scope: str = "host"  # "host" for local/PTY PIDs, "sandbox" for env-local PIDs
    # Watcher/notification metadata (persisted for crash recovery)
    watcher_platform: str = ""
    watcher_chat_id: str = ""
    watcher_user_id: str = ""
    watcher_user_name: str = ""
    watcher_thread_id: str = ""
    watcher_interval: int = 0  # 0 = no watcher configured
    notify_on_complete: bool = False  # Queue agent notification on exit
    # Watch patterns — trigger agent notification when output matches any pattern
    watch_patterns: List[str] = field(default_factory=list)
    _watch_hits: int = field(default=0, repr=False)  # total matches delivered
    _watch_suppressed: int = field(default=0, repr=False)  # matches dropped by rate limit
    _watch_disabled: bool = field(default=False, repr=False)  # permanently killed after strike limit
    # Per-session rate limit state: at most one match every WATCH_MIN_INTERVAL_SECONDS.
    # When an emission happens, _watch_cooldown_until is set to now + interval and
    # _watch_strike_candidate becomes True. The next match to arrive before that
    # deadline counts as one strike (regardless of how many matches were dropped in
    # between — a strike is a window, not a match). After WATCH_STRIKE_LIMIT strikes
    # in a row, watch_patterns is disabled and the session promotes to
    # notify_on_complete.
    _watch_last_emit_at: float = field(default=0.0, repr=False)
    _watch_cooldown_until: float = field(default=0.0, repr=False)
    _watch_strike_candidate: bool = field(default=False, repr=False)
    _watch_consecutive_strikes: int = field(default=0, repr=False)
    # Non-reentrant per-session lock guarding output_buffer and watch state.
    _lock: threading.Lock = field(default_factory=threading.Lock)
    _reader_thread: Optional[threading.Thread] = field(default=None, repr=False)
    _pty: Any = field(default=None, repr=False)  # ptyprocess handle (when use_pty=True)


class ProcessRegistry:
    """
    In-memory registry of running and finished background processes.

    Thread-safe. Accessed from:
    - Executor threads (terminal_tool, process tool handlers)
    - Gateway asyncio loop (watcher tasks, session reset checks)
    - Cleanup thread (sandbox reaping coordination)
    """

    # Shell startup warnings stripped from the head of captured output
    # (emitted by non-interactive `bash -lic` invocations).
    _SHELL_NOISE_SUBSTRINGS = (
        "bash: cannot set terminal process group",
        "bash: no job control in this shell",
        "no job control in this shell",
        "cannot set terminal process group",
        "tcsetattr: Inappropriate ioctl for device",
    )

    def __init__(self) -> None:
        self._running: Dict[str, ProcessSession] = {}
        self._finished: Dict[str, ProcessSession] = {}
        self._lock = threading.Lock()

        # Side-channel for check_interval watchers (gateway reads after agent run)
        self.pending_watchers: List[Dict[str, Any]] = []

        # Notification queue — unified queue for all background process events.
        # Completion notifications (notify_on_complete) and watch pattern matches
        # both land here, distinguished by "type" field. CLI process_loop and
        # gateway drain this after each agent turn to auto-trigger new turns.
        import queue as _queue_mod
        self.completion_queue: _queue_mod.Queue = _queue_mod.Queue()

        # Track sessions whose completion was already consumed by the agent
        # via wait/poll/log. Drain loops skip notifications for these.
        self._completion_consumed: set = set()

        # Global watch-match circuit breaker — across all sessions.
        # Prevents sibling processes from collectively flooding the user even
        # when each stays under its own per-session cap.
        self._global_watch_lock = threading.Lock()
        self._global_watch_window_start: float = 0.0
        self._global_watch_window_hits: int = 0
        self._global_watch_tripped_until: float = 0.0
        self._global_watch_suppressed_during_trip: int = 0

    @staticmethod
    def _clean_shell_noise(text: str) -> str:
        """Strip shell startup warnings from the beginning of output.

        Only leading lines are removed; the first line that is not a known
        noise substring stops the scan.
        """
        lines = text.split("\n")
        while lines and any(noise in lines[0] for noise in ProcessRegistry._SHELL_NOISE_SUBSTRINGS):
            lines.pop(0)
        return "\n".join(lines)

    def _check_watch_patterns(self, session: ProcessSession, new_text: str) -> None:
        """Scan new output for watch patterns and queue notifications.

        Called from reader threads with new_text being the freshly-read chunk.
        Acquires session._lock internally, so callers must NOT hold it
        (session._lock is a plain non-reentrant threading.Lock).

        Per-session rate limit: at most ONE watch-match notification per
        WATCH_MIN_INTERVAL_SECONDS. Any match arriving inside the cooldown
        window is dropped and counts as ONE strike for that window. After
        WATCH_STRIKE_LIMIT consecutive strike windows, watch_patterns is
        disabled for this session and the session is promoted to
        notify_on_complete semantics — one notification when the process
        actually exits, no more mid-process spam.
        """
        if not session.watch_patterns or session._watch_disabled:
            return
        # Suppress-after-exit: once the reader loop has declared the process
        # exited, any late chunk we still see is post-exit noise. Dropping these
        # prevents the "stale notifications delivered minutes after the process
        # ended" spam when completion_queue consumers run async.
        if session.exited:
            return

        # Scan new text line-by-line for pattern matches (plain substring
        # containment, not regex).
        matched_lines = []
        matched_pattern = None
        for line in new_text.splitlines():
            for pat in session.watch_patterns:
                if pat in line:
                    matched_lines.append(line.rstrip())
                    if matched_pattern is None:
                        matched_pattern = pat
                    break  # one match per line is enough

        if not matched_lines:
            return

        now = time.time()
        should_disable = False
        with session._lock:
            # Case 1: still inside the cooldown from the last emission.
            # Count this as a strike for the current window (only once per window)
            # and drop the event. If we've hit the strike limit, disable watch
            # and promote to notify_on_complete.
            if session._watch_cooldown_until and now < session._watch_cooldown_until:
                session._watch_suppressed += len(matched_lines)
                if not session._watch_strike_candidate:
                    # First drop in this window — count one strike.
                    session._watch_strike_candidate = True
                    session._watch_consecutive_strikes += 1
                    if session._watch_consecutive_strikes >= WATCH_STRIKE_LIMIT:
                        session._watch_disabled = True
                        # Promote to notify_on_complete so the agent still gets
                        # exactly one notification when the process actually ends.
                        session.notify_on_complete = True
                        should_disable = True
                return_early = True
            else:
                # Case 2: cooldown has expired.
                # Decide whether this window was a "clean" one (no drops) or a
                # strike window. If no strike candidate was set during the prior
                # cooldown, reset the consecutive-strike counter — we're back to
                # healthy emission cadence.
                if (
                    session._watch_cooldown_until
                    and not session._watch_strike_candidate
                ):
                    session._watch_consecutive_strikes = 0
                session._watch_strike_candidate = False

                # Emit the notification and start a new cooldown window.
                session._watch_last_emit_at = now
                session._watch_cooldown_until = now + WATCH_MIN_INTERVAL_SECONDS
                session._watch_hits += 1
                suppressed = session._watch_suppressed
                session._watch_suppressed = 0
                return_early = False

        if return_early:
            if should_disable:
                # Emit exactly one "watch disabled, falling back to notify_on_complete"
                # summary event so the agent/user sees why things went quiet.
                self.completion_queue.put({
                    "session_id": session.id,
                    "session_key": session.session_key,
                    "command": session.command,
                    "type": "watch_disabled",
                    "suppressed": session._watch_suppressed,
                    "platform": session.watcher_platform,
                    "chat_id": session.watcher_chat_id,
                    "user_id": session.watcher_user_id,
                    "user_name": session.watcher_user_name,
                    "thread_id": session.watcher_thread_id,
                    "message": (
                        f"Watch patterns disabled for process {session.id} — "
                        f"{WATCH_STRIKE_LIMIT} consecutive rate-limit windows triggered "
                        f"(min spacing {WATCH_MIN_INTERVAL_SECONDS}s). "
                        f"Falling back to notify_on_complete semantics; you'll get "
                        f"exactly one notification when the process exits."
                    ),
                })
            return

        # Trim matched output to a reasonable size
        output = "\n".join(matched_lines[:20])
        if len(output) > 2000:
            output = output[:2000] + "\n...(truncated)"

        # Global circuit breaker — across all sessions (secondary safety net).
        # NOTE(review): if the breaker rejects here, this session's cooldown
        # window was already consumed above — the dropped match is not retried.
        if not self._global_watch_admit(now):
            return

        self.completion_queue.put({
            "session_id": session.id,
            "session_key": session.session_key,
            "command": session.command,
            "type": "watch_match",
            "pattern": matched_pattern,
            "output": output,
            "suppressed": suppressed,
            "platform": session.watcher_platform,
            "chat_id": session.watcher_chat_id,
            "user_id": session.watcher_user_id,
            "user_name": session.watcher_user_name,
            "thread_id": session.watcher_thread_id,
        })

    def _global_watch_admit(self, now: float) -> bool:
        """Return True if this watch_match event is allowed through the global breaker.

        Semantics:
        - If we're currently in a cooldown period, drop the event and count it.
        - Otherwise, slide the rolling window and check the global cap.
        - If the cap is exceeded, trip the breaker for WATCH_GLOBAL_COOLDOWN_SECONDS
          and emit ONE summary event so the agent/user sees "N notifications were
          suppressed" instead of getting them individually.
        - When the cooldown ends, emit a release summary and reset counters.
        """
        with self._global_watch_lock:
            # Handle cooldown expiry first so we can emit the release summary.
            if self._global_watch_tripped_until and now >= self._global_watch_tripped_until:
                suppressed = self._global_watch_suppressed_during_trip
                self._global_watch_tripped_until = 0.0
                self._global_watch_suppressed_during_trip = 0
                self._global_watch_window_start = now
                self._global_watch_window_hits = 0
                if suppressed > 0:
                    # Queue a summary event outside the lock (below).
                    release_msg = {
                        "session_id": "",
                        "session_key": "",
                        "command": "",
                        "type": "watch_overflow_released",
                        "suppressed": suppressed,
                        "message": (
                            f"Watch-pattern notifications resumed. "
                            f"{suppressed} match event(s) were suppressed during the flood."
                        ),
                        "platform": "",
                        "chat_id": "",
                        "user_id": "",
                        "user_name": "",
                        "thread_id": "",
                    }
                else:
                    release_msg = None
            else:
                release_msg = None

            # Still in cooldown — drop and count.
            if self._global_watch_tripped_until and now < self._global_watch_tripped_until:
                self._global_watch_suppressed_during_trip += 1
                admit = False
                trip_now = None
            else:
                # Slide the window.
                if now - self._global_watch_window_start >= WATCH_GLOBAL_WINDOW_SECONDS:
                    self._global_watch_window_start = now
                    self._global_watch_window_hits = 0

                if self._global_watch_window_hits >= WATCH_GLOBAL_MAX_PER_WINDOW:
                    # Trip the breaker.
                    self._global_watch_tripped_until = now + WATCH_GLOBAL_COOLDOWN_SECONDS
                    self._global_watch_suppressed_during_trip += 1
                    trip_now = now
                    admit = False
                else:
                    self._global_watch_window_hits += 1
                    trip_now = None
                    admit = True

        # Queue summary events outside the lock.
        if release_msg is not None:
            self.completion_queue.put(release_msg)
        if trip_now is not None:
            self.completion_queue.put({
                "session_id": "",
                "session_key": "",
                "command": "",
                "type": "watch_overflow_tripped",
                "message": (
                    f"Watch-pattern overflow: >{WATCH_GLOBAL_MAX_PER_WINDOW} "
                    f"notifications in {WATCH_GLOBAL_WINDOW_SECONDS}s across all processes. "
                    f"Suppressing further watch_match events for "
                    f"{WATCH_GLOBAL_COOLDOWN_SECONDS}s."
                ),
                "platform": "",
                "chat_id": "",
                "user_id": "",
                "user_name": "",
                "thread_id": "",
            })
        return admit

    @staticmethod
    def _is_host_pid_alive(pid: Optional[int]) -> bool:
        """Best-effort liveness check for host-visible PIDs.

        Uses signal 0 (no signal delivered, permissions/existence checked only).
        PermissionError means the PID exists but belongs to another user; this
        is treated as "not ours / not alive" on purpose.
        """
        if not pid:
            return False
        try:
            os.kill(pid, 0)
            return True
        except (ProcessLookupError, PermissionError):
            return False

    def _refresh_detached_session(self, session: Optional[ProcessSession]) -> Optional[ProcessSession]:
        """Update recovered host-PID sessions when the underlying process has exited.

        Only applies to detached sessions with host-scope PIDs (crash-recovered);
        sandbox-scope sessions are tracked by their poller thread instead.
        """
        if session is None or session.exited or not session.detached or session.pid_scope != "host":
            return session

        if self._is_host_pid_alive(session.pid):
            return session

        with session._lock:
            if session.exited:
                return session
            session.exited = True
            # Recovered sessions no longer have a waitable handle, so the real
            # exit code is unavailable once the original process object is gone.
            session.exit_code = None

        self._move_to_finished(session)
        return session

    @staticmethod
    def _terminate_host_pid(pid: int) -> None:
        """Terminate a host-visible PID without requiring the original process handle.

        On POSIX, prefers killing the whole process group (processes are spawned
        with os.setsid, so the group includes descendants); falls back to the
        single PID if the group lookup/kill fails. Windows has no process
        groups in this sense, so only the PID itself is signalled.
        """
        if _IS_WINDOWS:
            os.kill(pid, signal.SIGTERM)
            return

        try:
            os.killpg(os.getpgid(pid), signal.SIGTERM)
        except (OSError, ProcessLookupError, PermissionError):
            os.kill(pid, signal.SIGTERM)

    # ----- Spawn -----

    @staticmethod
    def _env_temp_dir(env: Any) -> str:
        """Return the writable sandbox temp dir for env-backed background tasks.

        Falls back to "/tmp" when the environment exposes no get_temp_dir(),
        when it raises, or when it returns a non-absolute path.
        """
        get_temp_dir = getattr(env, "get_temp_dir", None)
        if callable(get_temp_dir):
            try:
                temp_dir = get_temp_dir()
                if isinstance(temp_dir, str) and temp_dir.startswith("/"):
                    return temp_dir.rstrip("/") or "/"
            except Exception as exc:
                logger.debug("Could not resolve environment temp dir: %s", exc)
        return "/tmp"

    def spawn_local(
        self,
        command: str,
        cwd: Optional[str] = None,
        task_id: str = "",
        session_key: str = "",
        env_vars: Optional[dict] = None,
        use_pty: bool = False,
    ) -> ProcessSession:
        """
        Spawn a background process locally.

        Only for TERMINAL_ENV=local. Other backends use spawn_via_env().

        Args:
            command: Shell command to run (executed via the user's login shell).
            cwd: Working directory; defaults to the current directory.
            task_id: Task/sandbox isolation key recorded on the session.
            session_key: Gateway session key (for reset protection).
            env_vars: Extra environment variables merged into the child env.
            use_pty: If True, use a pseudo-terminal via ptyprocess for interactive
                     CLI tools (Codex, Claude Code, Python REPL). Falls back to
                     subprocess.Popen if ptyprocess is not installed.

        Returns:
            The registered ProcessSession (already tracked and checkpointed).
        """
        session = ProcessSession(
            id=f"proc_{uuid.uuid4().hex[:12]}",
            command=command,
            task_id=task_id,
            session_key=session_key,
            cwd=cwd or os.getcwd(),
            started_at=time.time(),
        )

        if use_pty:
            # Try PTY mode for interactive CLI tools
            try:
                if _IS_WINDOWS:
                    from winpty import PtyProcess as _PtyProcessCls
                else:
                    from ptyprocess import PtyProcess as _PtyProcessCls
                user_shell = _find_shell()
                pty_env = _sanitize_subprocess_env(os.environ, env_vars)
                # Force unbuffered Python output so progress is visible.
                pty_env["PYTHONUNBUFFERED"] = "1"
                # "set +m" disables job control to avoid shell noise/signals.
                pty_proc = _PtyProcessCls.spawn(
                    [user_shell, "-lic", f"set +m; {command}"],
                    cwd=session.cwd,
                    env=pty_env,
                    dimensions=(30, 120),
                )
                session.pid = pty_proc.pid
                # Store the pty handle on the session for read/write
                session._pty = pty_proc

                # PTY reader thread
                reader = threading.Thread(
                    target=self._pty_reader_loop,
                    args=(session,),
                    daemon=True,
                    name=f"proc-pty-reader-{session.id}",
                )
                session._reader_thread = reader
                reader.start()

                with self._lock:
                    self._prune_if_needed()
                    self._running[session.id] = session

                self._write_checkpoint()
                return session

            except ImportError:
                logger.warning("ptyprocess not installed, falling back to pipe mode")
            except Exception as e:
                logger.warning("PTY spawn failed (%s), falling back to pipe mode", e)

        # Standard Popen path (non-PTY or PTY fallback)
        # Use the user's login shell for consistency with LocalEnvironment --
        # ensures rc files are sourced and user tools are available.
        user_shell = _find_shell()
        # Force unbuffered output for Python scripts so progress is visible
        # during background execution (libraries like tqdm/datasets buffer when
        # stdout is a pipe, hiding output from process(action="poll")).
        bg_env = _sanitize_subprocess_env(os.environ, env_vars)
        bg_env["PYTHONUNBUFFERED"] = "1"
        proc = subprocess.Popen(
            [user_shell, "-lic", f"set +m; {command}"],
            text=True,
            cwd=session.cwd,
            env=bg_env,
            encoding="utf-8",
            errors="replace",
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            stdin=subprocess.PIPE,
            # New session/process group on POSIX so _terminate_host_pid can
            # SIGTERM the whole group; no-op on Windows.
            preexec_fn=None if _IS_WINDOWS else os.setsid,
        )

        session.process = proc
        session.pid = proc.pid

        # Start output reader thread
        reader = threading.Thread(
            target=self._reader_loop,
            args=(session,),
            daemon=True,
            name=f"proc-reader-{session.id}",
        )
        session._reader_thread = reader
        reader.start()

        with self._lock:
            self._prune_if_needed()
            self._running[session.id] = session

        self._write_checkpoint()
        return session

    def spawn_via_env(
        self,
        env: Any,
        command: str,
        cwd: Optional[str] = None,
        task_id: str = "",
        session_key: str = "",
        timeout: int = 10,
    ) -> ProcessSession:
        """
        Spawn a background process through a non-local environment backend.

        For Docker/Singularity/Modal/Daytona/SSH: runs the command inside the sandbox
        using the environment's execute() interface. We wrap the command to
        capture the in-sandbox PID and redirect output to a log file inside
        the sandbox, then poll the log via subsequent execute() calls.

        This is less capable than local spawn (no live stdout pipe, no stdin),
        but it ensures the command runs in the correct sandbox context.
590 """ 591 session = ProcessSession( 592 id=f"proc_{uuid.uuid4().hex[:12]}", 593 command=command, 594 task_id=task_id, 595 session_key=session_key, 596 cwd=cwd, 597 started_at=time.time(), 598 env_ref=env, 599 pid_scope="sandbox", 600 ) 601 602 # Run the command in the sandbox with output capture 603 temp_dir = self._env_temp_dir(env) 604 log_path = f"{temp_dir}/hermes_bg_{session.id}.log" 605 pid_path = f"{temp_dir}/hermes_bg_{session.id}.pid" 606 exit_path = f"{temp_dir}/hermes_bg_{session.id}.exit" 607 quoted_command = shlex.quote(command) 608 quoted_temp_dir = shlex.quote(temp_dir) 609 quoted_log_path = shlex.quote(log_path) 610 quoted_pid_path = shlex.quote(pid_path) 611 quoted_exit_path = shlex.quote(exit_path) 612 bg_command = ( 613 f"mkdir -p {quoted_temp_dir} && " 614 f"( nohup bash -lc {quoted_command} > {quoted_log_path} 2>&1; " 615 f"rc=$?; printf '%s\\n' \"$rc\" > {quoted_exit_path} ) & " 616 f"echo $! > {quoted_pid_path} && cat {quoted_pid_path}" 617 ) 618 619 try: 620 result = env.execute(bg_command, timeout=timeout) 621 output = result.get("output", "").strip() 622 # Try to extract the PID from the output 623 for line in output.splitlines(): 624 line = line.strip() 625 if line.isdigit(): 626 session.pid = int(line) 627 break 628 except Exception as e: 629 session.exited = True 630 session.exit_code = -1 631 session.output_buffer = f"Failed to start: {e}" 632 633 if not session.exited: 634 # Start a poller thread that periodically reads the log file 635 reader = threading.Thread( 636 target=self._env_poller_loop, 637 args=(session, env, log_path, pid_path, exit_path), 638 daemon=True, 639 name=f"proc-poller-{session.id}", 640 ) 641 session._reader_thread = reader 642 reader.start() 643 644 with self._lock: 645 self._prune_if_needed() 646 self._running[session.id] = session 647 648 self._write_checkpoint() 649 return session 650 651 # ----- Reader / Poller Threads ----- 652 653 def _reader_loop(self, session: ProcessSession): 654 """Background 
thread: read stdout from a local Popen process.""" 655 first_chunk = True 656 try: 657 while True: 658 chunk = session.process.stdout.read(4096) 659 if not chunk: 660 break 661 if first_chunk: 662 chunk = self._clean_shell_noise(chunk) 663 first_chunk = False 664 with session._lock: 665 session.output_buffer += chunk 666 if len(session.output_buffer) > session.max_output_chars: 667 session.output_buffer = session.output_buffer[-session.max_output_chars:] 668 self._check_watch_patterns(session, chunk) 669 except Exception as e: 670 logger.debug("Process stdout reader ended: %s", e) 671 finally: 672 # Always reap the child to prevent zombie processes. 673 try: 674 session.process.wait(timeout=5) 675 except Exception as e: 676 logger.debug("Process wait timed out or failed: %s", e) 677 session.exited = True 678 session.exit_code = session.process.returncode 679 self._move_to_finished(session) 680 681 def _env_poller_loop( 682 self, session: ProcessSession, env: Any, log_path: str, pid_path: str, exit_path: str 683 ): 684 """Background thread: poll a sandbox log file for non-local backends.""" 685 quoted_log_path = shlex.quote(log_path) 686 quoted_pid_path = shlex.quote(pid_path) 687 quoted_exit_path = shlex.quote(exit_path) 688 prev_output_len = 0 # track delta for watch pattern scanning 689 while not session.exited: 690 time.sleep(2) # Poll every 2 seconds 691 try: 692 # Read new output from the log file 693 result = env.execute(f"cat {quoted_log_path} 2>/dev/null", timeout=10) 694 new_output = result.get("output", "") 695 if new_output: 696 # Compute delta for watch pattern scanning 697 delta = new_output[prev_output_len:] if len(new_output) > prev_output_len else "" 698 prev_output_len = len(new_output) 699 with session._lock: 700 session.output_buffer = new_output 701 if len(session.output_buffer) > session.max_output_chars: 702 session.output_buffer = session.output_buffer[-session.max_output_chars:] 703 if delta: 704 self._check_watch_patterns(session, delta) 
705 706 # Check if process is still running 707 check = env.execute( 708 f"kill -0 \"$(cat {quoted_pid_path} 2>/dev/null)\" 2>/dev/null; echo $?", 709 timeout=5, 710 ) 711 check_output = check.get("output", "").strip() 712 if check_output and check_output.splitlines()[-1].strip() != "0": 713 # Process has exited -- get exit code captured by the wrapper shell. 714 exit_result = env.execute( 715 f"cat {quoted_exit_path} 2>/dev/null", 716 timeout=5, 717 ) 718 exit_str = exit_result.get("output", "").strip() 719 try: 720 session.exit_code = int(exit_str.splitlines()[-1].strip()) 721 except (ValueError, IndexError): 722 session.exit_code = -1 723 session.exited = True 724 self._move_to_finished(session) 725 return 726 727 except Exception: 728 # Environment might be gone (sandbox reaped, etc.) 729 session.exited = True 730 session.exit_code = -1 731 self._move_to_finished(session) 732 return 733 734 def _pty_reader_loop(self, session: ProcessSession): 735 """Background thread: read output from a PTY process.""" 736 pty = session._pty 737 try: 738 while pty.isalive(): 739 try: 740 chunk = pty.read(4096) 741 if chunk: 742 # ptyprocess returns bytes 743 text = chunk if isinstance(chunk, str) else chunk.decode("utf-8", errors="replace") 744 with session._lock: 745 session.output_buffer += text 746 if len(session.output_buffer) > session.max_output_chars: 747 session.output_buffer = session.output_buffer[-session.max_output_chars:] 748 self._check_watch_patterns(session, text) 749 except EOFError: 750 break 751 except Exception: 752 break 753 except Exception as e: 754 logger.debug("PTY stdout reader ended: %s", e) 755 756 # Process exited 757 try: 758 pty.wait() 759 except Exception as e: 760 logger.debug("PTY wait timed out or failed: %s", e) 761 session.exited = True 762 session.exit_code = pty.exitstatus if hasattr(pty, 'exitstatus') else -1 763 self._move_to_finished(session) 764 765 def _move_to_finished(self, session: ProcessSession): 766 """Move a session from 
running to finished.

        Idempotent: if the session was already moved (e.g. kill_process raced
        with the reader thread), the second call is a no-op — no duplicate
        completion notification is enqueued.
        """
        with self._lock:
            was_running = self._running.pop(session.id, None) is not None
            self._finished[session.id] = session
        self._write_checkpoint()

        # Only enqueue completion notification on the FIRST move. Without
        # this guard, kill_process() and the reader thread can both call
        # _move_to_finished(), producing duplicate [IMPORTANT: ...] messages.
        if was_running and session.notify_on_complete:
            from tools.ansi_strip import strip_ansi
            output_tail = strip_ansi(session.output_buffer[-2000:]) if session.output_buffer else ""
            self.completion_queue.put({
                "type": "completion",
                "session_id": session.id,
                "command": session.command,
                "exit_code": session.exit_code,
                "output": output_tail,
            })

    # ----- Query Methods -----

    def is_completion_consumed(self, session_id: str) -> bool:
        """Check if a completion notification was already consumed via wait/poll/log."""
        return session_id in self._completion_consumed

    def get(self, session_id: str) -> Optional[ProcessSession]:
        """Get a session by ID (running or finished), refreshing detached state."""
        with self._lock:
            session = self._running.get(session_id) or self._finished.get(session_id)
        return self._refresh_detached_session(session)

    def _reconcile_local_exit(self, session: "ProcessSession") -> None:
        """Reconcile session.exited against the real child process state.

        The reader thread (`_reader_loop`) sets `session.exited = True` only
        in its `finally` block, which runs when `stdout.read()` returns EOF.
        If the direct `Popen` child has exited but a descendant process (e.g.
        a daemon spawned by `hermes update` restarting the gateway) is still
        holding the stdout pipe open, the reader blocks forever and poll()
        keeps returning "running" indefinitely (issue #17327 — 74 polls over
        7 minutes on Feishu).

        This helper closes that window: when `session.exited` is still False
        but the direct child's `Popen.poll()` reports an exit code, drain any
        readable bytes non-blocking and flip `session.exited`. The orphaned
        reader thread remains stuck on its blocking `read()` but is a daemon
        thread and will be reaped with the process.

        Safe no-op on sessions without a local `Popen` (env/PTY), already-
        exited sessions, and detached-recovered sessions.
        """
        if session is None or session.exited:
            return
        proc = getattr(session, "process", None)
        if proc is None:
            return
        try:
            rc = proc.poll()
        except Exception:
            return
        if rc is None:
            return  # Direct child still running — reader block is legitimate.

        # Direct child exited. Try to drain any bytes the reader hasn't
        # consumed yet. This is best-effort: if the pipe is held open by a
        # descendant, the non-blocking read returns what's immediately
        # available and we stop.
        drained = ""
        stdout = getattr(proc, "stdout", None)
        if stdout is not None and not _IS_WINDOWS:
            try:
                import fcntl
                fd = stdout.fileno()
                # Temporarily flip the fd to non-blocking so read() can't hang.
                flags = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
                try:
                    chunk = stdout.read()
                    if chunk:
                        drained = chunk if isinstance(chunk, str) else chunk.decode("utf-8", errors="replace")
                except (BlockingIOError, OSError, ValueError):
                    pass
                finally:
                    # Restore original flags for the (stuck) reader thread.
                    try:
                        fcntl.fcntl(fd, fcntl.F_SETFL, flags)
                    except Exception:
                        pass
            except Exception as e:
                logger.debug("Non-blocking drain failed for %s: %s", session.id, e)

        with session._lock:
            if drained:
                session.output_buffer += drained
                if len(session.output_buffer) > session.max_output_chars:
                    session.output_buffer = session.output_buffer[-session.max_output_chars:]
            session.exited = True
            session.exit_code = rc
        logger.info(
            "Reconciled session %s: direct child exited with code %s but reader "
            "was still blocked (orphaned pipe). Flipped to exited.",
            session.id, rc,
        )
        self._move_to_finished(session)

    def poll(self, session_id: str) -> dict:
        """Check status and get new output for a background process.

        Marks the session's completion as consumed when it has exited, so
        the drain loop won't re-notify the agent about it.
        """
        from tools.ansi_strip import strip_ansi

        session = self.get(session_id)
        if session is None:
            return {"status": "not_found", "error": f"No process with ID {session_id}"}

        # Reconcile against real child state before reading session.exited.
        # Guards against orphaned-pipe reader hangs (issue #17327).
885 self._reconcile_local_exit(session) 886 887 with session._lock: 888 output_preview = strip_ansi(session.output_buffer[-1000:]) if session.output_buffer else "" 889 890 result = { 891 "session_id": session.id, 892 "command": session.command, 893 "status": "exited" if session.exited else "running", 894 "pid": session.pid, 895 "uptime_seconds": int(time.time() - session.started_at), 896 "output_preview": output_preview, 897 } 898 if session.exited: 899 result["exit_code"] = session.exit_code 900 self._completion_consumed.add(session_id) 901 if session.detached: 902 result["detached"] = True 903 result["note"] = "Process recovered after restart -- output history unavailable" 904 return result 905 906 def read_log(self, session_id: str, offset: int = 0, limit: int = 200) -> dict: 907 """Read the full output log with optional pagination by lines.""" 908 from tools.ansi_strip import strip_ansi 909 910 session = self.get(session_id) 911 if session is None: 912 return {"status": "not_found", "error": f"No process with ID {session_id}"} 913 914 with session._lock: 915 full_output = strip_ansi(session.output_buffer) 916 917 lines = full_output.splitlines() 918 total_lines = len(lines) 919 920 # Default: last N lines 921 if offset == 0 and limit > 0: 922 selected = lines[-limit:] 923 else: 924 selected = lines[offset:offset + limit] 925 926 result = { 927 "session_id": session.id, 928 "status": "exited" if session.exited else "running", 929 "output": "\n".join(selected), 930 "total_lines": total_lines, 931 "showing": f"{len(selected)} lines", 932 } 933 if session.exited: 934 self._completion_consumed.add(session_id) 935 return result 936 937 def wait(self, session_id: str, timeout: int = None) -> dict: 938 """ 939 Block until a process exits, timeout, or interrupt. 940 941 Args: 942 session_id: The process to wait for. 943 timeout: Max seconds to block. Falls back to TERMINAL_TIMEOUT config. 
944 945 Returns: 946 dict with status ("exited", "timeout", "interrupted", "not_found") 947 and output snapshot. 948 """ 949 from tools.ansi_strip import strip_ansi 950 from tools.interrupt import is_interrupted as _is_interrupted 951 952 try: 953 default_timeout = int(os.getenv("TERMINAL_TIMEOUT", "180")) 954 except (ValueError, TypeError): 955 default_timeout = 180 956 max_timeout = default_timeout 957 requested_timeout = timeout 958 timeout_note = None 959 960 if requested_timeout and requested_timeout > max_timeout: 961 effective_timeout = max_timeout 962 timeout_note = ( 963 f"Requested wait of {requested_timeout}s was clamped " 964 f"to configured limit of {max_timeout}s" 965 ) 966 else: 967 effective_timeout = requested_timeout or max_timeout 968 969 session = self.get(session_id) 970 if session is None: 971 return {"status": "not_found", "error": f"No process with ID {session_id}"} 972 973 deadline = time.monotonic() + effective_timeout 974 975 while time.monotonic() < deadline: 976 session = self._refresh_detached_session(session) 977 # Reconcile against real child state — guards against orphaned- 978 # pipe reader hangs where the reader is blocked but the direct 979 # child has already exited (issue #17327). 
980 self._reconcile_local_exit(session) 981 if session.exited: 982 self._completion_consumed.add(session_id) 983 result = { 984 "status": "exited", 985 "exit_code": session.exit_code, 986 "output": strip_ansi(session.output_buffer[-2000:]), 987 } 988 if timeout_note: 989 result["timeout_note"] = timeout_note 990 return result 991 992 if _is_interrupted(): 993 result = { 994 "status": "interrupted", 995 "output": strip_ansi(session.output_buffer[-1000:]), 996 "note": "User sent a new message -- wait interrupted", 997 } 998 if timeout_note: 999 result["timeout_note"] = timeout_note 1000 return result 1001 1002 time.sleep(1) 1003 1004 result = { 1005 "status": "timeout", 1006 "output": strip_ansi(session.output_buffer[-1000:]), 1007 } 1008 if timeout_note: 1009 result["timeout_note"] = timeout_note 1010 else: 1011 result["timeout_note"] = f"Waited {effective_timeout}s, process still running" 1012 return result 1013 1014 def kill_process(self, session_id: str) -> dict: 1015 """Kill a background process.""" 1016 session = self.get(session_id) 1017 if session is None: 1018 return {"status": "not_found", "error": f"No process with ID {session_id}"} 1019 1020 if session.exited: 1021 return { 1022 "status": "already_exited", 1023 "exit_code": session.exit_code, 1024 } 1025 1026 # Kill via PTY, Popen (local), or env execute (non-local) 1027 try: 1028 if session._pty: 1029 # PTY process -- terminate via ptyprocess 1030 try: 1031 session._pty.terminate(force=True) 1032 except Exception: 1033 if session.pid: 1034 os.kill(session.pid, signal.SIGTERM) 1035 elif session.process: 1036 # Local process -- kill the process group 1037 try: 1038 if _IS_WINDOWS: 1039 session.process.terminate() 1040 else: 1041 os.killpg(os.getpgid(session.process.pid), signal.SIGTERM) 1042 except (ProcessLookupError, PermissionError): 1043 session.process.kill() 1044 elif session.env_ref and session.pid: 1045 # Non-local -- kill inside sandbox 1046 session.env_ref.execute(f"kill {session.pid} 
2>/dev/null", timeout=5) 1047 elif session.detached and session.pid_scope == "host" and session.pid: 1048 if not self._is_host_pid_alive(session.pid): 1049 with session._lock: 1050 session.exited = True 1051 session.exit_code = None 1052 self._move_to_finished(session) 1053 return { 1054 "status": "already_exited", 1055 "exit_code": session.exit_code, 1056 } 1057 self._terminate_host_pid(session.pid) 1058 else: 1059 return { 1060 "status": "error", 1061 "error": ( 1062 "Recovered process cannot be killed after restart because " 1063 "its original runtime handle is no longer available" 1064 ), 1065 } 1066 session.exited = True 1067 session.exit_code = -15 # SIGTERM 1068 self._move_to_finished(session) 1069 self._write_checkpoint() 1070 return {"status": "killed", "session_id": session.id} 1071 except Exception as e: 1072 return {"status": "error", "error": str(e)} 1073 1074 def write_stdin(self, session_id: str, data: str) -> dict: 1075 """Send raw data to a running process's stdin (no newline appended).""" 1076 session = self.get(session_id) 1077 if session is None: 1078 return {"status": "not_found", "error": f"No process with ID {session_id}"} 1079 if session.exited: 1080 return {"status": "already_exited", "error": "Process has already finished"} 1081 1082 # PTY mode -- write through pty handle (expects bytes) 1083 if hasattr(session, '_pty') and session._pty: 1084 try: 1085 pty_data = data.encode("utf-8") if isinstance(data, str) else data 1086 session._pty.write(pty_data) 1087 return {"status": "ok", "bytes_written": len(data)} 1088 except Exception as e: 1089 return {"status": "error", "error": str(e)} 1090 1091 # Popen mode -- write through stdin pipe 1092 if not session.process or not session.process.stdin: 1093 return {"status": "error", "error": "Process stdin not available (non-local backend or stdin closed)"} 1094 try: 1095 session.process.stdin.write(data) 1096 session.process.stdin.flush() 1097 return {"status": "ok", "bytes_written": len(data)} 1098 
except Exception as e: 1099 return {"status": "error", "error": str(e)} 1100 1101 def submit_stdin(self, session_id: str, data: str = "") -> dict: 1102 """Send data + newline to a running process's stdin (like pressing Enter).""" 1103 return self.write_stdin(session_id, data + "\n") 1104 1105 def close_stdin(self, session_id: str) -> dict: 1106 """Close a running process's stdin / send EOF without killing the process.""" 1107 session = self.get(session_id) 1108 if session is None: 1109 return {"status": "not_found", "error": f"No process with ID {session_id}"} 1110 if session.exited: 1111 return {"status": "already_exited", "error": "Process has already finished"} 1112 1113 if hasattr(session, '_pty') and session._pty: 1114 try: 1115 session._pty.sendeof() 1116 return {"status": "ok", "message": "EOF sent"} 1117 except Exception as e: 1118 return {"status": "error", "error": str(e)} 1119 1120 if not session.process or not session.process.stdin: 1121 return {"status": "error", "error": "Process stdin not available (non-local backend or stdin closed)"} 1122 try: 1123 session.process.stdin.close() 1124 return {"status": "ok", "message": "stdin closed"} 1125 except Exception as e: 1126 return {"status": "error", "error": str(e)} 1127 1128 def list_sessions(self, task_id: str = None) -> list: 1129 """List all running and recently-finished processes.""" 1130 with self._lock: 1131 all_sessions = list(self._running.values()) + list(self._finished.values()) 1132 1133 all_sessions = [self._refresh_detached_session(s) for s in all_sessions] 1134 1135 if task_id: 1136 all_sessions = [s for s in all_sessions if s.task_id == task_id] 1137 1138 result = [] 1139 for s in all_sessions: 1140 entry = { 1141 "session_id": s.id, 1142 "command": s.command[:200], 1143 "cwd": s.cwd, 1144 "pid": s.pid, 1145 "started_at": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(s.started_at)), 1146 "uptime_seconds": int(time.time() - s.started_at), 1147 "status": "exited" if s.exited else 
"running", 1148 "output_preview": s.output_buffer[-200:] if s.output_buffer else "", 1149 } 1150 if s.exited: 1151 entry["exit_code"] = s.exit_code 1152 if s.detached: 1153 entry["detached"] = True 1154 result.append(entry) 1155 return result 1156 1157 # ----- Session/Task Queries (for gateway integration) ----- 1158 1159 def has_active_processes(self, task_id: str) -> bool: 1160 """Check if there are active (running) processes for a task_id.""" 1161 with self._lock: 1162 sessions = list(self._running.values()) 1163 1164 for session in sessions: 1165 self._refresh_detached_session(session) 1166 1167 with self._lock: 1168 return any( 1169 s.task_id == task_id and not s.exited 1170 for s in self._running.values() 1171 ) 1172 1173 def has_active_for_session(self, session_key: str) -> bool: 1174 """Check if there are active processes for a gateway session key.""" 1175 with self._lock: 1176 sessions = list(self._running.values()) 1177 1178 for session in sessions: 1179 self._refresh_detached_session(session) 1180 1181 with self._lock: 1182 return any( 1183 s.session_key == session_key and not s.exited 1184 for s in self._running.values() 1185 ) 1186 1187 def kill_all(self, task_id: str = None) -> int: 1188 """Kill all running processes, optionally filtered by task_id. Returns count killed.""" 1189 with self._lock: 1190 targets = [ 1191 s for s in self._running.values() 1192 if (task_id is None or s.task_id == task_id) and not s.exited 1193 ] 1194 1195 killed = 0 1196 for session in targets: 1197 result = self.kill_process(session.id) 1198 if result.get("status") in ("killed", "already_exited"): 1199 killed += 1 1200 return killed 1201 1202 # ----- Cleanup / Pruning ----- 1203 1204 def _prune_if_needed(self): 1205 """Remove oldest finished sessions if over MAX_PROCESSES. 
Must hold _lock.""" 1206 # First prune expired finished sessions 1207 now = time.time() 1208 expired = [ 1209 sid for sid, s in self._finished.items() 1210 if (now - s.started_at) > FINISHED_TTL_SECONDS 1211 ] 1212 for sid in expired: 1213 del self._finished[sid] 1214 self._completion_consumed.discard(sid) 1215 1216 # If still over limit, remove oldest finished 1217 total = len(self._running) + len(self._finished) 1218 if total >= MAX_PROCESSES and self._finished: 1219 oldest_id = min(self._finished, key=lambda sid: self._finished[sid].started_at) 1220 del self._finished[oldest_id] 1221 self._completion_consumed.discard(oldest_id) 1222 1223 # Drop any _completion_consumed entries whose sessions are no longer 1224 # tracked at all — belt-and-suspenders against module-lifetime growth 1225 # on process-registry lookup paths that don't reach the dict prunes. 1226 tracked = self._running.keys() | self._finished.keys() 1227 stale = self._completion_consumed - tracked 1228 if stale: 1229 self._completion_consumed -= stale 1230 1231 # ----- Checkpoint (crash recovery) ----- 1232 1233 def _write_checkpoint(self): 1234 """Write running process metadata to checkpoint file atomically.""" 1235 try: 1236 with self._lock: 1237 entries = [] 1238 for s in self._running.values(): 1239 if not s.exited: 1240 entries.append({ 1241 "session_id": s.id, 1242 "command": s.command, 1243 "pid": s.pid, 1244 "pid_scope": s.pid_scope, 1245 "cwd": s.cwd, 1246 "started_at": s.started_at, 1247 "task_id": s.task_id, 1248 "session_key": s.session_key, 1249 "watcher_platform": s.watcher_platform, 1250 "watcher_chat_id": s.watcher_chat_id, 1251 "watcher_user_id": s.watcher_user_id, 1252 "watcher_user_name": s.watcher_user_name, 1253 "watcher_thread_id": s.watcher_thread_id, 1254 "watcher_interval": s.watcher_interval, 1255 "notify_on_complete": s.notify_on_complete, 1256 "watch_patterns": s.watch_patterns, 1257 }) 1258 1259 # Atomic write to avoid corruption on crash 1260 from utils import 
atomic_json_write 1261 atomic_json_write(CHECKPOINT_PATH, entries) 1262 except Exception as e: 1263 logger.debug("Failed to write checkpoint file: %s", e, exc_info=True) 1264 1265 def recover_from_checkpoint(self) -> int: 1266 """ 1267 On gateway startup, probe PIDs from checkpoint file. 1268 1269 Returns the number of processes recovered as detached. 1270 """ 1271 if not CHECKPOINT_PATH.exists(): 1272 return 0 1273 1274 try: 1275 entries = json.loads(CHECKPOINT_PATH.read_text(encoding="utf-8")) 1276 except Exception: 1277 return 0 1278 1279 recovered = 0 1280 for entry in entries: 1281 pid = entry.get("pid") 1282 if not pid: 1283 continue 1284 1285 pid_scope = entry.get("pid_scope", "host") 1286 if pid_scope != "host": 1287 # Sandbox-backed processes keep only in-sandbox PIDs in the 1288 # checkpoint, which are not meaningful to the restarted host 1289 # process once the original environment handle is gone. 1290 logger.info( 1291 "Skipping recovery for non-host process: %s (pid=%s, scope=%s)", 1292 entry.get("command", "unknown")[:60], 1293 pid, 1294 pid_scope, 1295 ) 1296 continue 1297 1298 # Check if PID is still alive 1299 alive = self._is_host_pid_alive(pid) 1300 1301 if alive: 1302 session = ProcessSession( 1303 id=entry["session_id"], 1304 command=entry.get("command", "unknown"), 1305 task_id=entry.get("task_id", ""), 1306 session_key=entry.get("session_key", ""), 1307 pid=pid, 1308 pid_scope=pid_scope, 1309 cwd=entry.get("cwd"), 1310 started_at=entry.get("started_at", time.time()), 1311 detached=True, # Can't read output, but can report status + kill 1312 watcher_platform=entry.get("watcher_platform", ""), 1313 watcher_chat_id=entry.get("watcher_chat_id", ""), 1314 watcher_user_id=entry.get("watcher_user_id", ""), 1315 watcher_user_name=entry.get("watcher_user_name", ""), 1316 watcher_thread_id=entry.get("watcher_thread_id", ""), 1317 watcher_interval=entry.get("watcher_interval", 0), 1318 notify_on_complete=entry.get("notify_on_complete", False), 1319 
watch_patterns=entry.get("watch_patterns", []), 1320 ) 1321 with self._lock: 1322 self._running[session.id] = session 1323 recovered += 1 1324 logger.info("Recovered detached process: %s (pid=%d)", session.command[:60], pid) 1325 1326 # Re-enqueue watcher so gateway can resume notifications 1327 if session.watcher_interval > 0: 1328 self.pending_watchers.append({ 1329 "session_id": session.id, 1330 "check_interval": session.watcher_interval, 1331 "session_key": session.session_key, 1332 "platform": session.watcher_platform, 1333 "chat_id": session.watcher_chat_id, 1334 "user_id": session.watcher_user_id, 1335 "user_name": session.watcher_user_name, 1336 "thread_id": session.watcher_thread_id, 1337 "notify_on_complete": session.notify_on_complete, 1338 }) 1339 1340 self._write_checkpoint() 1341 1342 return recovered 1343 1344 1345 # Module-level singleton 1346 process_registry = ProcessRegistry() 1347 1348 1349 # --------------------------------------------------------------------------- 1350 # Registry -- the "process" tool schema + handler 1351 # --------------------------------------------------------------------------- 1352 from tools.registry import registry, tool_error 1353 1354 PROCESS_SCHEMA = { 1355 "name": "process", 1356 "description": ( 1357 "Manage background processes started with terminal(background=true). " 1358 "Actions: 'list' (show all), 'poll' (check status + new output), " 1359 "'log' (full output with pagination), 'wait' (block until done or timeout), " 1360 "'kill' (terminate), 'write' (send raw stdin data without newline), " 1361 "'submit' (send data + Enter, for answering prompts), 'close' (close stdin/send EOF)." 
1362 ), 1363 "parameters": { 1364 "type": "object", 1365 "properties": { 1366 "action": { 1367 "type": "string", 1368 "enum": ["list", "poll", "log", "wait", "kill", "write", "submit", "close"], 1369 "description": "Action to perform on background processes" 1370 }, 1371 "session_id": { 1372 "type": "string", 1373 "description": "Process session ID (from terminal background output). Required for all actions except 'list'." 1374 }, 1375 "data": { 1376 "type": "string", 1377 "description": "Text to send to process stdin (for 'write' and 'submit' actions)" 1378 }, 1379 "timeout": { 1380 "type": "integer", 1381 "description": "Max seconds to block for 'wait' action. Returns partial output on timeout.", 1382 "minimum": 1 1383 }, 1384 "offset": { 1385 "type": "integer", 1386 "description": "Line offset for 'log' action (default: last 200 lines)" 1387 }, 1388 "limit": { 1389 "type": "integer", 1390 "description": "Max lines to return for 'log' action", 1391 "minimum": 1 1392 } 1393 }, 1394 "required": ["action"] 1395 } 1396 } 1397 1398 1399 def _handle_process(args, **kw): 1400 task_id = kw.get("task_id") 1401 action = args.get("action", "") 1402 # Coerce to string — some models send session_id as an integer 1403 session_id = str(args.get("session_id", "")) if args.get("session_id") is not None else "" 1404 1405 if action == "list": 1406 return json.dumps({"processes": process_registry.list_sessions(task_id=task_id)}, ensure_ascii=False) 1407 elif action in ("poll", "log", "wait", "kill", "write", "submit", "close"): 1408 if not session_id: 1409 return tool_error(f"session_id is required for {action}") 1410 if action == "poll": 1411 return json.dumps(process_registry.poll(session_id), ensure_ascii=False) 1412 elif action == "log": 1413 return json.dumps(process_registry.read_log( 1414 session_id, offset=args.get("offset", 0), limit=args.get("limit", 200)), ensure_ascii=False) 1415 elif action == "wait": 1416 return json.dumps(process_registry.wait(session_id, 
timeout=args.get("timeout")), ensure_ascii=False) 1417 elif action == "kill": 1418 return json.dumps(process_registry.kill_process(session_id), ensure_ascii=False) 1419 elif action == "write": 1420 return json.dumps(process_registry.write_stdin(session_id, str(args.get("data", ""))), ensure_ascii=False) 1421 elif action == "submit": 1422 return json.dumps(process_registry.submit_stdin(session_id, str(args.get("data", ""))), ensure_ascii=False) 1423 elif action == "close": 1424 return json.dumps(process_registry.close_stdin(session_id), ensure_ascii=False) 1425 return tool_error(f"Unknown process action: {action}. Use: list, poll, log, wait, kill, write, submit, close") 1426 1427 1428 registry.register( 1429 name="process", 1430 toolset="terminal", 1431 schema=PROCESS_SCHEMA, 1432 handler=_handle_process, 1433 emoji="⚙️", 1434 )