interrupt.py
1 """Per-thread interrupt signaling for all tools. 2 3 Provides thread-scoped interrupt tracking so that interrupting one agent 4 session does not kill tools running in other sessions. This is critical 5 in the gateway where multiple agents run concurrently in the same process. 6 7 The agent stores its execution thread ID at the start of run_conversation() 8 and passes it to set_interrupt()/clear_interrupt(). Tools call 9 is_interrupted() which checks the CURRENT thread — no argument needed. 10 11 Usage in tools: 12 from tools.interrupt import is_interrupted 13 if is_interrupted(): 14 return {"output": "[interrupted]", "returncode": 130} 15 """ 16 17 import logging 18 import os 19 import threading 20 21 logger = logging.getLogger(__name__) 22 23 # Opt-in debug tracing — pairs with HERMES_DEBUG_INTERRUPT in 24 # tools/environments/base.py. Enables per-call logging of set/check so the 25 # caller thread, target thread, and current state are visible when 26 # diagnosing "interrupt signaled but tool never saw it" reports. 27 _DEBUG_INTERRUPT = bool(os.getenv("HERMES_DEBUG_INTERRUPT")) 28 29 if _DEBUG_INTERRUPT: 30 # AIAgent's quiet_mode path forces `tools` logger to ERROR on CLI startup. 31 # Force our own logger back to INFO so the trace is visible in agent.log. 32 logger.setLevel(logging.INFO) 33 34 # Set of thread idents that have been interrupted. 35 _interrupted_threads: set[int] = set() 36 _lock = threading.Lock() 37 38 39 def set_interrupt(active: bool, thread_id: int | None = None) -> None: 40 """Set or clear interrupt for a specific thread. 41 42 Args: 43 active: True to signal interrupt, False to clear it. 44 thread_id: Target thread ident. When None, targets the 45 current thread (backward compat for CLI/tests). 46 """ 47 tid = thread_id if thread_id is not None else threading.current_thread().ident 48 with _lock: 49 if active: 50 _interrupted_threads.add(tid) 51 else: 52 _interrupted_threads.discard(tid) 53 _snapshot = set(_interrupted_threads) if _DEBUG_INTERRUPT else None 54 if _DEBUG_INTERRUPT: 55 logger.info( 56 "[interrupt-debug] set_interrupt(active=%s, target_tid=%s) " 57 "called_from_tid=%s current_set=%s", 58 active, tid, threading.current_thread().ident, _snapshot, 59 ) 60 61 62 def is_interrupted() -> bool: 63 """Check if an interrupt has been requested for the current thread. 64 65 Safe to call from any thread — each thread only sees its own 66 interrupt state. 67 """ 68 tid = threading.current_thread().ident 69 with _lock: 70 return tid in _interrupted_threads 71 72 73 # --------------------------------------------------------------------------- 74 # Backward-compatible _interrupt_event proxy 75 # --------------------------------------------------------------------------- 76 # Some legacy call sites (code_execution_tool, process_registry, tests) 77 # import _interrupt_event directly and call .is_set() / .set() / .clear(). 78 # This shim maps those calls to the per-thread functions above so existing 79 # code keeps working while the underlying mechanism is thread-scoped. 80 81 class _ThreadAwareEventProxy: 82 """Drop-in proxy that maps threading.Event methods to per-thread state.""" 83 84 def is_set(self) -> bool: 85 return is_interrupted() 86 87 def set(self) -> None: # noqa: A003 88 set_interrupt(True) 89 90 def clear(self) -> None: 91 set_interrupt(False) 92 93 def wait(self, timeout: float | None = None) -> bool: 94 """Not truly supported — returns current state immediately.""" 95 return self.is_set() 96 97 98 _interrupt_event = _ThreadAwareEventProxy()