/ tools / interrupt.py
interrupt.py
 1  """Per-thread interrupt signaling for all tools.
 2  
 3  Provides thread-scoped interrupt tracking so that interrupting one agent
 4  session does not kill tools running in other sessions.  This is critical
 5  in the gateway where multiple agents run concurrently in the same process.
 6  
 7  The agent stores its execution thread ID at the start of run_conversation()
 8  and passes it to set_interrupt()/clear_interrupt().  Tools call
 9  is_interrupted() which checks the CURRENT thread — no argument needed.
10  
11  Usage in tools:
12      from tools.interrupt import is_interrupted
13      if is_interrupted():
14          return {"output": "[interrupted]", "returncode": 130}
15  """
16  
17  import logging
18  import os
19  import threading
20  
21  logger = logging.getLogger(__name__)
22  
23  # Opt-in debug tracing — pairs with HERMES_DEBUG_INTERRUPT in
24  # tools/environments/base.py.  Enables per-call logging of set/check so the
25  # caller thread, target thread, and current state are visible when
26  # diagnosing "interrupt signaled but tool never saw it" reports.
27  _DEBUG_INTERRUPT = bool(os.getenv("HERMES_DEBUG_INTERRUPT"))
28  
29  if _DEBUG_INTERRUPT:
30      # AIAgent's quiet_mode path forces `tools` logger to ERROR on CLI startup.
31      # Force our own logger back to INFO so the trace is visible in agent.log.
32      logger.setLevel(logging.INFO)
33  
34  # Set of thread idents that have been interrupted.
35  _interrupted_threads: set[int] = set()
36  _lock = threading.Lock()
37  
38  
39  def set_interrupt(active: bool, thread_id: int | None = None) -> None:
40      """Set or clear interrupt for a specific thread.
41  
42      Args:
43          active: True to signal interrupt, False to clear it.
44          thread_id: Target thread ident.  When None, targets the
45                     current thread (backward compat for CLI/tests).
46      """
47      tid = thread_id if thread_id is not None else threading.current_thread().ident
48      with _lock:
49          if active:
50              _interrupted_threads.add(tid)
51          else:
52              _interrupted_threads.discard(tid)
53          _snapshot = set(_interrupted_threads) if _DEBUG_INTERRUPT else None
54      if _DEBUG_INTERRUPT:
55          logger.info(
56              "[interrupt-debug] set_interrupt(active=%s, target_tid=%s) "
57              "called_from_tid=%s current_set=%s",
58              active, tid, threading.current_thread().ident, _snapshot,
59          )
60  
61  
62  def is_interrupted() -> bool:
63      """Check if an interrupt has been requested for the current thread.
64  
65      Safe to call from any thread — each thread only sees its own
66      interrupt state.
67      """
68      tid = threading.current_thread().ident
69      with _lock:
70          return tid in _interrupted_threads
71  
72  
73  # ---------------------------------------------------------------------------
74  # Backward-compatible _interrupt_event proxy
75  # ---------------------------------------------------------------------------
76  # Some legacy call sites (code_execution_tool, process_registry, tests)
77  # import _interrupt_event directly and call .is_set() / .set() / .clear().
78  # This shim maps those calls to the per-thread functions above so existing
79  # code keeps working while the underlying mechanism is thread-scoped.
80  
81  class _ThreadAwareEventProxy:
82      """Drop-in proxy that maps threading.Event methods to per-thread state."""
83  
84      def is_set(self) -> bool:
85          return is_interrupted()
86  
87      def set(self) -> None:  # noqa: A003
88          set_interrupt(True)
89  
90      def clear(self) -> None:
91          set_interrupt(False)
92  
93      def wait(self, timeout: float | None = None) -> bool:
94          """Not truly supported — returns current state immediately."""
95          return self.is_set()
96  
97  
98  _interrupt_event = _ThreadAwareEventProxy()