/ tests / tools / test_interrupt.py
test_interrupt.py
  1  """Tests for the interrupt system.
  2  
  3  Run with: python -m pytest tests/tools/test_interrupt.py -v
  4  """
  5  
  6  import queue
  7  import threading
  8  import time
  9  import pytest
 10  
 11  
 12  # ---------------------------------------------------------------------------
 13  # Unit tests: shared interrupt module
 14  # ---------------------------------------------------------------------------
 15  
 16  class TestInterruptModule:
 17      """Tests for tools/interrupt.py"""
 18  
 19      def test_set_and_check(self):
 20          from tools.interrupt import set_interrupt, is_interrupted
 21          set_interrupt(False)
 22          assert not is_interrupted()
 23  
 24          set_interrupt(True)
 25          assert is_interrupted()
 26  
 27          set_interrupt(False)
 28          assert not is_interrupted()
 29  
 30      def test_thread_safety(self):
 31          """Set from one thread targeting another thread's ident."""
 32          from tools.interrupt import set_interrupt, is_interrupted, _interrupted_threads, _lock
 33          set_interrupt(False)
 34          # Clear any stale thread idents left by prior tests in this worker.
 35          with _lock:
 36              _interrupted_threads.clear()
 37  
 38          seen = {"value": False}
 39  
 40          def _checker():
 41              while not is_interrupted():
 42                  time.sleep(0.01)
 43              seen["value"] = True
 44  
 45          t = threading.Thread(target=_checker, daemon=True)
 46          t.start()
 47  
 48          time.sleep(0.05)
 49          assert not seen["value"]
 50  
 51          # Target the checker thread's ident so it sees the interrupt
 52          set_interrupt(True, thread_id=t.ident)
 53          t.join(timeout=1)
 54          assert seen["value"]
 55  
 56          set_interrupt(False, thread_id=t.ident)
 57  
 58  
 59  # ---------------------------------------------------------------------------
 60  # Unit tests: pre-tool interrupt check
 61  # ---------------------------------------------------------------------------
 62  
 63  class TestPreToolCheck:
 64      """Verify that _execute_tool_calls skips all tools when interrupted."""
 65  
 66      def test_all_tools_skipped_when_interrupted(self):
 67          """Mock an interrupted agent and verify no tools execute."""
 68          from unittest.mock import MagicMock, patch
 69  
 70          # Build a fake assistant_message with 3 tool calls
 71          tc1 = MagicMock()
 72          tc1.id = "tc_1"
 73          tc1.function.name = "terminal"
 74          tc1.function.arguments = '{"command": "rm -rf /"}'
 75  
 76          tc2 = MagicMock()
 77          tc2.id = "tc_2"
 78          tc2.function.name = "terminal"
 79          tc2.function.arguments = '{"command": "echo hello"}'
 80  
 81          tc3 = MagicMock()
 82          tc3.id = "tc_3"
 83          tc3.function.name = "web_search"
 84          tc3.function.arguments = '{"query": "test"}'
 85  
 86          assistant_msg = MagicMock()
 87          assistant_msg.tool_calls = [tc1, tc2, tc3]
 88  
 89          messages = []
 90  
 91          # Create a minimal mock agent with _interrupt_requested = True
 92          agent = MagicMock()
 93          agent._interrupt_requested = True
 94          agent.log_prefix = ""
 95          agent._persist_session = MagicMock()
 96  
 97          # Import and call the method
 98          import types
 99          from run_agent import AIAgent
