# test_busy_session_ack.py
"""Tests for busy-session acknowledgment when user sends messages during active agent runs.

Verifies that users get an immediate status response instead of total silence
when the agent is working on a task.  See PR fix for the @Lonely__MH report.
"""
import asyncio
import sys
import time
import types
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

# ---------------------------------------------------------------------------
# Minimal stubs so we can import gateway code without heavy deps
# ---------------------------------------------------------------------------
_tg = types.ModuleType("telegram")
_tg.constants = types.ModuleType("telegram.constants")
_ct = MagicMock()
_ct.SUPERGROUP = "supergroup"
_ct.GROUP = "group"
_ct.PRIVATE = "private"
_tg.constants.ChatType = _ct
# setdefault: never clobber a real telegram package if one is installed.
sys.modules.setdefault("telegram", _tg)
sys.modules.setdefault("telegram.constants", _tg.constants)
sys.modules.setdefault("telegram.ext", types.ModuleType("telegram.ext"))

# Must come after the stubs above are registered in sys.modules.
from gateway.platforms.base import (  # noqa: E402
    BasePlatformAdapter,
    MessageEvent,
    MessageType,
    SessionSource,
    build_session_key,
)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _make_event(text="hello", chat_id="123", platform_val="telegram"):
    """Build a minimal text MessageEvent from a private chat.

    Args:
        text: Message body.
        chat_id: Chat identifier stored on the event's SessionSource.
        platform_val: Value exposed as ``source.platform.value``.

    Returns:
        A ``MessageEvent`` with ``message_id="msg1"`` and ``user_id="user1"``.
    """
    source = SessionSource(
        platform=MagicMock(value=platform_val),
        chat_id=chat_id,
        chat_type="private",
        user_id="user1",
    )
    return MessageEvent(
        text=text,
        message_type=MessageType.TEXT,
        source=source,
        message_id="msg1",
    )


def _make_runner():
    """Build a minimal GatewayRunner-like object for testing.

    The instance is created with ``object.__new__`` so ``__init__`` never
    runs; only the attributes assigned below are populated.

    Returns:
        A ``(runner, _AGENT_PENDING_SENTINEL)`` tuple.
    """
    from gateway.run import GatewayRunner, _AGENT_PENDING_SENTINEL

    runner = object.__new__(GatewayRunner)
    runner._running_agents = {}
    runner._running_agents_ts = {}
    runner._pending_messages = {}
    runner._busy_ack_ts = {}
    runner._draining = False
    runner.adapters = {}
    runner.config = MagicMock()
    runner.session_store = None
    runner.hooks = MagicMock()
    runner.hooks.emit = AsyncMock()
    runner.pairing_store = MagicMock()
    runner.pairing_store.is_approved.return_value = True
    runner._is_user_authorized = lambda _source: True
    return runner, _AGENT_PENDING_SENTINEL


def _make_adapter(platform_val="telegram"):
    """Build a minimal adapter mock whose ``_send_with_retry`` is awaitable.

    Args:
        platform_val: Value exposed as ``adapter.platform.value``.

    Returns:
        A ``MagicMock`` with an empty ``_pending_messages`` dict and an empty
        ``config.extra`` dict.
    """
    adapter = MagicMock()
    adapter._pending_messages = {}
    adapter._send_with_retry = AsyncMock()
    adapter.config = MagicMock()
    adapter.config.extra = {}
    adapter.platform = MagicMock(value=platform_val)
    return adapter


def _sent_ack_content(adapter):
    """Return the ``content`` keyword of the last ``_send_with_retry`` call.

    Fails with a readable assertion when nothing was sent, instead of the
    ``AttributeError`` that reading ``call_args.kwargs`` on ``None`` produces.

    Args:
        adapter: Adapter mock created by ``_make_adapter``.

    Returns:
        The ``content`` keyword argument, or ``""`` when it is absent or empty.
    """
    sent = adapter._send_with_retry.call_args
    assert sent is not None, "expected an ack to be sent, but _send_with_retry was never called"
    return sent.kwargs.get("content") or ""


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------

