test_stream_consumer.py
1 """Tests for GatewayStreamConsumer — media directive stripping in streaming.""" 2 3 import asyncio 4 from types import SimpleNamespace 5 from unittest.mock import AsyncMock, MagicMock 6 7 import pytest 8 9 from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig 10 11 12 # ── _clean_for_display unit tests ──────────────────────────────────────── 13 14 15 class TestCleanForDisplay: 16 """Verify MEDIA: directives and internal markers are stripped from display text.""" 17 18 def test_no_media_passthrough(self): 19 """Text without MEDIA: passes through unchanged.""" 20 text = "Here is your analysis of the image." 21 assert GatewayStreamConsumer._clean_for_display(text) == text 22 23 def test_media_tag_stripped(self): 24 """Basic MEDIA:<path> tag is removed.""" 25 text = "Here is the image\nMEDIA:/tmp/hermes/image.png" 26 result = GatewayStreamConsumer._clean_for_display(text) 27 assert "MEDIA:" not in result 28 assert "Here is the image" in result 29 30 def test_media_tag_with_space(self): 31 """MEDIA: tag with space after colon is removed.""" 32 text = "Audio generated\nMEDIA: /home/user/.hermes/audio_cache/voice.mp3" 33 result = GatewayStreamConsumer._clean_for_display(text) 34 assert "MEDIA:" not in result 35 assert "Audio generated" in result 36 37 def test_media_tag_with_quotes(self): 38 """MEDIA: tags wrapped in quotes or backticks are removed.""" 39 for wrapper in ['`MEDIA:/path/file.png`', '"MEDIA:/path/file.png"', "'MEDIA:/path/file.png'"]: 40 text = f"Result: {wrapper}" 41 result = GatewayStreamConsumer._clean_for_display(text) 42 assert "MEDIA:" not in result, f"Failed for wrapper: {wrapper}" 43 44 def test_audio_as_voice_stripped(self): 45 """[[audio_as_voice]] directive is removed.""" 46 text = "[[audio_as_voice]]\nMEDIA:/tmp/voice.ogg" 47 result = GatewayStreamConsumer._clean_for_display(text) 48 assert "[[audio_as_voice]]" not in result 49 assert "MEDIA:" not in result 50 51 def test_multiple_media_tags(self): 52 """Multiple MEDIA: tags are all removed.""" 53 text = "Here are two files:\nMEDIA:/tmp/a.png\nMEDIA:/tmp/b.jpg" 54 result = GatewayStreamConsumer._clean_for_display(text) 55 assert "MEDIA:" not in result 56 assert "Here are two files:" in result 57 58 def test_excessive_newlines_collapsed(self): 59 """Blank lines left by removed tags are collapsed.""" 60 text = "Before\n\n\nMEDIA:/tmp/file.png\n\n\nAfter" 61 result = GatewayStreamConsumer._clean_for_display(text) 62 # Should not have 3+ consecutive newlines 63 assert "\n\n\n" not in result 64 65 def test_media_only_response(self): 66 """Response that is entirely MEDIA: tags returns empty/whitespace.""" 67 text = "MEDIA:/tmp/image.png" 68 result = GatewayStreamConsumer._clean_for_display(text) 69 assert result.strip() == "" 70 71 def test_media_mid_sentence(self): 72 """MEDIA: tag embedded in prose is stripped cleanly.""" 73 text = "I generated this image MEDIA:/tmp/art.png for you." 74 result = GatewayStreamConsumer._clean_for_display(text) 75 assert "MEDIA:" not in result 76 assert "generated" in result 77 assert "for you." in result 78 79 def test_preserves_non_media_colons(self): 80 """Normal colons and text with 'MEDIA' as a word aren't stripped.""" 81 text = "The media: files are stored in /tmp. Use social MEDIA carefully." 82 result = GatewayStreamConsumer._clean_for_display(text) 83 # "MEDIA:" in upper case without a path won't match \S+ (space follows) 84 # But "media:" is lowercase so won't match either 85 assert result == text 86 87 88 # ── Integration: _send_or_edit strips MEDIA: ───────────────────────────── 89 90 91 class TestFinalizeCapabilityGate: 92 """Verify REQUIRES_EDIT_FINALIZE gates the redundant final edit. 93 94 Platforms that don't need an explicit finalize signal (Telegram, 95 Slack, Matrix, …) should skip the redundant final edit when the 96 mid-stream edit already delivered the final content. Platforms that 97 *do* need it (DingTalk AI Cards) must always receive a finalize=True 98 edit at the end of the stream. 99 """ 100 101 @pytest.mark.asyncio 102 async def test_identical_text_skip_respects_adapter_flag(self): 103 """_send_or_edit short-circuits identical-text only when the 104 adapter doesn't require an explicit finalize signal.""" 105 # Adapter without finalize requirement — should skip identical edit. 106 plain = MagicMock() 107 plain.REQUIRES_EDIT_FINALIZE = False 108 plain.send = AsyncMock(return_value=SimpleNamespace( 109 success=True, message_id="m1", 110 )) 111 plain.edit_message = AsyncMock() 112 plain.MAX_MESSAGE_LENGTH = 4096 113 c1 = GatewayStreamConsumer(plain, "chat_1") 114 await c1._send_or_edit("hello") # first send 115 await c1._send_or_edit("hello", finalize=True) # identical → skip 116 plain.edit_message.assert_not_called() 117 118 # Adapter that requires finalize — must still fire the edit. 119 picky = MagicMock() 120 picky.REQUIRES_EDIT_FINALIZE = True 121 picky.send = AsyncMock(return_value=SimpleNamespace( 122 success=True, message_id="m1", 123 )) 124 picky.edit_message = AsyncMock(return_value=SimpleNamespace( 125 success=True, message_id="m1", 126 )) 127 picky.MAX_MESSAGE_LENGTH = 4096 128 c2 = GatewayStreamConsumer(picky, "chat_1") 129 await c2._send_or_edit("hello") 130 await c2._send_or_edit("hello", finalize=True) 131 # Finalize edit must go through even on identical content. 132 picky.edit_message.assert_called_once() 133 assert picky.edit_message.call_args[1]["finalize"] is True 134 135 136 class TestEditMessageFinalizeSignature: 137 """Every concrete platform adapter must accept the ``finalize`` kwarg. 138 139 stream_consumer._send_or_edit always passes ``finalize=`` to 140 ``adapter.edit_message(...)`` (see gateway/stream_consumer.py). An 141 adapter that overrides edit_message without accepting finalize raises 142 TypeError the first time streaming hits a segment break or final edit. 143 Guard the contract with an explicit signature check so it cannot 144 silently regress — existing tests use MagicMock which swallows any 145 kwarg and cannot catch this. 146 """ 147 148 @pytest.mark.parametrize( 149 "module_path,class_name", 150 [ 151 ("gateway.platforms.telegram", "TelegramAdapter"), 152 ("gateway.platforms.discord", "DiscordAdapter"), 153 ("gateway.platforms.slack", "SlackAdapter"), 154 ("gateway.platforms.matrix", "MatrixAdapter"), 155 ("gateway.platforms.mattermost", "MattermostAdapter"), 156 ("gateway.platforms.feishu", "FeishuAdapter"), 157 ("gateway.platforms.whatsapp", "WhatsAppAdapter"), 158 ("gateway.platforms.dingtalk", "DingTalkAdapter"), 159 ], 160 ) 161 def test_edit_message_accepts_finalize(self, module_path, class_name): 162 import inspect 163 164 module = pytest.importorskip(module_path) 165 cls = getattr(module, class_name) 166 params = inspect.signature(cls.edit_message).parameters 167 assert "finalize" in params, ( 168 f"{class_name}.edit_message must accept 'finalize' kwarg; " 169 f"stream_consumer._send_or_edit passes it unconditionally" 170 ) 171 172 173 class TestSendOrEditMediaStripping: 174 """Verify _send_or_edit strips MEDIA: before sending to the platform.""" 175 176 @pytest.mark.asyncio 177 async def test_first_send_strips_media(self): 178 """Initial send removes MEDIA: tags from visible text.""" 179 adapter = MagicMock() 180 send_result = SimpleNamespace(success=True, message_id="msg_1") 181 adapter.send = AsyncMock(return_value=send_result) 182 adapter.MAX_MESSAGE_LENGTH = 4096 183 184 consumer = GatewayStreamConsumer(adapter, "chat_123") 185 await consumer._send_or_edit("Here is your image\nMEDIA:/tmp/test.png") 186 187 adapter.send.assert_called_once() 188 sent_text = adapter.send.call_args[1]["content"] 189 assert "MEDIA:" not in sent_text 190 assert "Here is your image" in sent_text 191 192 @pytest.mark.asyncio 193 async def test_edit_strips_media(self): 194 """Edit call removes MEDIA: tags from visible text.""" 195 adapter = MagicMock() 196 send_result = SimpleNamespace(success=True, message_id="msg_1") 197 edit_result = SimpleNamespace(success=True) 198 adapter.send = AsyncMock(return_value=send_result) 199 adapter.edit_message = AsyncMock(return_value=edit_result) 200 adapter.MAX_MESSAGE_LENGTH = 4096 201 202 consumer = GatewayStreamConsumer(adapter, "chat_123") 203 # First send 204 await consumer._send_or_edit("Starting response...") 205 # Edit with MEDIA: tag 206 await consumer._send_or_edit("Here is the result\nMEDIA:/tmp/image.png") 207 208 adapter.edit_message.assert_called_once() 209 edited_text = adapter.edit_message.call_args[1]["content"] 210 assert "MEDIA:" not in edited_text 211 212 @pytest.mark.asyncio 213 async def test_media_only_skips_send(self): 214 """If text is entirely MEDIA: tags, the send is skipped.""" 215 adapter = MagicMock() 216 adapter.send = AsyncMock() 217 adapter.MAX_MESSAGE_LENGTH = 4096 218 219 consumer = GatewayStreamConsumer(adapter, "chat_123") 220 await consumer._send_or_edit("MEDIA:/tmp/image.png") 221 222 adapter.send.assert_not_called() 223 224 @pytest.mark.asyncio 225 async def test_cursor_only_update_skips_send(self): 226 """A bare streaming cursor should not be sent as its own message.""" 227 adapter = MagicMock() 228 adapter.send = AsyncMock() 229 adapter.MAX_MESSAGE_LENGTH = 4096 230 231 consumer = GatewayStreamConsumer( 232 adapter, 233 "chat_123", 234 StreamConsumerConfig(cursor=" ▉"), 235 ) 236 await consumer._send_or_edit(" ▉") 237 238 adapter.send.assert_not_called() 239 240 @pytest.mark.asyncio 241 async def test_short_text_with_cursor_skips_new_message(self): 242 """Short text + cursor should not create a standalone new message. 243 244 During rapid tool-calling the model often emits 1-2 tokens before 245 switching to tool calls. Sending 'I ▉' as a new message risks 246 leaving the cursor permanently visible if the follow-up edit is 247 rate-limited. The guard should skip the first send and let the 248 text accumulate into the next segment. 249 """ 250 adapter = MagicMock() 251 adapter.send = AsyncMock() 252 adapter.MAX_MESSAGE_LENGTH = 4096 253 254 consumer = GatewayStreamConsumer( 255 adapter, 256 "chat_123", 257 StreamConsumerConfig(cursor=" ▉"), 258 ) 259 # No message_id yet (first send) — short text + cursor should be skipped 260 assert consumer._message_id is None 261 result = await consumer._send_or_edit("I ▉") 262 assert result is True 263 adapter.send.assert_not_called() 264 265 # 3 chars is still under the threshold 266 result = await consumer._send_or_edit("Hi! ▉") 267 assert result is True 268 adapter.send.assert_not_called() 269 270 @pytest.mark.asyncio 271 async def test_longer_text_with_cursor_sends_new_message(self): 272 """Text >= 4 visible chars + cursor should create a new message normally.""" 273 adapter = MagicMock() 274 send_result = SimpleNamespace(success=True, message_id="msg_1") 275 adapter.send = AsyncMock(return_value=send_result) 276 adapter.MAX_MESSAGE_LENGTH = 4096 277 278 consumer = GatewayStreamConsumer( 279 adapter, 280 "chat_123", 281 StreamConsumerConfig(cursor=" ▉"), 282 ) 283 result = await consumer._send_or_edit("Hello ▉") 284 assert result is True 285 adapter.send.assert_called_once() 286 287 @pytest.mark.asyncio 288 async def test_short_text_without_cursor_sends_normally(self): 289 """Short text without cursor (e.g. final edit) should send normally.""" 290 adapter = MagicMock() 291 send_result = SimpleNamespace(success=True, message_id="msg_1") 292 adapter.send = AsyncMock(return_value=send_result) 293 adapter.MAX_MESSAGE_LENGTH = 4096 294 295 consumer = GatewayStreamConsumer( 296 adapter, 297 "chat_123", 298 StreamConsumerConfig(cursor=" ▉"), 299 ) 300 # No cursor in text — even short text should be sent 301 result = await consumer._send_or_edit("OK") 302 assert result is True 303 adapter.send.assert_called_once() 304 305 @pytest.mark.asyncio 306 async def test_short_text_cursor_edit_existing_message_allowed(self): 307 """Short text + cursor editing an existing message should proceed.""" 308 adapter = MagicMock() 309 edit_result = SimpleNamespace(success=True) 310 adapter.edit_message = AsyncMock(return_value=edit_result) 311 adapter.MAX_MESSAGE_LENGTH = 4096 312 313 consumer = GatewayStreamConsumer( 314 adapter, 315 "chat_123", 316 StreamConsumerConfig(cursor=" ▉"), 317 ) 318 consumer._message_id = "msg_1" # Existing message — guard should not fire 319 consumer._last_sent_text = "" 320 result = await consumer._send_or_edit("I ▉") 321 assert result is True 322 adapter.edit_message.assert_called_once() 323 324 325 # ── Integration: full stream run ───────────────────────────────────────── 326 327 328 class TestStreamRunMediaStripping: 329 """End-to-end: deltas with MEDIA: produce clean visible text.""" 330 331 @pytest.mark.asyncio 332 async def test_stream_with_media_tag(self): 333 """Full stream run strips MEDIA: from the final visible message.""" 334 adapter = MagicMock() 335 send_result = SimpleNamespace(success=True, message_id="msg_1") 336 edit_result = SimpleNamespace(success=True) 337 adapter.send = AsyncMock(return_value=send_result) 338 adapter.edit_message = AsyncMock(return_value=edit_result) 339 adapter.MAX_MESSAGE_LENGTH = 4096 340 341 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) 342 consumer = GatewayStreamConsumer(adapter, "chat_123", config) 343 344 # Feed deltas 345 consumer.on_delta("Here is your generated image\n") 346 consumer.on_delta("MEDIA:/home/user/.hermes/cache/images/abc123.png") 347 consumer.finish() 348 349 await consumer.run() 350 351 # Verify the final text sent/edited doesn't contain MEDIA: 352 all_calls = [] 353 for call in adapter.send.call_args_list: 354 all_calls.append(call[1].get("content", "")) 355 for call in adapter.edit_message.call_args_list: 356 all_calls.append(call[1].get("content", "")) 357 358 for sent_text in all_calls: 359 assert "MEDIA:" not in sent_text, f"MEDIA: leaked into display: {sent_text!r}" 360 361 assert consumer.already_sent 362 363 364 # ── Segment break (tool boundary) tests ────────────────────────────────── 365 366 367 class TestSegmentBreakOnToolBoundary: 368 """Verify that on_delta(None) finalizes the current message and starts a 369 new one so the final response appears below tool-progress messages.""" 370 371 @pytest.mark.asyncio 372 async def test_segment_break_creates_new_message(self): 373 """After a None boundary, next text creates a fresh message.""" 374 adapter = MagicMock() 375 send_result_1 = SimpleNamespace(success=True, message_id="msg_1") 376 send_result_2 = SimpleNamespace(success=True, message_id="msg_2") 377 edit_result = SimpleNamespace(success=True) 378 adapter.send = AsyncMock(side_effect=[send_result_1, send_result_2]) 379 adapter.edit_message = AsyncMock(return_value=edit_result) 380 adapter.MAX_MESSAGE_LENGTH = 4096 381 382 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) 383 consumer = GatewayStreamConsumer(adapter, "chat_123", config) 384 385 # Phase 1: intermediate text before tool calls 386 consumer.on_delta("Let me search for that...") 387 # Tool boundary — model is about to call tools 388 consumer.on_delta(None) 389 # Phase 2: final response text after tools finished 390 consumer.on_delta("Here are the results.") 391 consumer.finish() 392 393 await consumer.run() 394 395 # Should have sent TWO separate messages (two adapter.send calls), 396 # not just edited the first one. 397 assert adapter.send.call_count == 2 398 first_text = adapter.send.call_args_list[0][1]["content"] 399 second_text = adapter.send.call_args_list[1][1]["content"] 400 assert "search" in first_text 401 assert "results" in second_text 402 403 @pytest.mark.asyncio 404 async def test_segment_break_no_text_before(self): 405 """A None boundary with no preceding text is a no-op.""" 406 adapter = MagicMock() 407 send_result = SimpleNamespace(success=True, message_id="msg_1") 408 adapter.send = AsyncMock(return_value=send_result) 409 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) 410 adapter.MAX_MESSAGE_LENGTH = 4096 411 412 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) 413 consumer = GatewayStreamConsumer(adapter, "chat_123", config) 414 415 # No text before the boundary — model went straight to tool calls 416 consumer.on_delta(None) 417 consumer.on_delta("Final answer.") 418 consumer.finish() 419 420 await consumer.run() 421 422 # Only one send call (the final answer) 423 assert adapter.send.call_count == 1 424 assert "Final answer" in adapter.send.call_args_list[0][1]["content"] 425 426 @pytest.mark.asyncio 427 async def test_segment_break_removes_cursor(self): 428 """The finalized segment message should not have a cursor.""" 429 adapter = MagicMock() 430 send_result = SimpleNamespace(success=True, message_id="msg_1") 431 edit_result = SimpleNamespace(success=True) 432 adapter.send = AsyncMock(return_value=send_result) 433 adapter.edit_message = AsyncMock(return_value=edit_result) 434 adapter.MAX_MESSAGE_LENGTH = 4096 435 436 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉") 437 consumer = GatewayStreamConsumer(adapter, "chat_123", config) 438 439 consumer.on_delta("Thinking...") 440 consumer.on_delta(None) 441 consumer.on_delta("Done.") 442 consumer.finish() 443 444 await consumer.run() 445 446 # The first segment should have been finalized without cursor. 447 # Check all edit_message calls + the initial send for the first segment. 448 # The last state of msg_1 should NOT have the cursor. 449 all_texts = [] 450 for call in adapter.send.call_args_list: 451 all_texts.append(call[1].get("content", "")) 452 for call in adapter.edit_message.call_args_list: 453 all_texts.append(call[1].get("content", "")) 454 455 # Find the text(s) that contain "Thinking" — the finalized version 456 # should not have the cursor. 457 thinking_texts = [t for t in all_texts if "Thinking" in t] 458 assert thinking_texts, "Expected at least one message with 'Thinking'" 459 # The LAST occurrence is the finalized version 460 assert "▉" not in thinking_texts[-1], ( 461 f"Cursor found in finalized segment: {thinking_texts[-1]!r}" 462 ) 463 464 @pytest.mark.asyncio 465 async def test_multiple_segment_breaks(self): 466 """Multiple tool boundaries create multiple message segments.""" 467 adapter = MagicMock() 468 msg_counter = iter(["msg_1", "msg_2", "msg_3"]) 469 adapter.send = AsyncMock( 470 side_effect=lambda **kw: SimpleNamespace(success=True, message_id=next(msg_counter)) 471 ) 472 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) 473 adapter.MAX_MESSAGE_LENGTH = 4096 474 475 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) 476 consumer = GatewayStreamConsumer(adapter, "chat_123", config) 477 478 consumer.on_delta("Phase 1") 479 consumer.on_delta(None) # tool boundary 480 consumer.on_delta("Phase 2") 481 consumer.on_delta(None) # another tool boundary 482 consumer.on_delta("Phase 3") 483 consumer.finish() 484 485 await consumer.run() 486 487 # Three separate messages 488 assert adapter.send.call_count == 3 489 490 @pytest.mark.asyncio 491 async def test_already_sent_stays_true_after_segment(self): 492 """already_sent remains True after a segment break.""" 493 adapter = MagicMock() 494 send_result = SimpleNamespace(success=True, message_id="msg_1") 495 adapter.send = AsyncMock(return_value=send_result) 496 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) 497 adapter.MAX_MESSAGE_LENGTH = 4096 498 499 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) 500 consumer = GatewayStreamConsumer(adapter, "chat_123", config) 501 502 consumer.on_delta("Text") 503 consumer.on_delta(None) 504 consumer.finish() 505 506 await consumer.run() 507 508 assert consumer.already_sent 509 510 @pytest.mark.asyncio 511 async def test_edit_failure_sends_only_unsent_tail_at_finish(self): 512 """If an edit fails mid-stream, send only the missing tail once at finish.""" 513 adapter = MagicMock() 514 send_results = [ 515 SimpleNamespace(success=True, message_id="msg_1"), 516 SimpleNamespace(success=True, message_id="msg_2"), 517 ] 518 adapter.send = AsyncMock(side_effect=send_results) 519 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=False, error="flood_control:6")) 520 adapter.MAX_MESSAGE_LENGTH = 4096 521 522 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉") 523 consumer = GatewayStreamConsumer(adapter, "chat_123", config) 524 525 consumer.on_delta("Hello") 526 task = asyncio.create_task(consumer.run()) 527 await asyncio.sleep(0.08) 528 consumer.on_delta(" world") 529 await asyncio.sleep(0.08) 530 consumer.finish() 531 await task 532 533 assert adapter.send.call_count == 2 534 first_text = adapter.send.call_args_list[0][1]["content"] 535 second_text = adapter.send.call_args_list[1][1]["content"] 536 assert "Hello" in first_text 537 assert second_text.strip() == "world" 538 assert consumer.already_sent 539 540 @pytest.mark.asyncio 541 async def test_segment_break_clears_failed_edit_fallback_state(self): 542 """A tool boundary after edit failure must flush the undelivered tail 543 without duplicating the prefix the user already saw (#8124).""" 544 adapter = MagicMock() 545 send_results = [ 546 SimpleNamespace(success=True, message_id="msg_1"), 547 SimpleNamespace(success=True, message_id="msg_2"), 548 SimpleNamespace(success=True, message_id="msg_3"), 549 ] 550 adapter.send = AsyncMock(side_effect=send_results) 551 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=False, error="flood_control:6")) 552 adapter.MAX_MESSAGE_LENGTH = 4096 553 554 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉") 555 consumer = GatewayStreamConsumer(adapter, "chat_123", config) 556 557 consumer.on_delta("Hello") 558 task = asyncio.create_task(consumer.run()) 559 await asyncio.sleep(0.08) 560 consumer.on_delta(" world") 561 await asyncio.sleep(0.08) 562 consumer.on_delta(None) 563 consumer.on_delta("Next segment") 564 consumer.finish() 565 await task 566 567 sent_texts = [call[1]["content"] for call in adapter.send.call_args_list] 568 # The undelivered "world" tail must reach the user, and the next 569 # segment must not duplicate "Hello" that was already visible. 570 assert sent_texts == ["Hello ▉", "world", "Next segment"] 571 572 @pytest.mark.asyncio 573 async def test_segment_break_after_mid_stream_edit_failure_preserves_tail(self): 574 """Regression for #8124: when an earlier edit succeeded but later edits 575 fail (persistent flood control) and a tool boundary arrives before the 576 fallback threshold is reached, the pre-boundary tail must still be 577 delivered — not silently dropped by the segment reset.""" 578 adapter = MagicMock() 579 # msg_1 for the initial partial, msg_2 for the flushed tail, 580 # msg_3 for the post-boundary segment. 581 send_results = [ 582 SimpleNamespace(success=True, message_id="msg_1"), 583 SimpleNamespace(success=True, message_id="msg_2"), 584 SimpleNamespace(success=True, message_id="msg_3"), 585 ] 586 adapter.send = AsyncMock(side_effect=send_results) 587 588 # First two edits succeed, everything after fails with flood control 589 # — simulating Telegram's "edit once then get rate-limited" pattern. 590 edit_results = [ 591 SimpleNamespace(success=True), # "Hello world ▉" — succeeds 592 SimpleNamespace(success=False, error="flood_control:6.0"), # "Hello world more ▉" — flood triggered 593 SimpleNamespace(success=False, error="flood_control:6.0"), # finalize edit at segment break 594 SimpleNamespace(success=False, error="flood_control:6.0"), # cursor-strip attempt 595 ] 596 adapter.edit_message = AsyncMock(side_effect=edit_results + [edit_results[-1]] * 10) 597 adapter.MAX_MESSAGE_LENGTH = 4096 598 599 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉") 600 consumer = GatewayStreamConsumer(adapter, "chat_123", config) 601 602 consumer.on_delta("Hello") 603 task = asyncio.create_task(consumer.run()) 604 await asyncio.sleep(0.08) 605 consumer.on_delta(" world") 606 await asyncio.sleep(0.08) 607 consumer.on_delta(" more") 608 await asyncio.sleep(0.08) 609 consumer.on_delta(None) # tool boundary 610 consumer.on_delta("Here is the tool result.") 611 consumer.finish() 612 await task 613 614 sent_texts = [call[1]["content"] for call in adapter.send.call_args_list] 615 # "more" must have been delivered, not dropped. 616 all_text = " ".join(sent_texts) 617 assert "more" in all_text, ( 618 f"Pre-boundary tail 'more' was silently dropped: sends={sent_texts}" 619 ) 620 # Post-boundary text must also reach the user. 621 assert "Here is the tool result." in all_text 622 623 @pytest.mark.asyncio 624 async def test_no_message_id_enters_fallback_mode(self): 625 """Platform returns success but no message_id (Signal) — must not 626 re-send on every delta. Should enter fallback mode and send only 627 the continuation at finish.""" 628 adapter = MagicMock() 629 # First send succeeds but returns no message_id (Signal behavior) 630 send_result_no_id = SimpleNamespace(success=True, message_id=None) 631 # Fallback final send succeeds 632 send_result_final = SimpleNamespace(success=True, message_id="msg_final") 633 adapter.send = AsyncMock(side_effect=[send_result_no_id, send_result_final]) 634 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) 635 adapter.MAX_MESSAGE_LENGTH = 4096 636 637 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) 638 consumer = GatewayStreamConsumer(adapter, "chat_123", config) 639 640 consumer.on_delta("Hello") 641 task = asyncio.create_task(consumer.run()) 642 await asyncio.sleep(0.08) 643 consumer.on_delta(" world, this is a longer response.") 644 await asyncio.sleep(0.08) 645 consumer.finish() 646 await task 647 648 # Should send exactly 2 messages: initial chunk + fallback continuation 649 # NOT one message per delta 650 assert adapter.send.call_count == 2 651 assert consumer.already_sent 652 # edit_message should NOT have been called (no valid message_id to edit) 653 adapter.edit_message.assert_not_called() 654 655 @pytest.mark.asyncio 656 async def test_no_message_id_single_delta_marks_already_sent(self): 657 """When the entire response fits in one delta and platform returns no 658 message_id, already_sent must still be True to prevent the gateway 659 from re-sending the full response.""" 660 adapter = MagicMock() 661 send_result = SimpleNamespace(success=True, message_id=None) 662 adapter.send = AsyncMock(return_value=send_result) 663 adapter.MAX_MESSAGE_LENGTH = 4096 664 665 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) 666 consumer = GatewayStreamConsumer(adapter, "chat_123", config) 667 668 consumer.on_delta("Short response.") 669 consumer.finish() 670 671 await consumer.run() 672 673 assert consumer.already_sent 674 # Only one send call (the initial message) 675 assert adapter.send.call_count == 1 676 677 @pytest.mark.asyncio 678 async def test_no_message_id_segment_breaks_do_not_resend(self): 679 """On a platform that never returns a message_id (e.g. webhook with 680 github_comment delivery), tool-call segment breaks must NOT trigger 681 a new adapter.send() per boundary. The fix: _message_id == '__no_edit__' 682 suppresses the reset so all text accumulates and is sent once.""" 683 adapter = MagicMock() 684 # No message_id on first send, then one more for the fallback final 685 adapter.send = AsyncMock(side_effect=[ 686 SimpleNamespace(success=True, message_id=None), 687 SimpleNamespace(success=True, message_id=None), 688 ]) 689 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) 690 adapter.MAX_MESSAGE_LENGTH = 4096 691 692 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) 693 consumer = GatewayStreamConsumer(adapter, "chat_123", config) 694 695 # Simulate: text → tool boundary → text → tool boundary → text (3 segments) 696 consumer.on_delta("Phase 1 text") 697 consumer.on_delta(None) # tool call boundary 698 consumer.on_delta("Phase 2 text") 699 consumer.on_delta(None) # another tool call boundary 700 consumer.on_delta("Phase 3 text") 701 consumer.finish() 702 703 await consumer.run() 704 705 # Before the fix this would post 3 comments (one per segment). 706 # After the fix: only the initial partial + one fallback-final continuation. 707 assert adapter.send.call_count == 2, ( 708 f"Expected 2 sends (initial + fallback), got {adapter.send.call_count}" 709 ) 710 assert consumer.already_sent 711 # The continuation must contain the text from segments 2 and 3 712 final_text = adapter.send.call_args_list[1][1]["content"] 713 assert "Phase 2" in final_text 714 assert "Phase 3" in final_text 715 716 @pytest.mark.asyncio 717 async def test_fallback_final_splits_long_continuation_without_dropping_text(self): 718 """Long continuation tails should be chunked when fallback final-send runs.""" 719 adapter = MagicMock() 720 adapter.send = AsyncMock(side_effect=[ 721 SimpleNamespace(success=True, message_id="msg_1"), 722 SimpleNamespace(success=True, message_id="msg_2"), 723 SimpleNamespace(success=True, message_id="msg_3"), 724 ]) 725 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=False, error="flood_control:6")) 726 adapter.MAX_MESSAGE_LENGTH = 610 727 728 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉") 729 consumer = GatewayStreamConsumer(adapter, "chat_123", config) 730 731 prefix = "Hello world" 732 tail = "x" * 620 733 consumer.on_delta(prefix) 734 task = asyncio.create_task(consumer.run()) 735 await asyncio.sleep(0.08) 736 consumer.on_delta(tail) 737 await asyncio.sleep(0.08) 738 consumer.finish() 739 await task 740 741 sent_texts = [call[1]["content"] for call in adapter.send.call_args_list] 742 assert len(sent_texts) == 3 743 assert sent_texts[0].startswith(prefix) 744 assert sum(len(t) for t in sent_texts[1:]) == len(tail) 745 746 @pytest.mark.asyncio 747 async def test_fallback_final_sends_full_text_at_tool_boundary(self): 748 """After a tool call, the streamed prefix is stale (from the pre-tool 749 segment). _send_fallback_final must still send the post-tool response 750 even when continuation_text calculates as empty (#10807).""" 751 adapter = MagicMock() 752 adapter.send = AsyncMock( 753 return_value=SimpleNamespace(success=True, message_id="msg_1"), 754 ) 755 adapter.edit_message = AsyncMock( 756 return_value=SimpleNamespace(success=True), 757 ) 758 adapter.MAX_MESSAGE_LENGTH = 4096 759 760 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) 761 consumer = GatewayStreamConsumer(adapter, "chat_123", config) 762 763 # Simulate a pre-tool streamed segment that becomes the visible prefix 764 pre_tool_text = "I'll run that code now." 765 consumer.on_delta(pre_tool_text) 766 task = asyncio.create_task(consumer.run()) 767 await asyncio.sleep(0.05) 768 769 # After the tool call, the model returns a SHORT final response that 770 # does NOT start with the pre-tool prefix. The continuation calculator 771 # would return empty (no prefix match → full text returned, but if the 772 # streaming edit already showed pre_tool_text, the prefix-based logic 773 # wrongly matches). Simulate this by setting _last_sent_text to the 774 # pre-tool content, then finishing with different post-tool content. 775 consumer._last_sent_text = pre_tool_text 776 post_tool_response = "⏰ Script timed out after 30s and was killed." 777 consumer.finish() 778 await task 779 780 # The fallback should send the post-tool response via 781 # _send_fallback_final. 782 await consumer._send_fallback_final(post_tool_response) 783 784 # Verify the final text was sent (not silently dropped) 785 sent = False 786 for call in adapter.send.call_args_list: 787 content = call[1].get("content", call[0][0] if call[0] else "") 788 if "timed out" in str(content): 789 sent = True 790 break 791 assert sent, ( 792 "Post-tool timeout response was silently dropped by " 793 "_send_fallback_final — the #10807 fix should prevent this" 794 ) 795 796 797 class TestInterimCommentaryMessages: 798 @pytest.mark.asyncio 799 async def test_commentary_message_stays_separate_from_final_stream(self): 800 adapter = MagicMock() 801 adapter.send = AsyncMock(side_effect=[ 802 SimpleNamespace(success=True, message_id="msg_1"), 803 SimpleNamespace(success=True, message_id="msg_2"), 804 ]) 805 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) 806 adapter.MAX_MESSAGE_LENGTH = 4096 807 808 consumer = GatewayStreamConsumer( 809 adapter, 810 "chat_123", 811 StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), 812 ) 813 814 consumer.on_commentary("I'll inspect the repository first.") 815 consumer.on_delta("Done.") 816 consumer.finish() 817 818 await consumer.run() 819 820 sent_texts = [call[1]["content"] for call in adapter.send.call_args_list] 821 assert sent_texts == ["I'll inspect the repository first.", "Done."] 822 assert consumer.final_response_sent is True 823 824 @pytest.mark.asyncio 825 async def test_failed_final_send_does_not_mark_final_response_sent(self): 826 adapter = MagicMock() 827 adapter.send = AsyncMock(return_value=SimpleNamespace(success=False, message_id=None)) 828 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) 829 adapter.MAX_MESSAGE_LENGTH = 4096 830 831 consumer = GatewayStreamConsumer( 832 adapter, 833 "chat_123", 834 StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), 835 ) 836 837 consumer.on_delta("Done.") 838 consumer.finish() 839 840 await consumer.run() 841 842 assert consumer.final_response_sent is False 843 assert consumer.already_sent is False 844 845 @pytest.mark.asyncio 846 async def test_success_without_message_id_marks_visible_and_sends_only_tail(self): 847 adapter = MagicMock() 848 adapter.send = AsyncMock(side_effect=[ 849 SimpleNamespace(success=True, message_id=None), 850 SimpleNamespace(success=True, message_id=None), 851 ]) 852 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) 853 adapter.MAX_MESSAGE_LENGTH = 4096 854 855 consumer = GatewayStreamConsumer( 856 adapter, 857 "chat_123", 858 StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉"), 859 ) 860 861 consumer.on_delta("Hello") 862 task = asyncio.create_task(consumer.run()) 863 await asyncio.sleep(0.08) 864 consumer.on_delta(" world") 865 await asyncio.sleep(0.08) 866 consumer.finish() 867 await task 868 869 sent_texts = [call[1]["content"] for call in adapter.send.call_args_list] 870 assert sent_texts == ["Hello ▉", "world"] 871 assert consumer.already_sent is True 872 assert consumer.final_response_sent is True 873 874 875 class TestCancelledConsumerSetsFlags: 876 """Cancellation must set final_response_sent when already_sent is True. 877 878 The 5-second stream_task timeout in gateway/run.py can cancel the 879 consumer while it's still processing. If final_response_sent stays 880 False, the gateway falls through to the normal send path and the 881 user sees a duplicate message. 882 """ 883 884 @pytest.mark.asyncio 885 async def test_cancelled_with_already_sent_marks_final_response_sent(self): 886 """Cancelling after content was sent should set final_response_sent.""" 887 adapter = MagicMock() 888 adapter.send = AsyncMock( 889 return_value=SimpleNamespace(success=True, message_id="msg_1") 890 ) 891 adapter.edit_message = AsyncMock( 892 return_value=SimpleNamespace(success=True) 893 ) 894 adapter.MAX_MESSAGE_LENGTH = 4096 895 896 consumer = GatewayStreamConsumer( 897 adapter, 898 "chat_123", 899 StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), 900 ) 901 902 # Stream some text — the consumer sends it and sets already_sent 903 consumer.on_delta("Hello world") 904 task = asyncio.create_task(consumer.run()) 905 await asyncio.sleep(0.08) 906 907 assert consumer.already_sent is True 908 909 # Cancel the task (simulates the 5-second timeout in gateway) 910 task.cancel() 911 try: 912 await task 913 except asyncio.CancelledError: 914 pass 915 916 # The fix: final_response_sent should be True even though _DONE 917 # was never processed, preventing a duplicate message. 918 assert consumer.final_response_sent is True 919 920 @pytest.mark.asyncio 921 async def test_cancelled_without_any_sends_does_not_mark_final(self): 922 """Cancelling before anything was sent should NOT set final_response_sent.""" 923 adapter = MagicMock() 924 adapter.send = AsyncMock( 925 return_value=SimpleNamespace(success=False, message_id=None) 926 ) 927 adapter.edit_message = AsyncMock( 928 return_value=SimpleNamespace(success=True) 929 ) 930 adapter.MAX_MESSAGE_LENGTH = 4096 931 932 consumer = GatewayStreamConsumer( 933 adapter, 934 "chat_123", 935 StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), 936 ) 937 938 # Send fails — already_sent stays False 939 consumer.on_delta("x") 940 task = asyncio.create_task(consumer.run()) 941 await asyncio.sleep(0.08) 942 943 assert consumer.already_sent is False 944 945 task.cancel() 946 try: 947 await task 948 except asyncio.CancelledError: 949 pass 950 951 # Without a successful send, final_response_sent should stay False 952 # so the normal gateway send path can deliver the response. 953 assert consumer.final_response_sent is False 954 955 956 # ── Think-block filtering unit tests ───────────────────────────────────── 957 958 959 def _make_consumer() -> GatewayStreamConsumer: 960 """Create a bare consumer for unit-testing the filter (no adapter needed).""" 961 adapter = MagicMock() 962 return GatewayStreamConsumer(adapter, "chat_test") 963 964 965 class TestFilterAndAccumulate: 966 """Unit tests for _filter_and_accumulate think-block suppression.""" 967 968 def test_plain_text_passes_through(self): 969 c = _make_consumer() 970 c._filter_and_accumulate("Hello world") 971 assert c._accumulated == "Hello world" 972 973 def test_complete_think_block_stripped(self): 974 c = _make_consumer() 975 c._filter_and_accumulate("<think>internal reasoning</think>Answer here") 976 assert c._accumulated == "Answer here" 977 978 def test_think_block_in_middle(self): 979 c = _make_consumer() 980 c._filter_and_accumulate("Prefix\n<think>reasoning</think>\nSuffix") 981 assert c._accumulated == "Prefix\n\nSuffix" 982 983 def test_think_block_split_across_deltas(self): 984 c = _make_consumer() 985 c._filter_and_accumulate("<think>start of") 986 c._filter_and_accumulate(" reasoning</think>visible text") 987 assert c._accumulated == "visible text" 988 989 def test_opening_tag_split_across_deltas(self): 990 c = _make_consumer() 991 c._filter_and_accumulate("<thi") 992 # Partial tag held back 993 assert c._accumulated == "" 994 c._filter_and_accumulate("nk>hidden</think>shown") 995 assert c._accumulated == "shown" 996 997 def test_closing_tag_split_across_deltas(self): 998 c = _make_consumer() 999 c._filter_and_accumulate("<think>hidden</thi") 1000 assert c._accumulated == "" 1001 c._filter_and_accumulate("nk>shown") 1002 assert c._accumulated == "shown" 1003 1004 def test_multiple_think_blocks(self): 1005 c = _make_consumer() 1006 # Consecutive blocks with no text between them — both stripped 1007 c._filter_and_accumulate( 1008 "<think>block1</think><think>block2</think>visible" 1009 ) 1010 assert c._accumulated == "visible" 1011 1012 def test_multiple_think_blocks_with_text_between(self): 1013 """Think tag after non-whitespace is NOT a boundary (prose safety).""" 1014 c = _make_consumer() 1015 c._filter_and_accumulate( 1016 "<think>block1</think>A<think>block2</think>B" 1017 ) 1018 # Second <think> follows 'A' (not a block boundary) — treated as prose 1019 assert "A" in c._accumulated 1020 assert "B" in c._accumulated 1021 1022 def test_thinking_tag_variant(self): 1023 c = _make_consumer() 1024 c._filter_and_accumulate("<thinking>deep thought</thinking>Result") 1025 assert c._accumulated == "Result" 1026 1027 def test_thought_tag_variant(self): 1028 c = _make_consumer() 1029 c._filter_and_accumulate("<thought>Gemma style</thought>Output") 1030 assert c._accumulated == "Output" 1031 1032 def test_reasoning_scratchpad_variant(self): 1033 c = _make_consumer() 1034 c._filter_and_accumulate( 1035 "<REASONING_SCRATCHPAD>long plan</REASONING_SCRATCHPAD>Done" 1036 ) 1037 assert c._accumulated == "Done" 1038 1039 def test_case_insensitive_THINKING(self): 1040 c = _make_consumer() 1041 c._filter_and_accumulate("<THINKING>caps</THINKING>answer") 1042 assert c._accumulated == "answer" 1043 1044 def test_prose_mention_not_stripped(self): 1045 """<think> mentioned mid-line in prose should NOT trigger filtering.""" 1046 c = _make_consumer() 1047 c._filter_and_accumulate("The <think> tag is used for reasoning") 1048 assert "<think>" in c._accumulated 1049 assert "used for reasoning" in c._accumulated 1050 1051 def test_prose_mention_after_text(self): 1052 """<think> after non-whitespace on same line is not a block boundary.""" 1053 c = _make_consumer() 1054 c._filter_and_accumulate("Try using <think>some content</think> tags") 1055 assert "<think>" in c._accumulated 1056 1057 def test_think_at_line_start_is_stripped(self): 1058 """<think> at start of a new line IS a block boundary.""" 1059 c = _make_consumer() 1060 c._filter_and_accumulate("Previous line\n<think>reasoning</think>Next") 1061 assert "Previous line\nNext" == c._accumulated 1062 1063 def test_think_with_only_whitespace_before(self): 1064 """<think> preceded by only whitespace on its line is a boundary.""" 1065 c = _make_consumer() 1066 c._filter_and_accumulate(" <think>hidden</think>visible") 1067 # Leading whitespace before the tag is emitted, then block is stripped 1068 assert c._accumulated == " visible" 1069 1070 def test_flush_think_buffer_on_non_tag(self): 1071 """Partial tag that turns out not to be a tag is flushed.""" 1072 c = _make_consumer() 1073 c._filter_and_accumulate("<thi") 1074 assert c._accumulated == "" 1075 # Flush explicitly (simulates stream end) 1076 c._flush_think_buffer() 1077 assert c._accumulated == "<thi" 1078 1079 def test_flush_think_buffer_when_inside_block(self): 1080 """Flush while inside a think block does NOT emit buffered content.""" 1081 c = _make_consumer() 1082 c._filter_and_accumulate("<think>still thinking") 1083 c._flush_think_buffer() 1084 assert c._accumulated == "" 1085 1086 def test_unclosed_think_block_suppresses(self): 1087 """An unclosed <think> suppresses all subsequent content.""" 1088 c = _make_consumer() 1089 c._filter_and_accumulate("Before\n<think>reasoning that never ends...") 1090 assert c._accumulated == "Before\n" 1091 1092 def test_multiline_think_block(self): 1093 c = _make_consumer() 1094 c._filter_and_accumulate( 1095 "<think>\nLine 1\nLine 2\nLine 3\n</think>Final answer" 1096 ) 1097 assert c._accumulated == "Final answer" 1098 1099 def test_segment_reset_preserves_think_state(self): 1100 """_reset_segment_state should NOT clear think-block filter state.""" 1101 c = _make_consumer() 1102 c._filter_and_accumulate("<think>start") 1103 c._reset_segment_state() 1104 # Still inside think block — subsequent text should be suppressed 1105 c._filter_and_accumulate("still hidden</think>visible") 1106 assert c._accumulated == "visible" 1107 1108 1109 class TestFilterAndAccumulateIntegration: 1110 """Integration: verify think blocks don't leak through the full run() path.""" 1111 1112 @pytest.mark.asyncio 1113 async def test_think_block_not_sent_to_platform(self): 1114 """Think blocks should be filtered before platform edit.""" 1115 adapter = MagicMock() 1116 adapter.send = AsyncMock( 1117 return_value=SimpleNamespace(success=True, message_id="msg_1") 1118 ) 1119 adapter.edit_message = AsyncMock( 1120 return_value=SimpleNamespace(success=True) 1121 ) 1122 adapter.MAX_MESSAGE_LENGTH = 4096 1123 1124 consumer = GatewayStreamConsumer( 1125 adapter, 1126 "chat_test", 1127 StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), 1128 ) 1129 1130 # Simulate streaming: think block then visible text 1131 consumer.on_delta("<think>deep reasoning here</think>") 1132 consumer.on_delta("The answer is 42.") 1133 consumer.finish() 1134 1135 task = asyncio.create_task(consumer.run()) 1136 await asyncio.sleep(0.15) 1137 1138 # The final text sent to the platform should NOT contain <think> 1139 all_calls = list(adapter.send.call_args_list) + list( 1140 adapter.edit_message.call_args_list 1141 ) 1142 for call in all_calls: 1143 args, kwargs = call 1144 content = kwargs.get("content") or (args[0] if args else "") 1145 assert "<think>" not in content, f"Think tag leaked: {content}" 1146 assert "deep reasoning" not in content 1147 1148 try: 1149 task.cancel() 1150 await task 1151 except asyncio.CancelledError: 1152 pass 1153 1154 1155 # ── buffer_only mode tests ───────────────────────────────────────────── 1156 1157 1158 class TestBufferOnlyMode: 1159 """Verify buffer_only mode suppresses intermediate edits and only 1160 flushes on structural boundaries (done, segment break, commentary).""" 1161 1162 @pytest.mark.asyncio 1163 async def test_suppresses_intermediate_edits(self): 1164 """Time-based and size-based edits are skipped; only got_done flushes.""" 1165 adapter = MagicMock() 1166 adapter.MAX_MESSAGE_LENGTH = 4096 1167 adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg1")) 1168 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) 1169 1170 cfg = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor="", buffer_only=True) 1171 consumer = GatewayStreamConsumer(adapter, "!room:server", config=cfg) 1172 1173 for word in ["Hello", " world", ", this", " is", " a", " test"]: 1174 consumer.on_delta(word) 1175 consumer.finish() 1176 1177 await consumer.run() 1178 1179 adapter.send.assert_called_once() 1180 adapter.edit_message.assert_not_called() 1181 assert "Hello world, this is a test" in adapter.send.call_args_list[0][1]["content"] 1182 1183 @pytest.mark.asyncio 1184 async def test_flushes_on_segment_break(self): 1185 """A segment break (tool call boundary) flushes accumulated text.""" 1186 adapter = MagicMock() 1187 adapter.MAX_MESSAGE_LENGTH = 4096 1188 adapter.send = AsyncMock(side_effect=[ 1189 SimpleNamespace(success=True, message_id="msg1"), 1190 SimpleNamespace(success=True, message_id="msg2"), 1191 ]) 1192 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) 1193 1194 cfg = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor="", buffer_only=True) 1195 consumer = GatewayStreamConsumer(adapter, "!room:server", config=cfg) 1196 1197 consumer.on_delta("Before tool call") 1198 consumer.on_delta(None) 1199 consumer.on_delta("After tool call") 1200 consumer.finish() 1201 1202 await consumer.run() 1203 1204 assert adapter.send.call_count == 2 1205 assert "Before tool call" in adapter.send.call_args_list[0][1]["content"] 1206 assert "After tool call" in adapter.send.call_args_list[1][1]["content"] 1207 adapter.edit_message.assert_not_called() 1208 1209 @pytest.mark.asyncio 1210 async def test_flushes_on_commentary(self): 1211 """An interim commentary message flushes in buffer_only mode.""" 1212 adapter = MagicMock() 1213 adapter.MAX_MESSAGE_LENGTH = 4096 1214 adapter.send = AsyncMock(side_effect=[ 1215 SimpleNamespace(success=True, message_id="msg1"), 1216 SimpleNamespace(success=True, message_id="msg2"), 1217 SimpleNamespace(success=True, message_id="msg3"), 1218 ]) 1219 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) 1220 1221 cfg = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor="", buffer_only=True) 1222 consumer = GatewayStreamConsumer(adapter, "!room:server", config=cfg) 1223 1224 consumer.on_delta("Working on it...") 1225 consumer.on_commentary("I'll search for that first.") 1226 consumer.on_delta("Here are the results.") 1227 consumer.finish() 1228 1229 await consumer.run() 1230 1231 # Three sends: accumulated text, commentary, final text 1232 assert adapter.send.call_count >= 2 1233 adapter.edit_message.assert_not_called() 1234 1235 @pytest.mark.asyncio 1236 async def test_default_mode_still_triggers_intermediate_edits(self): 1237 """Regression: buffer_only=False (default) still does progressive edits.""" 1238 adapter = MagicMock() 1239 adapter.MAX_MESSAGE_LENGTH = 4096 1240 adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg1")) 1241 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) 1242 1243 # buffer_threshold=5 means any 5+ chars triggers an early edit 1244 cfg = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor="") 1245 consumer = GatewayStreamConsumer(adapter, "!room:server", config=cfg) 1246 1247 consumer.on_delta("Hello world, this is long enough to trigger edits") 1248 consumer.finish() 1249 1250 await consumer.run() 1251 1252 # Should have at least one send. With buffer_threshold=5 and this much 1253 # text, the consumer may send then edit, or just send once at got_done. 1254 # The key assertion: this doesn't break. 1255 assert adapter.send.call_count >= 1 1256 1257 1258 # ── Cursor stripping on fallback (#7183) ──────────────────────────────────── 1259 1260 1261 class TestCursorStrippingOnFallback: 1262 """Regression: cursor must be stripped when fallback continuation is empty (#7183). 1263 1264 When _send_fallback_final is called with nothing new to deliver (the visible 1265 partial already matches final_text), the last edit may still show the cursor 1266 character because fallback mode was entered after a failed edit. Before the 1267 fix this would leave the message permanently frozen with a visible ▉. 1268 """ 1269 1270 @pytest.mark.asyncio 1271 async def test_cursor_stripped_when_continuation_empty(self): 1272 """_send_fallback_final must attempt a final edit to strip the cursor.""" 1273 adapter = MagicMock() 1274 adapter.MAX_MESSAGE_LENGTH = 4096 1275 adapter.edit_message = AsyncMock( 1276 return_value=SimpleNamespace(success=True, message_id="msg-1") 1277 ) 1278 1279 consumer = GatewayStreamConsumer( 1280 adapter, "chat-1", 1281 config=StreamConsumerConfig(cursor=" ▉"), 1282 ) 1283 consumer._message_id = "msg-1" 1284 consumer._last_sent_text = "Hello world ▉" 1285 consumer._fallback_final_send = False 1286 1287 await consumer._send_fallback_final("Hello world") 1288 1289 adapter.edit_message.assert_called_once() 1290 call_args = adapter.edit_message.call_args 1291 assert call_args.kwargs["content"] == "Hello world" 1292 assert consumer._already_sent is True 1293 # _last_sent_text should reflect the cleaned text after a successful strip 1294 assert consumer._last_sent_text == "Hello world" 1295 1296 @pytest.mark.asyncio 1297 async def test_cursor_not_stripped_when_no_cursor_configured(self): 1298 """No edit attempted when cursor is not configured.""" 1299 adapter = MagicMock() 1300 adapter.MAX_MESSAGE_LENGTH = 4096 1301 adapter.edit_message = AsyncMock() 1302 1303 consumer = GatewayStreamConsumer( 1304 adapter, "chat-1", 1305 config=StreamConsumerConfig(cursor=""), 1306 ) 1307 consumer._message_id = "msg-1" 1308 consumer._last_sent_text = "Hello world" 1309 consumer._fallback_final_send = False 1310 1311 await consumer._send_fallback_final("Hello world") 1312 1313 adapter.edit_message.assert_not_called() 1314 assert consumer._already_sent is True 1315 1316 @pytest.mark.asyncio 1317 async def test_cursor_strip_edit_failure_handled(self): 1318 """If the cursor-stripping edit itself fails, it must not crash and 1319 must not corrupt _last_sent_text.""" 1320 adapter = MagicMock() 1321 adapter.MAX_MESSAGE_LENGTH = 4096 1322 adapter.edit_message = AsyncMock( 1323 return_value=SimpleNamespace(success=False, error="flood_control") 1324 ) 1325 1326 consumer = GatewayStreamConsumer( 1327 adapter, "chat-1", 1328 config=StreamConsumerConfig(cursor=" ▉"), 1329 ) 1330 consumer._message_id = "msg-1" 1331 consumer._last_sent_text = "Hello ▉" 1332 consumer._fallback_final_send = False 1333 1334 await consumer._send_fallback_final("Hello") 1335 1336 # Should still set already_sent despite the cursor-strip edit failure 1337 assert consumer._already_sent is True 1338 # _last_sent_text must NOT be updated when the edit failed 1339 assert consumer._last_sent_text == "Hello ▉" 1340 1341 1342 # ── on_new_message callback (tool-progress linearization) ───────────── 1343 1344 1345 class TestOnNewMessageCallback: 1346 """The on_new_message callback fires whenever a fresh content bubble 1347 lands on the platform. Gateway uses this to close off the current 1348 tool-progress bubble so the next tool.started opens a new bubble 1349 below the content — preserving chronological order in the chat. 1350 1351 Before this callback existed (post PR #7885), content messages got 1352 their own bubbles after segment breaks, but the tool-progress task 1353 kept editing the ORIGINAL progress bubble above all new content. 1354 Result: tool lines appeared stacked in the upper bubble while 1355 content messages lined up below, making the timeline look scrambled. 1356 """ 1357 1358 @pytest.mark.asyncio 1359 async def test_callback_fires_on_first_send(self): 1360 """First-send of a new content bubble fires on_new_message.""" 1361 adapter = MagicMock() 1362 adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1")) 1363 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) 1364 adapter.MAX_MESSAGE_LENGTH = 4096 1365 1366 events = [] 1367 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1) 1368 consumer = GatewayStreamConsumer( 1369 adapter, "chat", config, 1370 on_new_message=lambda: events.append("reset"), 1371 ) 1372 1373 consumer.on_delta("Hello") 1374 consumer.finish() 1375 await consumer.run() 1376 1377 assert events == ["reset"] 1378 1379 @pytest.mark.asyncio 1380 async def test_callback_fires_once_per_segment(self): 1381 """A new first-send fires the callback again after segment break.""" 1382 adapter = MagicMock() 1383 msg_counter = iter(["msg_1", "msg_2", "msg_3"]) 1384 adapter.send = AsyncMock( 1385 side_effect=lambda **kw: SimpleNamespace(success=True, message_id=next(msg_counter)) 1386 ) 1387 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) 1388 adapter.MAX_MESSAGE_LENGTH = 4096 1389 1390 events = [] 1391 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1) 1392 consumer = GatewayStreamConsumer( 1393 adapter, "chat", config, 1394 on_new_message=lambda: events.append("reset"), 1395 ) 1396 1397 consumer.on_delta("A") 1398 consumer.on_delta(None) 1399 consumer.on_delta("B") 1400 consumer.on_delta(None) 1401 consumer.on_delta("C") 1402 consumer.finish() 1403 await consumer.run() 1404 1405 # Three content bubbles ⇒ three reset notifications 1406 assert events == ["reset", "reset", "reset"] 1407 1408 @pytest.mark.asyncio 1409 async def test_callback_not_fired_on_edit(self): 1410 """Subsequent edits of the same bubble do NOT fire the callback.""" 1411 adapter = MagicMock() 1412 adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1")) 1413 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) 1414 adapter.MAX_MESSAGE_LENGTH = 4096 1415 1416 events = [] 1417 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1) 1418 consumer = GatewayStreamConsumer( 1419 adapter, "chat", config, 1420 on_new_message=lambda: events.append("reset"), 1421 ) 1422 1423 consumer.on_delta("Hello") 1424 task = asyncio.create_task(consumer.run()) 1425 await asyncio.sleep(0.05) 1426 consumer.on_delta(" world") 1427 await asyncio.sleep(0.05) 1428 consumer.on_delta(" more") 1429 await asyncio.sleep(0.05) 1430 consumer.finish() 1431 await task 1432 1433 # Only one first-send happened; edits do not re-fire. 1434 assert events == ["reset"] 1435 1436 @pytest.mark.asyncio 1437 async def test_callback_fires_on_commentary(self): 1438 """Commentary messages are fresh bubbles too — fire the callback.""" 1439 adapter = MagicMock() 1440 adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1")) 1441 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) 1442 adapter.MAX_MESSAGE_LENGTH = 4096 1443 1444 events = [] 1445 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1) 1446 consumer = GatewayStreamConsumer( 1447 adapter, "chat", config, 1448 on_new_message=lambda: events.append("reset"), 1449 ) 1450 1451 consumer.on_commentary("I'll search for that first.") 1452 consumer.finish() 1453 await consumer.run() 1454 1455 assert events == ["reset"] 1456 1457 @pytest.mark.asyncio 1458 async def test_callback_error_swallowed(self): 1459 """Exceptions in the callback do not crash the consumer.""" 1460 adapter = MagicMock() 1461 adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1")) 1462 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) 1463 adapter.MAX_MESSAGE_LENGTH = 4096 1464 1465 def raiser(): 1466 raise RuntimeError("boom") 1467 1468 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1) 1469 consumer = GatewayStreamConsumer( 1470 adapter, "chat", config, 1471 on_new_message=raiser, 1472 ) 1473 1474 consumer.on_delta("Hello") 1475 consumer.finish() 1476 await consumer.run() # must not raise 1477 1478 assert consumer.already_sent is True 1479 1480 @pytest.mark.asyncio 1481 async def test_no_callback_when_none(self): 1482 """Consumer works correctly when on_new_message is None (default).""" 1483 adapter = MagicMock() 1484 adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1")) 1485 adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) 1486 adapter.MAX_MESSAGE_LENGTH = 4096 1487 1488 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1) 1489 consumer = GatewayStreamConsumer(adapter, "chat", config) # no callback 1490 1491 consumer.on_delta("Hello") 1492 consumer.finish() 1493 await consumer.run() 1494 1495 assert consumer.already_sent is True