/ tests / gateway / test_busy_session_ack.py
test_busy_session_ack.py
  1  """Tests for busy-session acknowledgment when user sends messages during active agent runs.
  2  
  3  Verifies that users get an immediate status response instead of total silence
  4  when the agent is working on a task. See PR fix for the @Lonely__MH report.
  5  """
  6  import asyncio
  7  import time
  8  from unittest.mock import AsyncMock, MagicMock, patch
  9  
 10  import pytest
 11  
 12  # ---------------------------------------------------------------------------
 13  # Minimal stubs so we can import gateway code without heavy deps
 14  # ---------------------------------------------------------------------------
 15  import sys, types
 16  
 17  _tg = types.ModuleType("telegram")
 18  _tg.constants = types.ModuleType("telegram.constants")
 19  _ct = MagicMock()
 20  _ct.SUPERGROUP = "supergroup"
 21  _ct.GROUP = "group"
 22  _ct.PRIVATE = "private"
 23  _tg.constants.ChatType = _ct
 24  sys.modules.setdefault("telegram", _tg)
 25  sys.modules.setdefault("telegram.constants", _tg.constants)
 26  sys.modules.setdefault("telegram.ext", types.ModuleType("telegram.ext"))
 27  
 28  from gateway.platforms.base import (
 29      BasePlatformAdapter,
 30      MessageEvent,
 31      MessageType,
 32      SessionSource,
 33      build_session_key,
 34  )
 35  
 36  
 37  # ---------------------------------------------------------------------------
 38  # Helpers
 39  # ---------------------------------------------------------------------------
 40  
 41  def _make_event(text="hello", chat_id="123", platform_val="telegram"):
 42      """Build a minimal MessageEvent."""
 43      source = SessionSource(
 44          platform=MagicMock(value=platform_val),
 45          chat_id=chat_id,
 46          chat_type="private",
 47          user_id="user1",
 48      )
 49      evt = MessageEvent(
 50          text=text,
 51          message_type=MessageType.TEXT,
 52          source=source,
 53          message_id="msg1",
 54      )
 55      return evt
 56  
 57  
 58  def _make_runner():
 59      """Build a minimal GatewayRunner-like object for testing."""
 60      from gateway.run import GatewayRunner, _AGENT_PENDING_SENTINEL
 61  
 62      runner = object.__new__(GatewayRunner)
 63      runner._running_agents = {}
 64      runner._running_agents_ts = {}
 65      runner._pending_messages = {}
 66      runner._busy_ack_ts = {}
 67      runner._draining = False
 68      runner.adapters = {}
 69      runner.config = MagicMock()
 70      runner.session_store = None
 71      runner.hooks = MagicMock()
 72      runner.hooks.emit = AsyncMock()
 73      runner.pairing_store = MagicMock()
 74      runner.pairing_store.is_approved.return_value = True
 75      runner._is_user_authorized = lambda _source: True
 76      return runner, _AGENT_PENDING_SENTINEL
 77  
 78  
 79  def _make_adapter(platform_val="telegram"):
 80      """Build a minimal adapter mock."""
 81      adapter = MagicMock()
 82      adapter._pending_messages = {}
 83      adapter._send_with_retry = AsyncMock()
 84      adapter.config = MagicMock()
 85      adapter.config.extra = {}
 86      adapter.platform = MagicMock(value=platform_val)
 87      return adapter
 88  
 89  
 90  # ---------------------------------------------------------------------------
 91  # Tests
 92  # ---------------------------------------------------------------------------
 93  
 94  class TestBusySessionAck:
 95      """User sends a message while agent is running — should get acknowledgment."""
 96  
 97      @pytest.mark.asyncio
 98      async def test_handle_message_queue_mode_queues_without_interrupt(self):
 99          """Runner queue mode must not interrupt an active agent for text follow-ups."""
