/ tests / agent / test_subagent_progress.py
test_subagent_progress.py
  1  """
  2  Tests for subagent progress relay (issue #169).
  3  
  4  Verifies that:
  5  - KawaiiSpinner.print_above() works with and without active spinner
  6  - _build_child_progress_callback handles CLI/gateway/no-display paths
  7  - Thinking events are relayed correctly
  8  - Parallel callbacks don't share state
  9  """
 10  
 11  import io
 12  import sys
 13  import time
 14  import threading
 15  import pytest
 16  from unittest.mock import MagicMock, patch
 17  
 18  from agent.display import KawaiiSpinner
 19  from tools.delegate_tool import _build_child_progress_callback
 20  
 21  
 22  # =========================================================================
 23  # KawaiiSpinner.print_above tests
 24  # =========================================================================
 25  
 26  class TestPrintAbove:
 27      """Tests for KawaiiSpinner.print_above method."""
 28  
 29      def test_print_above_without_spinner_running(self):
 30          """print_above should write to stdout even when spinner is not running."""
 31          buf = io.StringIO()
 32          spinner = KawaiiSpinner("test")
 33          spinner._out = buf  # Redirect to buffer
 34          
 35          spinner.print_above("hello world")
 36          output = buf.getvalue()
 37          assert "hello world" in output
 38  
 39      def test_print_above_with_spinner_running(self):
 40          """print_above should clear spinner line and print text."""
 41          buf = io.StringIO()
 42          spinner = KawaiiSpinner("test")
 43          spinner._out = buf
 44          spinner.running = True  # Pretend spinner is running (don't start thread)
 45          
 46          spinner.print_above("tool line")
 47          output = buf.getvalue()
 48          assert "tool line" in output
 49          assert "\r" in output  # Should start with carriage return to clear spinner line
 50  
 51      def test_print_above_uses_captured_stdout(self):
 52          """print_above should use self._out, not sys.stdout.
 53          This ensures it works inside redirect_stdout(devnull)."""
 54          buf = io.StringIO()
 55          spinner = KawaiiSpinner("test")
 56          spinner._out = buf
 57          
 58          # Simulate redirect_stdout(devnull)
 59          old_stdout = sys.stdout
 60          sys.stdout = io.StringIO()
 61          try:
 62              spinner.print_above("should go to buf")
 63          finally:
 64              sys.stdout = old_stdout
 65          
 66          assert "should go to buf" in buf.getvalue()
 67  
 68  
 69  # =========================================================================
 70  # _build_child_progress_callback tests
 71  # =========================================================================
 72  
 73  class TestBuildChildProgressCallback:
 74      """Tests for child progress callback builder."""
 75  
 76      def test_returns_none_when_no_display(self):
 77          """Should return None when parent has no spinner or callback."""
 78          parent = MagicMock()
 79          parent._delegate_spinner = None
 80          parent.tool_progress_callback = None
 81          
 82          cb = _build_child_progress_callback(0, "test goal", parent)
 83          assert cb is None
 84  
 85      def test_cli_spinner_tool_event(self):
 86          """Should print tool line above spinner for CLI path."""
 87          buf = io.StringIO()
 88          spinner = KawaiiSpinner("delegating")
 89          spinner._out = buf
 90          spinner.running = True
 91          
 92          parent = MagicMock()
 93          parent._delegate_spinner = spinner
 94          parent.tool_progress_callback = None
 95          
 96          cb = _build_child_progress_callback(0, "test goal", parent)
 97          assert cb is not None
 98          
 99          cb("tool.started", "web_search", "quantum computing", {})
