/ tests / cli / test_cli_interrupt_subagent.py
test_cli_interrupt_subagent.py
  1  """End-to-end test simulating CLI interrupt during subagent execution.
  2  
  3  Reproduces the exact scenario:
  4  1. Parent agent calls delegate_task
  5  2. Child agent is running (simulated with a slow tool)
  6  3. User "types a message" (simulated by calling parent.interrupt from another thread)
  7  4. Child should detect the interrupt and stop
  8  
  9  This tests the COMPLETE path including _run_single_child, _active_children
 10  registration, interrupt propagation, and child detection.
 11  """
 12  
 13  import json
 14  import os
 15  import queue
 16  import threading
 17  import time
 18  import unittest
 19  from unittest.mock import MagicMock, patch, PropertyMock
 20  
 21  from tools.interrupt import set_interrupt, is_interrupted
 22  
 23  
 24  class TestCLISubagentInterrupt(unittest.TestCase):
 25      """Simulate exact CLI scenario."""
 26  
 27      def setUp(self):
 28          set_interrupt(False)
 29  
 30      def tearDown(self):
 31          set_interrupt(False)
 32  
 33      def test_full_delegate_interrupt_flow(self):
 34          """Full integration: parent runs delegate_task, main thread interrupts."""
 35          from run_agent import AIAgent
 36  
 37          interrupt_detected = threading.Event()
 38          child_started = threading.Event()
 39          child_api_call_count = 0
 40  
 41          # Create a real-enough parent agent
 42          parent = AIAgent.__new__(AIAgent)
 43          parent._interrupt_requested = False
 44          parent._interrupt_message = None
 45          parent._active_children = []
 46          parent._active_children_lock = threading.Lock()
 47          parent.quiet_mode = True
 48          parent.model = "test/model"
 49          parent.base_url = "http://localhost:1"
 50          parent.api_key = "test"
 51          parent.provider = "test"
 52          parent.api_mode = "chat_completions"
 53          parent.platform = "cli"
 54          parent.enabled_toolsets = ["terminal", "file"]
 55          parent.providers_allowed = None
 56          parent.providers_ignored = None
 57          parent.providers_order = None
 58          parent.provider_sort = None
 59          parent.max_tokens = None
 60          parent.reasoning_config = None
 61          parent.prefill_messages = None
 62          parent._session_db = None
 63          parent._delegate_depth = 0
 64          parent._delegate_spinner = None
 65          parent.tool_progress_callback = None
 66          parent._execution_thread_id = None
 67  
 68          # We'll track what happens with _active_children
 69          original_children = parent._active_children
 70  
 71          # Mock the child's run_conversation to simulate a slow operation
 72          # that checks _interrupt_requested like the real one does
 73          def mock_child_run_conversation(user_message, **kwargs):
 74              child_started.set()
 75              # Find the child in parent._active_children
 76              child = parent._active_children[-1] if parent._active_children else None
 77              
 78              # Simulate the agent loop: poll _interrupt_requested like run_conversation does
 79              for i in range(100):  # Up to 10 seconds (100 * 0.1s)
 80                  if child and child._interrupt_requested:
 81                      interrupt_detected.set()
 82                      return {
 83                          "final_response": "Interrupted!",
 84                          "messages": [],
 85                          "api_calls": 1,
 86                          "completed": False,
 87                          "interrupted": True,
 88                          "interrupt_message": child._interrupt_message,
 89                      }
 90                  time.sleep(0.1)
 91              
 92              return {
 93                  "final_response": "Finished without interrupt",
 94                  "messages": [],
 95                  "api_calls": 5,
 96                  "completed": True,
 97                  "interrupted": False,
 98              }
 99  
100          # Patch AIAgent to use our mock
101          from tools.delegate_tool import _run_single_child
102          from run_agent import IterationBudget
103  
104          parent.iteration_budget = IterationBudget(max_total=100)
105  
106          # Run delegate in a thread (simulates agent_thread)
107          delegate_result = [None]
108          delegate_error = [None]
109  
110          def run_delegate():
111              try:
112                  with patch('run_agent.AIAgent') as MockAgent:
113                      mock_instance = MagicMock()
114                      mock_instance._interrupt_requested = False
115                      mock_instance._interrupt_message = None
116                      mock_instance._active_children = []
117                      mock_instance._active_children_lock = threading.Lock()
118                      mock_instance.quiet_mode = True
119                      mock_instance.run_conversation = mock_child_run_conversation
120                      mock_instance.interrupt = lambda msg=None: setattr(mock_instance, '_interrupt_requested', True) or setattr(mock_instance, '_interrupt_message', msg)
121                      mock_instance.tools = []
122                      MockAgent.return_value = mock_instance
123  
124                      # Register child manually (normally done by _build_child_agent)
125                      parent._active_children.append(mock_instance)
126  
127                      result = _run_single_child(
128                          task_index=0,
129                          goal="Do something slow",
130                          child=mock_instance,
131                          parent_agent=parent,
132                      )
133                      delegate_result[0] = result
134              except Exception as e:
135                  delegate_error[0] = e
136  
137          agent_thread = threading.Thread(target=run_delegate, daemon=True)
138          agent_thread.start()
139  
140          # Wait for child to start
141          assert child_started.wait(timeout=5), "Child never started!"
142  
143          # Now simulate user interrupt (from main/process thread)
144          time.sleep(0.2)  # Give child a moment to be in its loop
145          
146          print(f"Parent has {len(parent._active_children)} active children")
147          assert len(parent._active_children) >= 1, f"Expected child in _active_children, got {len(parent._active_children)}"
148  
149          # This is what the CLI does:
150          parent.interrupt("Hey stop that")
151          
152          print(f"Parent._interrupt_requested: {parent._interrupt_requested}")
153          for i, child in enumerate(parent._active_children):
154              print(f"Child {i}._interrupt_requested: {child._interrupt_requested}")
155  
156          # Wait for child to detect interrupt
157          detected = interrupt_detected.wait(timeout=3.0)
158          
159          # Wait for delegate to finish
160          agent_thread.join(timeout=5)
161  
162          if delegate_error[0]:
163              raise delegate_error[0]
164  
165          assert detected, "Child never detected the interrupt!"
166          result = delegate_result[0]
167          assert result is not None, "Delegate returned no result"
168          assert result["status"] == "interrupted", f"Expected 'interrupted', got '{result['status']}'"
169          print(f"✓ Interrupt detected! Result: {result}")
170  
171  
# Allow running this test module directly as a script, not only via a test runner.
if __name__ == "__main__":
    unittest.main()