class TestBusySessionAck:
    """User sends a message while agent is running — should get acknowledgment."""

    @pytest.mark.asyncio
    async def test_handle_message_queue_mode_queues_without_interrupt(self):
        """Runner queue mode must not interrupt an active agent for text follow-ups."""
        from gateway.run import GatewayRunner

        runner, _sentinel = _make_runner()
        adapter = _make_adapter()

        event = _make_event(text="follow up in queue mode")
        sk = build_session_key(event.source)

        running_agent = MagicMock()
        runner._busy_input_mode = "queue"
        runner._running_agents[sk] = running_agent
        runner.adapters[event.source.platform] = adapter

        result = await GatewayRunner._handle_message(runner, event)

        assert result is None
        assert sk in adapter._pending_messages
        assert adapter._pending_messages[sk] is event
        assert sk not in runner._pending_messages
        running_agent.interrupt.assert_not_called()

    @pytest.mark.asyncio
    async def test_sends_ack_when_agent_running(self):
        """First message during busy session should get a status ack."""
        runner, _sentinel = _make_runner()
        runner._busy_input_mode = "interrupt"
        adapter = _make_adapter()

        event = _make_event(text="Are you working?")
        sk = build_session_key(event.source)

        # Simulate running agent
        agent = MagicMock()
        agent.get_activity_summary.return_value = {
            "api_call_count": 21,
            "max_iterations": 60,
            "current_tool": "terminal",
            "last_activity_ts": time.time(),
            "last_activity_desc": "terminal",
            "seconds_since_activity": 1.0,
        }
        runner._running_agents[sk] = agent
        runner._running_agents_ts[sk] = time.time() - 600  # 10 min ago
        runner.adapters[event.source.platform] = adapter

        result = await runner._handle_active_session_busy_message(event, sk)

        assert result is True  # handled
        # Verify ack was sent
        adapter._send_with_retry.assert_called_once()
        content = _sent_ack_content(adapter)
        sent = adapter._send_with_retry.call_args
        if not content and sent.args:
            # Ack text was passed positionally; match against the call repr.
            content = str(sent)
        assert "Interrupting" in content or "respond" in content
        assert "/stop" not in content  # no need — we ARE interrupting

        # Verify agent interrupt was called
        agent.interrupt.assert_called_once_with("Are you working?")

    @pytest.mark.asyncio
    async def test_queue_mode_suppresses_interrupt_and_updates_ack(self):
        """When busy_input_mode is 'queue', message is queued WITHOUT interrupt."""
        runner, _sentinel = _make_runner()
        runner._busy_input_mode = "queue"
        adapter = _make_adapter()

        event = _make_event(text="Add this to queue")
        sk = build_session_key(event.source)
        runner.adapters[event.source.platform] = adapter

        agent = MagicMock()
        runner._running_agents[sk] = agent

        with patch("gateway.run.merge_pending_message_event"):
            await runner._handle_active_session_busy_message(event, sk)

        # VERIFY: Agent was NOT interrupted
        agent.interrupt.assert_not_called()

        # VERIFY: Ack sent with queue-specific wording
        adapter._send_with_retry.assert_called_once()
        content = _sent_ack_content(adapter)
        assert "Queued for the next turn" in content
        assert "respond once the current task finishes" in content
        assert "Interrupting" not in content

    @pytest.mark.asyncio
    async def test_steer_mode_calls_agent_steer_no_interrupt_no_queue(self):
        """busy_input_mode='steer' injects via agent.steer() and skips queueing."""
        runner, _sentinel = _make_runner()
        runner._busy_input_mode = "steer"
        adapter = _make_adapter()

        event = _make_event(text="also check the tests")
        sk = build_session_key(event.source)
        runner.adapters[event.source.platform] = adapter

        agent = MagicMock()
        agent.steer = MagicMock(return_value=True)
        runner._running_agents[sk] = agent

        with patch("gateway.run.merge_pending_message_event") as mock_merge:
            await runner._handle_active_session_busy_message(event, sk)

        # VERIFY: Agent was steered, NOT interrupted
        agent.steer.assert_called_once_with("also check the tests")
        agent.interrupt.assert_not_called()

        # VERIFY: No queueing — successful steer must NOT replay as next turn
        mock_merge.assert_not_called()

        # VERIFY: Ack mentions steer wording
        adapter._send_with_retry.assert_called_once()
        content = _sent_ack_content(adapter)
        assert "Steered" in content or "steer" in content.lower()
        assert "Interrupting" not in content

    @pytest.mark.asyncio
    async def test_steer_mode_falls_back_to_queue_when_agent_rejects(self):
        """If agent.steer() returns False, fall back to queue behavior."""
        runner, _sentinel = _make_runner()
        runner._busy_input_mode = "steer"
        adapter = _make_adapter()

        event = _make_event(text="empty or rejected")
        sk = build_session_key(event.source)
        runner.adapters[event.source.platform] = adapter

        agent = MagicMock()
        agent.steer = MagicMock(return_value=False)  # rejected
        runner._running_agents[sk] = agent

        with patch("gateway.run.merge_pending_message_event") as mock_merge:
            await runner._handle_active_session_busy_message(event, sk)

        agent.steer.assert_called_once()
        agent.interrupt.assert_not_called()
        # Fell back to queue semantics: event was merged into pending messages
        mock_merge.assert_called_once()

        # Ack uses queue-mode wording (not steer, not interrupt)
        content = _sent_ack_content(adapter)
        assert "Queued for the next turn" in content
        assert "Steered" not in content

    @pytest.mark.asyncio
    async def test_steer_mode_falls_back_to_queue_when_agent_pending(self):
        """If agent is still starting (sentinel), steer mode falls back to queue."""
        runner, sentinel = _make_runner()
        runner._busy_input_mode = "steer"
        adapter = _make_adapter()

        event = _make_event(text="arrived too early")
        sk = build_session_key(event.source)
        runner.adapters[event.source.platform] = adapter

        # Agent is still being set up — sentinel in place
        runner._running_agents[sk] = sentinel

        with patch("gateway.run.merge_pending_message_event") as mock_merge:
            await runner._handle_active_session_busy_message(event, sk)

        # Event was queued instead of steered
        mock_merge.assert_called_once()

        content = _sent_ack_content(adapter)
        assert "Queued for the next turn" in content

    @pytest.mark.asyncio
    async def test_debounce_suppresses_rapid_acks(self):
        """Second message within 30s should NOT send another ack."""
        runner, _sentinel = _make_runner()
        runner._busy_input_mode = "interrupt"
        adapter = _make_adapter()

        event1 = _make_event(text="hello?")
        # Reuse the same source so platform mock matches
        event2 = MessageEvent(
            text="still there?",
            message_type=MessageType.TEXT,
            source=event1.source,
            message_id="msg2",
        )
        sk = build_session_key(event1.source)

        agent = MagicMock()
        agent.get_activity_summary.return_value = {
            "api_call_count": 5,
            "max_iterations": 60,
            "current_tool": None,
            "last_activity_ts": time.time(),
            "last_activity_desc": "api_call",
            "seconds_since_activity": 0.5,
        }
        runner._running_agents[sk] = agent
        runner._running_agents_ts[sk] = time.time() - 60
        runner.adapters[event1.source.platform] = adapter

        # First message — should get ack
        result1 = await runner._handle_active_session_busy_message(event1, sk)
        assert result1 is True
        assert adapter._send_with_retry.call_count == 1

        # Second message within cooldown — should be queued but no ack
        result2 = await runner._handle_active_session_busy_message(event2, sk)
        assert result2 is True
        assert adapter._send_with_retry.call_count == 1  # still 1, no new ack

        # But interrupt should still be called for both (since we are in interrupt mode)
        assert agent.interrupt.call_count == 2

    @pytest.mark.asyncio
    async def test_ack_after_cooldown_expires(self):
        """After 30s cooldown, a new message should send a fresh ack."""
        runner, _sentinel = _make_runner()
        runner._busy_input_mode = "interrupt"
        adapter = _make_adapter()

        event = _make_event(text="hello?")
        sk = build_session_key(event.source)

        agent = MagicMock()
        agent.get_activity_summary.return_value = {
            "api_call_count": 10,
            "max_iterations": 60,
            "current_tool": "web_search",
            "last_activity_ts": time.time(),
            "last_activity_desc": "tool",
            "seconds_since_activity": 0.5,
        }
        runner._running_agents[sk] = agent
        runner._running_agents_ts[sk] = time.time() - 120
        runner.adapters[event.source.platform] = adapter

        # First ack
        await runner._handle_active_session_busy_message(event, sk)
        assert adapter._send_with_retry.call_count == 1

        # Fake that cooldown expired
        runner._busy_ack_ts[sk] = time.time() - 31

        # Second ack should go through
        await runner._handle_active_session_busy_message(event, sk)
        assert adapter._send_with_retry.call_count == 2

    @pytest.mark.asyncio
    async def test_includes_status_detail(self):
        """Ack message should include iteration and tool info when available."""
        runner, _sentinel = _make_runner()
        runner._busy_input_mode = "interrupt"
        adapter = _make_adapter()

        event = _make_event(text="yo")
        sk = build_session_key(event.source)

        agent = MagicMock()
        agent.get_activity_summary.return_value = {
            "api_call_count": 21,
            "max_iterations": 60,
            "current_tool": "terminal",
            "last_activity_ts": time.time(),
            "last_activity_desc": "terminal",
            "seconds_since_activity": 0.5,
        }
        runner._running_agents[sk] = agent
        runner._running_agents_ts[sk] = time.time() - 600  # 10 min
        runner.adapters[event.source.platform] = adapter

        await runner._handle_active_session_busy_message(event, sk)

        content = _sent_ack_content(adapter)
        assert "21/60" in content  # iteration
        assert "terminal" in content  # current tool
        assert "10 min" in content  # elapsed

    @pytest.mark.asyncio
    async def test_draining_still_works(self):
        """Draining case should still produce the drain-specific message."""
        runner, _sentinel = _make_runner()
        runner._draining = True
        runner._busy_input_mode = "interrupt"
        adapter = _make_adapter()

        event = _make_event(text="hello")
        sk = build_session_key(event.source)
        runner.adapters[event.source.platform] = adapter

        # Mock the drain-specific methods
        runner._queue_during_drain_enabled = lambda: False
        runner._status_action_gerund = lambda: "restarting"

        result = await runner._handle_active_session_busy_message(event, sk)
        assert result is True

        content = _sent_ack_content(adapter)
        assert "restarting" in content

    @pytest.mark.asyncio
    async def test_pending_sentinel_no_interrupt(self):
        """When agent is PENDING_SENTINEL, don't call interrupt (it has no method)."""
        runner, sentinel = _make_runner()
        runner._busy_input_mode = "interrupt"
        adapter = _make_adapter()

        event = _make_event(text="hey")
        sk = build_session_key(event.source)

        runner._running_agents[sk] = sentinel
        runner._running_agents_ts[sk] = time.time()
        runner.adapters[event.source.platform] = adapter

        result = await runner._handle_active_session_busy_message(event, sk)
        assert result is True
        # Should still send ack
        adapter._send_with_retry.assert_called_once()

    @pytest.mark.asyncio
    async def test_no_adapter_falls_through(self):
        """If adapter is missing, return False so default path handles it."""
        runner, _sentinel = _make_runner()

        event = _make_event(text="hello")
        sk = build_session_key(event.source)

        # No adapter registered
        runner._running_agents[sk] = MagicMock()

        result = await runner._handle_active_session_busy_message(event, sk)
        assert result is False  # not handled, let default path try