100          output = buf.getvalue()
101          assert "web_search" in output
102          assert "quantum computing" in output
103          assert "├─" in output
104  
105      def test_cli_spinner_thinking_event(self):
106          """Should print thinking line above spinner for CLI path."""
107          buf = io.StringIO()
108          spinner = KawaiiSpinner("delegating")
109          spinner._out = buf
110          spinner.running = True
111          
112          parent = MagicMock()
113          parent._delegate_spinner = spinner
114          parent.tool_progress_callback = None
115          
116          cb = _build_child_progress_callback(0, "test goal", parent)
117          cb("_thinking", "I'll search for papers first")
118          
119          output = buf.getvalue()
120          assert "💭" in output
121          assert "search for papers" in output
122  
123      def test_gateway_batched_progress(self):
124          """Gateway path: each tool.started relays a subagent.tool event, and a
125          subagent.progress summary fires once BATCH_SIZE tools accumulate."""
126          parent = MagicMock()
127          parent._delegate_spinner = None
128          parent_cb = MagicMock()
129          parent.tool_progress_callback = parent_cb
130  
131          cb = _build_child_progress_callback(0, "test goal", parent)
132  
133          # Each tool.started relays a subagent.tool event immediately (per-tool relay).
134          for i in range(4):
135              cb("tool.started", f"tool_{i}", f"arg_{i}", {})
136          # 4 per-tool relays so far, no batch summary yet (BATCH_SIZE=5)
137          events = [c.args[0] for c in parent_cb.call_args_list]
138          assert events == ["subagent.tool"] * 4
139  
140          # 5th call triggers another per-tool relay PLUS the batch-size summary
141          cb("tool.started", "tool_4", "arg_4", {})
142          events = [c.args[0] for c in parent_cb.call_args_list]
143          assert events == ["subagent.tool"] * 5 + ["subagent.progress"]
144          summary_call = parent_cb.call_args_list[-1]
145          summary_text = summary_call.kwargs.get("preview") or summary_call.args[2]
146          assert "tool_0" in summary_text
147          assert "tool_4" in summary_text
148  
149      def test_thinking_relayed_to_gateway(self):
150          """Thinking events are relayed as subagent.thinking events."""
151          parent = MagicMock()
152          parent._delegate_spinner = None
153          parent_cb = MagicMock()
154          parent.tool_progress_callback = parent_cb
155  
156          cb = _build_child_progress_callback(0, "test goal", parent)
157          cb("_thinking", "some reasoning text")
158  
159          parent_cb.assert_called_once()
160          assert parent_cb.call_args.args[0] == "subagent.thinking"
161          assert parent_cb.call_args.args[2] == "some reasoning text"
162  
163      def test_parallel_callbacks_independent(self):
164          """Each child's callback batches tool names independently."""
165          parent = MagicMock()
166          parent._delegate_spinner = None
167          parent_cb = MagicMock()
168          parent.tool_progress_callback = parent_cb
169  
170          cb0 = _build_child_progress_callback(0, "goal a", parent)
171          cb1 = _build_child_progress_callback(1, "goal b", parent)
172  
173          # 3 tool.started per child = 6 per-tool relays; neither should hit
174          # the batch-size summary (batch size = 5, counted per-child).
175          for i in range(3):
176              cb0("tool.started", f"tool_{i}", f"a_{i}", {})
177              cb1("tool.started", f"other_{i}", f"b_{i}", {})
178  
179          events = [c.args[0] for c in parent_cb.call_args_list]
180          assert events.count("subagent.tool") == 6
181          assert "subagent.progress" not in events
182  
183      def test_task_index_prefix_in_batch_mode(self):
184          """Batch mode (task_count > 1) should show 1-indexed prefix for all tasks."""
185          buf = io.StringIO()
186          spinner = KawaiiSpinner("delegating")
187          spinner._out = buf
188          spinner.running = True
189          
190          parent = MagicMock()
191          parent._delegate_spinner = spinner
192          parent.tool_progress_callback = None
193          
194          # task_index=0 in a batch of 3 → prefix "[1]"
195          cb0 = _build_child_progress_callback(0, "test goal", parent, task_count=3)
196          cb0("tool.started", "web_search", "test", {})
197          output = buf.getvalue()
198          assert "[1]" in output
199  
200          # task_index=2 in a batch of 3 → prefix "[3]"
201          buf.truncate(0)
202          buf.seek(0)
203          cb2 = _build_child_progress_callback(2, "test goal", parent, task_count=3)
204          cb2("tool.started", "web_search", "test", {})
205          output = buf.getvalue()
206          assert "[3]" in output
207  
208      def test_single_task_no_prefix(self):
209          """Single task (task_count=1) should not show index prefix."""
210          buf = io.StringIO()
211          spinner = KawaiiSpinner("delegating")
212          spinner._out = buf
213          spinner.running = True
214          
215          parent = MagicMock()
216          parent._delegate_spinner = spinner
217          parent.tool_progress_callback = None
218          
219          cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
220          cb("tool.started", "web_search", "test", {})
221          
222          output = buf.getvalue()
223          assert "[" not in output
224  
225  
226  # =========================================================================
227  # Integration: thinking callback in run_agent.py
228  # =========================================================================
229  
class TestThinkingCallback:
    """Tests for the _thinking callback in AIAgent conversation loop.

    ``_simulate_thinking_callback`` re-implements the run_agent.py code path
    locally, so these tests run without constructing an agent.
    """

    @staticmethod
    def _recorder():
        """Return ``(calls, callback)``.

        ``callback(name, preview=None)`` appends ``(name, preview)`` to
        ``calls`` so a test can inspect exactly what was relayed.
        """
        calls = []

        def callback(name, preview=None):
            calls.append((name, preview))

        return calls, callback

    def _simulate_thinking_callback(self, content, callback, delegate_depth=1):
        """Simulate the exact code path from run_agent.py for the thinking callback.

        Args:
            content: assistant message text (may be None or empty).
            callback: callable invoked as ``callback("_thinking", first_line)``.
            delegate_depth: simulates self._delegate_depth.
                0 = main agent (should NOT fire), >=1 = subagent (should fire).

        The callback fires at most once, with the first line of ``content``
        after surrounding whitespace and reasoning tags are removed, cut to
        80 characters. Nothing fires if that line is empty.
        """
        import re
        if (content and callback and delegate_depth > 0):
            _think_text = content.strip()
            # Drop opening and closing reasoning wrapper tags (case-sensitive).
            _think_text = re.sub(
                r'</?(?:REASONING_SCRATCHPAD|think|reasoning)>', '', _think_text
            ).strip()
            first_line = _think_text.split('\n')[0][:80] if _think_text else ""
            if first_line:
                try:
                    callback("_thinking", first_line)
                except Exception:
                    # Deliberately best-effort, mirroring the simulated code
                    # path: a failing progress callback must not propagate.
                    pass

    def test_thinking_callback_fires_on_content(self):
        """tool_progress_callback should receive _thinking event
        when assistant message has content."""
        calls, record = self._recorder()
        self._simulate_thinking_callback(
            "I'll research quantum computing first, then summarize.",
            record,
        )
        assert len(calls) == 1
        assert calls[0][0] == "_thinking"
        assert "quantum computing" in calls[0][1]

    def test_thinking_callback_skipped_when_no_content(self):
        """Should not fire when assistant has no content."""
        calls, record = self._recorder()
        self._simulate_thinking_callback(None, record)
        assert len(calls) == 0

    def test_thinking_callback_truncates_long_content(self):
        """Should keep only the first 80 chars of the first line."""
        calls, record = self._recorder()
        self._simulate_thinking_callback(
            "A" * 200 + "\nSecond line should be ignored",
            record,
        )
        assert len(calls) == 1
        assert len(calls[0][1]) == 80
        # Pin the content too: it must be the first line's prefix, not just
        # any 80-character string.
        assert calls[0][1] == "A" * 80

    def test_thinking_callback_skipped_for_main_agent(self):
        """Main agent (delegate_depth=0) should NOT fire thinking events.
        This prevents gateway spam on Telegram/Discord."""
        calls, record = self._recorder()
        self._simulate_thinking_callback(
            "I'll help you with that request.",
            record,
            delegate_depth=0,
        )
        assert len(calls) == 0

    def test_thinking_callback_strips_reasoning_scratchpad(self):
        """REASONING_SCRATCHPAD tags (opening and closing) should be stripped."""
        calls, record = self._recorder()
        self._simulate_thinking_callback(
            "<REASONING_SCRATCHPAD>I need to analyze this carefully</REASONING_SCRATCHPAD>",
            record,
        )
        assert len(calls) == 1
        assert "<REASONING_SCRATCHPAD>" not in calls[0][1]
        assert "</REASONING_SCRATCHPAD>" not in calls[0][1]
        assert "analyze this carefully" in calls[0][1]

    def test_thinking_callback_strips_think_tags(self):
        """<think> tags (opening and closing) should be stripped before display."""
        calls, record = self._recorder()
        self._simulate_thinking_callback(
            "<think>Let me think about this problem</think>",
            record,
        )
        assert len(calls) == 1
        assert "<think>" not in calls[0][1]
        assert "</think>" not in calls[0][1]
        assert "think about this problem" in calls[0][1]

    def test_thinking_callback_strips_reasoning_tags(self):
        """<reasoning> tags (opening and closing) should be stripped before display."""
        calls, record = self._recorder()
        self._simulate_thinking_callback(
            "<reasoning>Compare both options</reasoning>",
            record,
        )
        assert calls == [("_thinking", "Compare both options")]

    def test_thinking_callback_empty_after_strip(self):
        """Should not fire when content is only XML tags."""
        calls, record = self._recorder()
        self._simulate_thinking_callback(
            "<REASONING_SCRATCHPAD></REASONING_SCRATCHPAD>",
            record,
        )
        assert len(calls) == 0
