# test_cli_interrupt_subagent.py
"""End-to-end test simulating CLI interrupt during subagent execution.

Reproduces the exact scenario:
1. Parent agent calls delegate_task
2. Child agent is running (simulated with a slow, mocked run_conversation)
3. User "types a message" (simulated by calling parent.interrupt from another thread)
4. Child should detect the interrupt and stop

This tests the COMPLETE path including _run_single_child, _active_children
registration, interrupt propagation, and child detection.
"""

import json
import os
import queue
import threading
import time
import unittest
from unittest.mock import MagicMock, patch, PropertyMock

from tools.interrupt import set_interrupt, is_interrupted


class TestCLISubagentInterrupt(unittest.TestCase):
    """Simulate exact CLI scenario."""

    def setUp(self):
        # Reset before each test so state left by another test cannot leak in.
        # NOTE(review): assumes set_interrupt(False) clears shared interrupt
        # state -- confirm against tools.interrupt.
        set_interrupt(False)

    def tearDown(self):
        # Leave the same clean state behind for whichever test runs next.
        set_interrupt(False)

    def test_full_delegate_interrupt_flow(self):
        """Full integration: parent runs delegate_task, main thread interrupts.

        A background thread drives ``_run_single_child`` with a mocked child
        whose ``run_conversation`` polls ``_interrupt_requested``.  The main
        thread then calls ``parent.interrupt(...)`` and the test verifies that
        the child observed the interrupt and that the delegate result reports
        status ``"interrupted"``.
        """
        from run_agent import AIAgent

        interrupt_detected = threading.Event()
        child_started = threading.Event()

        # Create a real-enough parent agent: bypass __init__ and set only the
        # attributes assigned below.
        parent = AIAgent.__new__(AIAgent)
        parent._interrupt_requested = False
        parent._interrupt_message = None
        parent._active_children = []
        parent._active_children_lock = threading.Lock()
        parent.quiet_mode = True
        parent.model = "test/model"
        parent.base_url = "http://localhost:1"
        parent.api_key = "test"
        parent.provider = "test"
        parent.api_mode = "chat_completions"
        parent.platform = "cli"
        parent.enabled_toolsets = ["terminal", "file"]
        parent.providers_allowed = None
        parent.providers_ignored = None
        parent.providers_order = None
        parent.provider_sort = None
        parent.max_tokens = None
        parent.reasoning_config = None
        parent.prefill_messages = None
        parent._session_db = None
        parent._delegate_depth = 0
        parent._delegate_spinner = None
        parent.tool_progress_callback = None
        parent._execution_thread_id = None

        # Mock the child's run_conversation to simulate a slow operation
        # that checks _interrupt_requested like the real one does
        def mock_child_run_conversation(user_message, **kwargs):
            child_started.set()
            # Find the child in parent._active_children
            child = parent._active_children[-1] if parent._active_children else None

            # Simulate the agent loop: poll _interrupt_requested like run_conversation does
            for _ in range(100):  # Up to 10 seconds (100 * 0.1s)
                if child and child._interrupt_requested:
                    interrupt_detected.set()
                    return {
                        "final_response": "Interrupted!",
                        "messages": [],
                        "api_calls": 1,
                        "completed": False,
                        "interrupted": True,
                        "interrupt_message": child._interrupt_message,
                    }
                time.sleep(0.1)

            return {
                "final_response": "Finished without interrupt",
                "messages": [],
                "api_calls": 5,
                "completed": True,
                "interrupted": False,
            }

        # Imported lazily, inside the test, like AIAgent above.
        from tools.delegate_tool import _run_single_child
        from run_agent import IterationBudget

        parent.iteration_budget = IterationBudget(max_total=100)

        # Run delegate in a thread (simulates agent_thread).  One-element
        # lists act as mutable slots the worker thread can write into.
        delegate_result = [None]
        delegate_error = [None]

        def run_delegate():
            try:
                # Patch AIAgent so anything constructing an agent while the
                # delegate runs receives our mock instance.
                with patch('run_agent.AIAgent') as MockAgent:
                    mock_instance = MagicMock()
                    mock_instance._interrupt_requested = False
                    mock_instance._interrupt_message = None
                    mock_instance._active_children = []
                    mock_instance._active_children_lock = threading.Lock()
                    mock_instance.quiet_mode = True
                    mock_instance.run_conversation = mock_child_run_conversation

                    def mock_interrupt(msg=None):
                        # Store the message BEFORE raising the flag: the
                        # polling loop reads the message as soon as it sees
                        # the flag, so the reverse order could race.
                        mock_instance._interrupt_message = msg
                        mock_instance._interrupt_requested = True

                    mock_instance.interrupt = mock_interrupt
                    mock_instance.tools = []
                    MockAgent.return_value = mock_instance

                    # Register child manually (normally done by _build_child_agent)
                    parent._active_children.append(mock_instance)

                    result = _run_single_child(
                        task_index=0,
                        goal="Do something slow",
                        child=mock_instance,
                        parent_agent=parent,
                    )
                    delegate_result[0] = result
            except Exception as e:
                # Hand the failure to the main thread, which re-raises it so
                # the test fails with the real traceback.
                delegate_error[0] = e

        agent_thread = threading.Thread(target=run_delegate, daemon=True)
        agent_thread.start()

        # Wait for child to start.  unittest assertion methods are used
        # throughout because bare ``assert`` is stripped under ``python -O``.
        self.assertTrue(child_started.wait(timeout=5), "Child never started!")

        # Now simulate user interrupt (from main/process thread)
        time.sleep(0.2)  # Give child a moment to be in its loop

        print(f"Parent has {len(parent._active_children)} active children")
        self.assertGreaterEqual(
            len(parent._active_children),
            1,
            f"Expected child in _active_children, got {len(parent._active_children)}",
        )

        # This is what the CLI does:
        parent.interrupt("Hey stop that")

        print(f"Parent._interrupt_requested: {parent._interrupt_requested}")
        for i, child in enumerate(parent._active_children):
            print(f"Child {i}._interrupt_requested: {child._interrupt_requested}")

        # Wait for child to detect interrupt
        detected = interrupt_detected.wait(timeout=3.0)

        # Wait for delegate to finish
        agent_thread.join(timeout=5)

        if delegate_error[0]:
            raise delegate_error[0]

        self.assertTrue(detected, "Child never detected the interrupt!")
        # join() returns silently on timeout, so check explicitly that the
        # delegate thread actually finished.
        self.assertFalse(
            agent_thread.is_alive(),
            "Delegate thread did not finish after the interrupt",
        )
        result = delegate_result[0]
        self.assertIsNotNone(result, "Delegate returned no result")
        self.assertEqual(
            result["status"],
            "interrupted",
            f"Expected 'interrupted', got '{result['status']}'",
        )
        print(f"✓ Interrupt detected! Result: {result}")


if __name__ == "__main__":
    unittest.main()