class TestBusySessionOnboardingHint:
    """First-touch hint appended to the busy-ack the first time it fires."""

    @pytest.mark.asyncio
    async def test_first_busy_ack_appends_interrupt_hint(self, tmp_path, monkeypatch):
        """First busy-while-running message gets an extra hint about /busy."""
        import gateway.run as _gr

        # mark_seen imports utils.atomic_yaml_write; make sure it resolves
        # against a writable dir by pointing _hermes_home at tmp_path.
        monkeypatch.setattr(_gr, "_hermes_home", tmp_path)
        monkeypatch.setattr(_gr, "_load_gateway_config", lambda: {})

        runner, _sentinel = _make_runner()
        runner._busy_input_mode = "interrupt"
        adapter = _make_adapter()

        event = _make_event(text="ping")
        sk = build_session_key(event.source)

        agent = MagicMock()
        agent.get_activity_summary.return_value = {
            "api_call_count": 3,
            "max_iterations": 60,
            "current_tool": None,
            "last_activity_ts": time.time(),
            "last_activity_desc": "api",
            "seconds_since_activity": 0.1,
        }
        runner._running_agents[sk] = agent
        runner._running_agents_ts[sk] = time.time() - 5
        runner.adapters[event.source.platform] = adapter

        await runner._handle_active_session_busy_message(event, sk)

        content = _sent_ack_content(adapter)

        # Normal ack body
        assert "Interrupting" in content
        # First-touch hint appended
        assert "First-time tip" in content
        assert "/busy queue" in content

        # The flag is now persisted to tmp_path/config.yaml
        import yaml
        cfg = yaml.safe_load((tmp_path / "config.yaml").read_text())
        assert cfg["onboarding"]["seen"]["busy_input_prompt"] is True

    @pytest.mark.asyncio
    async def test_second_busy_ack_omits_hint(self, tmp_path, monkeypatch):
        """Once the flag is marked, the hint never appears again."""
        import gateway.run as _gr
        import yaml

        monkeypatch.setattr(_gr, "_hermes_home", tmp_path)
        # Pre-populate the config so is_seen() returns True from the start.
        (tmp_path / "config.yaml").write_text(yaml.safe_dump({
            "onboarding": {"seen": {"busy_input_prompt": True}},
        }))
        monkeypatch.setattr(
            _gr, "_load_gateway_config",
            lambda: yaml.safe_load((tmp_path / "config.yaml").read_text()),
        )

        runner, _sentinel = _make_runner()
        runner._busy_input_mode = "interrupt"
        adapter = _make_adapter()

        event = _make_event(text="ping again")
        sk = build_session_key(event.source)

        agent = MagicMock()
        agent.get_activity_summary.return_value = {
            "api_call_count": 3,
            "max_iterations": 60,
            "current_tool": None,
            "last_activity_ts": time.time(),
            "last_activity_desc": "api",
            "seconds_since_activity": 0.1,
        }
        runner._running_agents[sk] = agent
        runner._running_agents_ts[sk] = time.time() - 5
        runner.adapters[event.source.platform] = adapter

        await runner._handle_active_session_busy_message(event, sk)

        content = _sent_ack_content(adapter)

        assert "Interrupting" in content
        assert "First-time tip" not in content
        assert "/busy queue" not in content

    @pytest.mark.asyncio
    async def test_queue_mode_hint_points_to_interrupt(self, tmp_path, monkeypatch):
        """In queue mode the hint should suggest /busy interrupt, not /busy queue."""
        import gateway.run as _gr

        monkeypatch.setattr(_gr, "_hermes_home", tmp_path)
        monkeypatch.setattr(_gr, "_load_gateway_config", lambda: {})

        runner, _sentinel = _make_runner()
        runner._busy_input_mode = "queue"
        adapter = _make_adapter()

        event = _make_event(text="queue me")
        sk = build_session_key(event.source)
        runner.adapters[event.source.platform] = adapter

        agent = MagicMock()
        runner._running_agents[sk] = agent

        with patch("gateway.run.merge_pending_message_event"):
            await runner._handle_active_session_busy_message(event, sk)

        content = _sent_ack_content(adapter)
        assert "Queued for the next turn" in content
        assert "First-time tip" in content
        assert "/busy interrupt" in content
        # Must NOT tell the user to /busy queue when they're already on queue.
        assert "/busy queue" not in content