100          # Bind the real methods to our mock so dispatch works correctly
101          agent._execute_tool_calls_sequential = types.MethodType(AIAgent._execute_tool_calls_sequential, agent)
102          agent._execute_tool_calls_concurrent = types.MethodType(AIAgent._execute_tool_calls_concurrent, agent)
103          AIAgent._execute_tool_calls(agent, assistant_msg, messages, "default")
104  
105          # All 3 should be skipped
106          assert len(messages) == 3
107          for msg in messages:
108              assert msg["role"] == "tool"
109              assert "cancelled" in msg["content"].lower() or "interrupted" in msg["content"].lower()
110  
111          # No actual tool handlers should have been called
112          # (handle_function_call should NOT have been invoked)
113  
114  
115  # ---------------------------------------------------------------------------
116  # Unit tests: message combining
117  # ---------------------------------------------------------------------------
118  
class TestMessageCombining:
    """Check that several interrupt messages end up newline-joined."""

    def test_cli_interrupt_queue_drain(self):
        """Drain every queued interrupt message and join the non-empty ones."""
        queued = (
            "Stop!",
            "Don't delete anything",
            "Show me what you were going to delete instead",
        )
        inbox = queue.Queue()
        for text in queued:
            inbox.put(text)

        drained = []
        while True:
            try:
                item = inbox.get_nowait()
            except queue.Empty:
                # Queue exhausted: nothing left to combine.
                break
            if item:
                drained.append(item)

        combined = "\n".join(drained)
        for text in queued:
            assert text in combined
        # Three messages joined pairwise give exactly two separators.
        assert combined.count("\n") == 2

    def test_gateway_pending_messages_append(self):
        """Replay the append-or-create handling of gateway _pending_messages."""
        pending = {}
        key = "agent:main:telegram:dm"

        # The first message creates the entry; each later one is appended after a newline.
        for incoming in ("Stop!", "Do something else instead"):
            if key not in pending:
                pending[key] = incoming
            else:
                pending[key] += "\n" + incoming

        assert pending[key] == "Stop!\nDo something else instead"
162  
163  
164  # ---------------------------------------------------------------------------
165  # Integration tests (require local terminal)
166  # ---------------------------------------------------------------------------
167  
class TestSIGKILLEscalation:
    """Test that SIGTERM-resistant processes get SIGKILL'd."""

    @pytest.mark.skipif(
        not __import__("shutil").which("bash"),
        reason="Requires bash"
    )
    def test_sigterm_trap_killed_within_2s(self):
        """A process that traps SIGTERM should be SIGKILL'd after 1s grace.

        Runs a command that ignores SIGTERM in a worker thread, interrupts that
        thread after 0.5s, and expects ``execute`` to return within the 5s join
        window with returncode 130 and "interrupted" in its output.
        """
        from tools.interrupt import set_interrupt
        from tools.environments.local import LocalEnvironment

        set_interrupt(False)
        env = LocalEnvironment(cwd="/tmp", timeout=30)

        # Start execution in a thread, interrupt after 0.5s
        result_holder = {"value": None}

        def _run():
            # `trap '' TERM` makes the shell ignore SIGTERM.
            result_holder["value"] = env.execute(
                "trap '' TERM; sleep 60",
                timeout=30,
            )

        t = threading.Thread(target=_run)
        t.start()
        try:
            time.sleep(0.5)
            set_interrupt(True, thread_id=t.ident)
            t.join(timeout=5)
        finally:
            # Always drop the worker's flag, even if the wait itself is aborted.
            set_interrupt(False, thread_id=t.ident)

        # Report a stuck worker explicitly rather than via a bare
        # "None is not None" failure on the result check below.
        assert not t.is_alive(), (
            "execute() was still running 5s after the interrupt was set"
        )

        result = result_holder["value"]
        assert result is not None
        assert result["returncode"] == 130
        assert "interrupted" in result["output"].lower()
204  
205  
206  # ---------------------------------------------------------------------------
207  # Manual smoke test checklist (not automated)
208  # ---------------------------------------------------------------------------
209  
# Checklist for a person to run by hand, kept as a plain string.
# No test visible in this file reads it.
SMOKE_TESTS = """
Manual Smoke Test Checklist:

1. CLI: Run `hermes`, ask it to `sleep 30` in terminal, type "stop" + Enter.
   Expected: command dies within 2s, agent responds to "stop".

2. CLI: Ask it to extract content from 5 URLs, type interrupt mid-way.
   Expected: remaining URLs are skipped, partial results returned.

3. Gateway (Telegram): Send a long task, then send "Stop".
   Expected: agent stops and responds acknowledging the stop.

4. Gateway (Telegram): Send "Stop" then "Do X instead" rapidly.
   Expected: both messages appear as the next prompt (joined by newline).

5. CLI: Start a task that generates 3+ tool calls in one batch.
   Type interrupt during the first tool call.
   Expected: only 1 tool executes, remaining are skipped.
"""