100          from gateway.run import GatewayRunner
101  
102          runner, _sentinel = _make_runner()
103          adapter = _make_adapter()
104  
105          event = _make_event(text="follow up in queue mode")
106          sk = build_session_key(event.source)
107  
108          running_agent = MagicMock()
109          runner._busy_input_mode = "queue"
110          runner._running_agents[sk] = running_agent
111          runner.adapters[event.source.platform] = adapter
112  
113          result = await GatewayRunner._handle_message(runner, event)
114  
115          assert result is None
116          assert sk in adapter._pending_messages
117          assert adapter._pending_messages[sk] is event
118          assert sk not in runner._pending_messages
119          running_agent.interrupt.assert_not_called()
120  
121      @pytest.mark.asyncio
122      async def test_sends_ack_when_agent_running(self):
123          """First message during busy session should get a status ack."""
124          runner, sentinel = _make_runner()
125          runner._busy_input_mode = "interrupt"
126          adapter = _make_adapter()
127  
128          event = _make_event(text="Are you working?")
129          sk = build_session_key(event.source)
130  
131          # Simulate running agent
132          agent = MagicMock()
133          agent.get_activity_summary.return_value = {
134              "api_call_count": 21,
135              "max_iterations": 60,
136              "current_tool": "terminal",
137              "last_activity_ts": time.time(),
138              "last_activity_desc": "terminal",
139              "seconds_since_activity": 1.0,
140          }
141          runner._running_agents[sk] = agent
142          runner._running_agents_ts[sk] = time.time() - 600  # 10 min ago
143          runner.adapters[event.source.platform] = adapter
144  
145          result = await runner._handle_active_session_busy_message(event, sk)
146  
147          assert result is True  # handled
148          # Verify ack was sent
149          adapter._send_with_retry.assert_called_once()
150          call_kwargs = adapter._send_with_retry.call_args
151          content = call_kwargs.kwargs.get("content") or call_kwargs[1].get("content", "")
152          if not content and call_kwargs.args:
153              # positional args
154              content = str(call_kwargs)
155          assert "Interrupting" in content or "respond" in content
156          assert "/stop" not in content  # no need — we ARE interrupting
157  
158          # Verify agent interrupt was called
159          agent.interrupt.assert_called_once_with("Are you working?")
160  
161      @pytest.mark.asyncio
162      async def test_queue_mode_suppresses_interrupt_and_updates_ack(self):
163          """When busy_input_mode is 'queue', message is queued WITHOUT interrupt."""
164          runner, sentinel = _make_runner()
165          runner._busy_input_mode = "queue"
166          adapter = _make_adapter()
167  
168          event = _make_event(text="Add this to queue")
169          sk = build_session_key(event.source)
170          runner.adapters[event.source.platform] = adapter
171  
172          agent = MagicMock()
173          runner._running_agents[sk] = agent
174  
175          with patch("gateway.run.merge_pending_message_event"):
176              await runner._handle_active_session_busy_message(event, sk)
177  
178          # VERIFY: Agent was NOT interrupted
179          agent.interrupt.assert_not_called()
180  
181          # VERIFY: Ack sent with queue-specific wording
182          adapter._send_with_retry.assert_called_once()
183          call_kwargs = adapter._send_with_retry.call_args
184          content = call_kwargs.kwargs.get("content") or call_kwargs[1].get("content", "")
185          assert "Queued for the next turn" in content
186          assert "respond once the current task finishes" in content
187          assert "Interrupting" not in content
188  
189      @pytest.mark.asyncio
190      async def test_steer_mode_calls_agent_steer_no_interrupt_no_queue(self):
191          """busy_input_mode='steer' injects via agent.steer() and skips queueing."""
192          runner, sentinel = _make_runner()
193          runner._busy_input_mode = "steer"
194          adapter = _make_adapter()
195  
196          event = _make_event(text="also check the tests")
197          sk = build_session_key(event.source)
198          runner.adapters[event.source.platform] = adapter
199  
200          agent = MagicMock()
201          agent.steer = MagicMock(return_value=True)
202          runner._running_agents[sk] = agent
203  
204          with patch("gateway.run.merge_pending_message_event") as mock_merge:
205              await runner._handle_active_session_busy_message(event, sk)
206  
207          # VERIFY: Agent was steered, NOT interrupted
208          agent.steer.assert_called_once_with("also check the tests")
209          agent.interrupt.assert_not_called()
210  
211          # VERIFY: No queueing — successful steer must NOT replay as next turn
212          mock_merge.assert_not_called()
213  
214          # VERIFY: Ack mentions steer wording
215          adapter._send_with_retry.assert_called_once()
216          call_kwargs = adapter._send_with_retry.call_args
217          content = call_kwargs.kwargs.get("content") or call_kwargs[1].get("content", "")
218          assert "Steered" in content or "steer" in content.lower()
219          assert "Interrupting" not in content
220  
221      @pytest.mark.asyncio
222      async def test_steer_mode_falls_back_to_queue_when_agent_rejects(self):
223          """If agent.steer() returns False, fall back to queue behavior."""
224          runner, sentinel = _make_runner()
225          runner._busy_input_mode = "steer"
226          adapter = _make_adapter()
227  
228          event = _make_event(text="empty or rejected")
229          sk = build_session_key(event.source)
230          runner.adapters[event.source.platform] = adapter
231  
232          agent = MagicMock()
233          agent.steer = MagicMock(return_value=False)  # rejected
234          runner._running_agents[sk] = agent
235  
236          with patch("gateway.run.merge_pending_message_event") as mock_merge:
237              await runner._handle_active_session_busy_message(event, sk)
238  
239          agent.steer.assert_called_once()
240          agent.interrupt.assert_not_called()
241          # Fell back to queue semantics: event was merged into pending messages
242          mock_merge.assert_called_once()
243  
244          # Ack uses queue-mode wording (not steer, not interrupt)
245          call_kwargs = adapter._send_with_retry.call_args
246          content = call_kwargs.kwargs.get("content") or call_kwargs[1].get("content", "")
247          assert "Queued for the next turn" in content
248          assert "Steered" not in content
249  
250      @pytest.mark.asyncio
251      async def test_steer_mode_falls_back_to_queue_when_agent_pending(self):
252          """If agent is still starting (sentinel), steer mode falls back to queue."""
253          runner, sentinel = _make_runner()
254          runner._busy_input_mode = "steer"
255          adapter = _make_adapter()
256  
257          event = _make_event(text="arrived too early")
258          sk = build_session_key(event.source)
259          runner.adapters[event.source.platform] = adapter
260  
261          # Agent is still being set up — sentinel in place
262          runner._running_agents[sk] = sentinel
263  
264          with patch("gateway.run.merge_pending_message_event") as mock_merge:
265              await runner._handle_active_session_busy_message(event, sk)
266  
267          # Event was queued instead of steered
268          mock_merge.assert_called_once()
269  
270          call_kwargs = adapter._send_with_retry.call_args
271          content = call_kwargs.kwargs.get("content") or call_kwargs[1].get("content", "")
272          assert "Queued for the next turn" in content
273  
274      @pytest.mark.asyncio
275      async def test_debounce_suppresses_rapid_acks(self):
276          """Second message within 30s should NOT send another ack."""
277          runner, sentinel = _make_runner()
278          runner._busy_input_mode = "interrupt"
279          adapter = _make_adapter()
280  
281          event1 = _make_event(text="hello?")
282          # Reuse the same source so platform mock matches
283          event2 = MessageEvent(
284              text="still there?",
285              message_type=MessageType.TEXT,
286              source=event1.source,
287              message_id="msg2",
288          )
289          sk = build_session_key(event1.source)
290  
291          agent = MagicMock()
292          agent.get_activity_summary.return_value = {
293              "api_call_count": 5,
294              "max_iterations": 60,
295              "current_tool": None,
296              "last_activity_ts": time.time(),
297              "last_activity_desc": "api_call",
298              "seconds_since_activity": 0.5,
299          }
300          runner._running_agents[sk] = agent
301          runner._running_agents_ts[sk] = time.time() - 60
302          runner.adapters[event1.source.platform] = adapter
303  
304          # First message — should get ack
305          result1 = await runner._handle_active_session_busy_message(event1, sk)
306          assert result1 is True
307          assert adapter._send_with_retry.call_count == 1
308  
309          # Second message within cooldown — should be queued but no ack
310          result2 = await runner._handle_active_session_busy_message(event2, sk)
311          assert result2 is True
312          assert adapter._send_with_retry.call_count == 1  # still 1, no new ack
313  
314          # But interrupt should still be called for both (since we are in interrupt mode)
315          assert agent.interrupt.call_count == 2
316  
317      @pytest.mark.asyncio
318      async def test_ack_after_cooldown_expires(self):
319          """After 30s cooldown, a new message should send a fresh ack."""
320          runner, sentinel = _make_runner()
321          runner._busy_input_mode = "interrupt"
322          adapter = _make_adapter()
323  
324          event = _make_event(text="hello?")
325          sk = build_session_key(event.source)
326  
327          agent = MagicMock()
328          agent.get_activity_summary.return_value = {
329              "api_call_count": 10,
330              "max_iterations": 60,
331              "current_tool": "web_search",
332              "last_activity_ts": time.time(),
333              "last_activity_desc": "tool",
334              "seconds_since_activity": 0.5,
335          }
336          runner._running_agents[sk] = agent
337          runner._running_agents_ts[sk] = time.time() - 120
338          runner.adapters[event.source.platform] = adapter
339  
340          # First ack
341          await runner._handle_active_session_busy_message(event, sk)
342          assert adapter._send_with_retry.call_count == 1
343  
344          # Fake that cooldown expired
345          runner._busy_ack_ts[sk] = time.time() - 31
346  
347          # Second ack should go through
348          await runner._handle_active_session_busy_message(event, sk)
349          assert adapter._send_with_retry.call_count == 2
350  
351      @pytest.mark.asyncio
352      async def test_includes_status_detail(self):
353          """Ack message should include iteration and tool info when available."""
354          runner, sentinel = _make_runner()
355          runner._busy_input_mode = "interrupt"
356          adapter = _make_adapter()
357  
358          event = _make_event(text="yo")
359          sk = build_session_key(event.source)
360  
361          agent = MagicMock()
362          agent.get_activity_summary.return_value = {
363              "api_call_count": 21,
364              "max_iterations": 60,
365              "current_tool": "terminal",
366              "last_activity_ts": time.time(),
367              "last_activity_desc": "terminal",
368              "seconds_since_activity": 0.5,
369          }
370          runner._running_agents[sk] = agent
371          runner._running_agents_ts[sk] = time.time() - 600  # 10 min
372          runner.adapters[event.source.platform] = adapter
373  
374          await runner._handle_active_session_busy_message(event, sk)
375  
376          call_kwargs = adapter._send_with_retry.call_args
377          content = call_kwargs.kwargs.get("content", "")
378          assert "21/60" in content  # iteration
379          assert "terminal" in content  # current tool
380          assert "10 min" in content  # elapsed
381  
382      @pytest.mark.asyncio
383      async def test_draining_still_works(self):
384          """Draining case should still produce the drain-specific message."""
385          runner, sentinel = _make_runner()
386          runner._draining = True
387          runner._busy_input_mode = "interrupt"
388          adapter = _make_adapter()
389  
390          event = _make_event(text="hello")
391          sk = build_session_key(event.source)
392          runner.adapters[event.source.platform] = adapter
393  
394          # Mock the drain-specific methods
395          runner._queue_during_drain_enabled = lambda: False
396          runner._status_action_gerund = lambda: "restarting"
397  
398          result = await runner._handle_active_session_busy_message(event, sk)
399          assert result is True
400  
401          call_kwargs = adapter._send_with_retry.call_args
402          content = call_kwargs.kwargs.get("content", "")
403          assert "restarting" in content
404  
405      @pytest.mark.asyncio
406      async def test_pending_sentinel_no_interrupt(self):
407          """When agent is PENDING_SENTINEL, don't call interrupt (it has no method)."""
408          runner, sentinel = _make_runner()
409          runner._busy_input_mode = "interrupt"
410          adapter = _make_adapter()
411  
412          event = _make_event(text="hey")
413          sk = build_session_key(event.source)
414  
415          runner._running_agents[sk] = sentinel
416          runner._running_agents_ts[sk] = time.time()
417          runner.adapters[event.source.platform] = adapter
418  
419          result = await runner._handle_active_session_busy_message(event, sk)
420          assert result is True
421          # Should still send ack
422          adapter._send_with_retry.assert_called_once()
423  
424      @pytest.mark.asyncio
425      async def test_no_adapter_falls_through(self):
426          """If adapter is missing, return False so default path handles it."""
427          runner, sentinel = _make_runner()
428  
429          event = _make_event(text="hello")
430          sk = build_session_key(event.source)
431  
432          # No adapter registered
433          runner._running_agents[sk] = MagicMock()
434  
435          result = await runner._handle_active_session_busy_message(event, sk)
436          assert result is False  # not handled, let default path try
437  
438  
class TestBusySessionOnboardingHint:
    """The busy-ack carries a one-time "First-time tip" until the flag is persisted."""

    @staticmethod
    def _interrupt_mode_session(text):
        """Build ``(runner, adapter, event, session_key)`` with a running agent.

        The runner is in interrupt mode, the agent reports a small activity
        summary, and the run started five seconds ago.
        """
        runner, _unused_sentinel = _make_runner()
        runner._busy_input_mode = "interrupt"
        adapter = _make_adapter()

        event = _make_event(text=text)
        session_key = build_session_key(event.source)

        agent = MagicMock()
        agent.get_activity_summary.return_value = {
            "api_call_count": 3,
            "max_iterations": 60,
            "current_tool": None,
            "last_activity_ts": time.time(),
            "last_activity_desc": "api",
            "seconds_since_activity": 0.1,
        }
        runner._running_agents[session_key] = agent
        runner._running_agents_ts[session_key] = time.time() - 5
        runner.adapters[event.source.platform] = adapter
        return runner, adapter, event, session_key

    @staticmethod
    def _last_ack(adapter):
        """Return the ``content`` kwarg of the latest ``_send_with_retry`` call."""
        return adapter._send_with_retry.call_args.kwargs.get("content", "")

    @pytest.mark.asyncio
    async def test_first_busy_ack_appends_interrupt_hint(self, tmp_path, monkeypatch):
        """First busy-while-running message gets an extra hint about /busy."""
        import gateway.run as _gr

        # mark_seen persists via utils.atomic_yaml_write under _hermes_home, so
        # point that at the writable tmp_path; an empty config means "not seen".
        monkeypatch.setattr(_gr, "_hermes_home", tmp_path)
        monkeypatch.setattr(_gr, "_load_gateway_config", lambda: {})

        runner, adapter, event, sk = self._interrupt_mode_session("ping")
        await runner._handle_active_session_busy_message(event, sk)
        content = self._last_ack(adapter)

        assert "Interrupting" in content  # normal ack body
        assert "First-time tip" in content  # first-touch hint appended
        assert "/busy queue" in content

        # The seen-flag was written to tmp_path/config.yaml.
        import yaml

        persisted = yaml.safe_load((tmp_path / "config.yaml").read_text())
        assert persisted["onboarding"]["seen"]["busy_input_prompt"] is True

    @pytest.mark.asyncio
    async def test_second_busy_ack_omits_hint(self, tmp_path, monkeypatch):
        """Once the flag is marked, the hint never appears again."""
        import gateway.run as _gr
        import yaml

        config_path = tmp_path / "config.yaml"
        monkeypatch.setattr(_gr, "_hermes_home", tmp_path)
        # Flag already on disk and the loader reads that file, so is_seen() is
        # True from the very first busy ack.
        config_path.write_text(yaml.safe_dump({
            "onboarding": {"seen": {"busy_input_prompt": True}},
        }))
        monkeypatch.setattr(
            _gr,
            "_load_gateway_config",
            lambda: yaml.safe_load(config_path.read_text()),
        )

        runner, adapter, event, sk = self._interrupt_mode_session("ping again")
        await runner._handle_active_session_busy_message(event, sk)
        content = self._last_ack(adapter)

        assert "Interrupting" in content
        assert "First-time tip" not in content
        assert "/busy queue" not in content

    @pytest.mark.asyncio
    async def test_queue_mode_hint_points_to_interrupt(self, tmp_path, monkeypatch):
        """In queue mode the hint should suggest /busy interrupt, not /busy queue."""
        import gateway.run as _gr

        monkeypatch.setattr(_gr, "_hermes_home", tmp_path)
        monkeypatch.setattr(_gr, "_load_gateway_config", lambda: {})

        runner, _unused_sentinel = _make_runner()
        runner._busy_input_mode = "queue"
        adapter = _make_adapter()

        event = _make_event(text="queue me")
        session_key = build_session_key(event.source)
        runner.adapters[event.source.platform] = adapter
        runner._running_agents[session_key] = MagicMock()

        with patch("gateway.run.merge_pending_message_event"):
            await runner._handle_active_session_busy_message(event, session_key)

        content = self._last_ack(adapter)
        assert "Queued for the next turn" in content
        assert "First-time tip" in content
        assert "/busy interrupt" in content
        # Already on queue mode, so the tip must not suggest switching to it.
        assert "/busy queue" not in content