test_interrupt.py
1 """Tests for the interrupt system. 2 3 Run with: python -m pytest tests/test_interrupt.py -v 4 """ 5 6 import queue 7 import threading 8 import time 9 import pytest 10 11 12 # --------------------------------------------------------------------------- 13 # Unit tests: shared interrupt module 14 # --------------------------------------------------------------------------- 15 16 class TestInterruptModule: 17 """Tests for tools/interrupt.py""" 18 19 def test_set_and_check(self): 20 from tools.interrupt import set_interrupt, is_interrupted 21 set_interrupt(False) 22 assert not is_interrupted() 23 24 set_interrupt(True) 25 assert is_interrupted() 26 27 set_interrupt(False) 28 assert not is_interrupted() 29 30 def test_thread_safety(self): 31 """Set from one thread targeting another thread's ident.""" 32 from tools.interrupt import set_interrupt, is_interrupted, _interrupted_threads, _lock 33 set_interrupt(False) 34 # Clear any stale thread idents left by prior tests in this worker. 35 with _lock: 36 _interrupted_threads.clear() 37 38 seen = {"value": False} 39 40 def _checker(): 41 while not is_interrupted(): 42 time.sleep(0.01) 43 seen["value"] = True 44 45 t = threading.Thread(target=_checker, daemon=True) 46 t.start() 47 48 time.sleep(0.05) 49 assert not seen["value"] 50 51 # Target the checker thread's ident so it sees the interrupt 52 set_interrupt(True, thread_id=t.ident) 53 t.join(timeout=1) 54 assert seen["value"] 55 56 set_interrupt(False, thread_id=t.ident) 57 58 59 # --------------------------------------------------------------------------- 60 # Unit tests: pre-tool interrupt check 61 # --------------------------------------------------------------------------- 62 63 class TestPreToolCheck: 64 """Verify that _execute_tool_calls skips all tools when interrupted.""" 65 66 def test_all_tools_skipped_when_interrupted(self): 67 """Mock an interrupted agent and verify no tools execute.""" 68 from unittest.mock import MagicMock, patch 69 70 # Build a fake assistant_message with 3 tool calls 71 tc1 = MagicMock() 72 tc1.id = "tc_1" 73 tc1.function.name = "terminal" 74 tc1.function.arguments = '{"command": "rm -rf /"}' 75 76 tc2 = MagicMock() 77 tc2.id = "tc_2" 78 tc2.function.name = "terminal" 79 tc2.function.arguments = '{"command": "echo hello"}' 80 81 tc3 = MagicMock() 82 tc3.id = "tc_3" 83 tc3.function.name = "web_search" 84 tc3.function.arguments = '{"query": "test"}' 85 86 assistant_msg = MagicMock() 87 assistant_msg.tool_calls = [tc1, tc2, tc3] 88 89 messages = [] 90 91 # Create a minimal mock agent with _interrupt_requested = True 92 agent = MagicMock() 93 agent._interrupt_requested = True 94 agent.log_prefix = "" 95 agent._persist_session = MagicMock() 96 97 # Import and call the method 98 import types 99 from run_agent import AIAgent 100 # Bind the real methods to our mock so dispatch works correctly 101 agent._execute_tool_calls_sequential = types.MethodType(AIAgent._execute_tool_calls_sequential, agent) 102 agent._execute_tool_calls_concurrent = types.MethodType(AIAgent._execute_tool_calls_concurrent, agent) 103 AIAgent._execute_tool_calls(agent, assistant_msg, messages, "default") 104 105 # All 3 should be skipped 106 assert len(messages) == 3 107 for msg in messages: 108 assert msg["role"] == "tool" 109 assert "cancelled" in msg["content"].lower() or "interrupted" in msg["content"].lower() 110 111 # No actual tool handlers should have been called 112 # (handle_function_call should NOT have been invoked) 113 114 115 # --------------------------------------------------------------------------- 116 # Unit tests: message combining 117 # --------------------------------------------------------------------------- 118 119 class TestMessageCombining: 120 """Verify multiple interrupt messages are joined.""" 121 122 def test_cli_interrupt_queue_drain(self): 123 """Simulate draining multiple messages from the interrupt queue.""" 124 q = queue.Queue() 125 q.put("Stop!") 126 q.put("Don't delete anything") 127 q.put("Show me what you were going to delete instead") 128 129 parts = [] 130 while not q.empty(): 131 try: 132 msg = q.get_nowait() 133 if msg: 134 parts.append(msg) 135 except queue.Empty: 136 break 137 138 combined = "\n".join(parts) 139 assert "Stop!" in combined 140 assert "Don't delete anything" in combined 141 assert "Show me what you were going to delete instead" in combined 142 assert combined.count("\n") == 2 143 144 def test_gateway_pending_messages_append(self): 145 """Simulate gateway _pending_messages append logic.""" 146 pending = {} 147 key = "agent:main:telegram:dm" 148 149 # First message 150 if key in pending: 151 pending[key] += "\n" + "Stop!" 152 else: 153 pending[key] = "Stop!" 154 155 # Second message 156 if key in pending: 157 pending[key] += "\n" + "Do something else instead" 158 else: 159 pending[key] = "Do something else instead" 160 161 assert pending[key] == "Stop!\nDo something else instead" 162 163 164 # --------------------------------------------------------------------------- 165 # Integration tests (require local terminal) 166 # --------------------------------------------------------------------------- 167 168 class TestSIGKILLEscalation: 169 """Test that SIGTERM-resistant processes get SIGKILL'd.""" 170 171 @pytest.mark.skipif( 172 not __import__("shutil").which("bash"), 173 reason="Requires bash" 174 ) 175 def test_sigterm_trap_killed_within_2s(self): 176 """A process that traps SIGTERM should be SIGKILL'd after 1s grace.""" 177 from tools.interrupt import set_interrupt 178 from tools.environments.local import LocalEnvironment 179 180 set_interrupt(False) 181 env = LocalEnvironment(cwd="/tmp", timeout=30) 182 183 # Start execution in a thread, interrupt after 0.5s 184 result_holder = {"value": None} 185 186 def _run(): 187 result_holder["value"] = env.execute( 188 "trap '' TERM; sleep 60", 189 timeout=30, 190 ) 191 192 t = threading.Thread(target=_run) 193 t.start() 194 195 time.sleep(0.5) 196 set_interrupt(True, thread_id=t.ident) 197 198 t.join(timeout=5) 199 set_interrupt(False, thread_id=t.ident) 200 201 assert result_holder["value"] is not None 202 assert result_holder["value"]["returncode"] == 130 203 assert "interrupted" in result_holder["value"]["output"].lower() 204 205 206 # --------------------------------------------------------------------------- 207 # Manual smoke test checklist (not automated) 208 # --------------------------------------------------------------------------- 209 210 SMOKE_TESTS = """ 211 Manual Smoke Test Checklist: 212 213 1. CLI: Run `hermes`, ask it to `sleep 30` in terminal, type "stop" + Enter. 214 Expected: command dies within 2s, agent responds to "stop". 215 216 2. CLI: Ask it to extract content from 5 URLs, type interrupt mid-way. 217 Expected: remaining URLs are skipped, partial results returned. 218 219 3. Gateway (Telegram): Send a long task, then send "Stop". 220 Expected: agent stops and responds acknowledging the stop. 221 222 4. Gateway (Telegram): Send "Stop" then "Do X instead" rapidly. 223 Expected: both messages appear as the next prompt (joined by newline). 224 225 5. CLI: Start a task that generates 3+ tool calls in one batch. 226 Type interrupt during the first tool call. 227 Expected: only 1 tool executes, remaining are skipped. 228 """