324  
325  
326  # =========================================================================
327  # Gateway batch flush tests
328  # =========================================================================
329  
class TestBatchFlush:
    """Tests for gateway batch flush on subagent completion."""

    def test_flush_sends_remaining_batch(self):
        """_flush should emit a closing subagent.progress summary covering the
        tool names still waiting in a partially filled batch."""
        relay = MagicMock()
        parent = MagicMock()
        parent._delegate_spinner = None
        parent.tool_progress_callback = relay

        child_cb = _build_child_progress_callback(0, "test goal", parent)

        # Three tool starts stay under the batch size of 5 this test expects,
        # so only the per-tool relays should have been sent so far.
        pending_tools = (
            ("web_search", "query1"),
            ("read_file", "file.txt"),
            ("write_file", "out.txt"),
        )
        for tool_name, tool_preview in pending_tools:
            child_cb("tool.started", tool_name, tool_preview, {})
        kinds = [call.args[0] for call in relay.call_args_list]
        assert kinds == ["subagent.tool"] * 3  # per-tool relays so far
        assert "subagent.progress" not in kinds  # no batch-size summary yet

        # Flushing must push the three pending names out as one summary.
        child_cb._flush()
        kinds = [call.args[0] for call in relay.call_args_list]
        assert kinds[-1] == "subagent.progress"

        last_call = relay.call_args_list[-1]
        summary = last_call.kwargs.get("preview") or last_call.args[2]
        assert "web_search" in summary
        assert "write_file" in summary

    def test_flush_noop_when_batch_empty(self):
        """_flush on a callback that has seen no tools must relay nothing."""
        relay = MagicMock()
        parent = MagicMock()
        parent._delegate_spinner = None
        parent.tool_progress_callback = relay

        child_cb = _build_child_progress_callback(0, "test goal", parent)
        child_cb._flush()

        relay.assert_not_called()

    def test_flush_noop_when_no_parent_callback(self):
        """_flush must not raise when the parent has only a spinner and no callback."""
        sink = io.StringIO()
        spinner = KawaiiSpinner("test")
        spinner._out = sink
        spinner.running = True

        parent = MagicMock()
        parent.tool_progress_callback = None
        parent._delegate_spinner = spinner

        child_cb = _build_child_progress_callback(0, "test goal", parent)
        child_cb("tool.started", "web_search", "test", {})
        child_cb._flush()  # Passing means no exception was raised
385  
386  
if __name__ == "__main__":
    # pytest.main() returns the run's exit code rather than exiting itself;
    # pass it to sys.exit so a failing run gives a non-zero process status
    # when this file is executed directly.
    sys.exit(pytest.main([__file__, "-v"]))
389