/ tests / gateway / test_stream_consumer.py
test_stream_consumer.py
   1  """Tests for GatewayStreamConsumer — media directive stripping in streaming."""
   2  
   3  import asyncio
   4  from types import SimpleNamespace
   5  from unittest.mock import AsyncMock, MagicMock
   6  
   7  import pytest
   8  
   9  from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig
  10  
  11  
  12  # ── _clean_for_display unit tests ────────────────────────────────────────
  13  
  14  
  15  class TestCleanForDisplay:
  16      """Verify MEDIA: directives and internal markers are stripped from display text."""
  17  
  18      def test_no_media_passthrough(self):
  19          """Text without MEDIA: passes through unchanged."""
  20          text = "Here is your analysis of the image."
  21          assert GatewayStreamConsumer._clean_for_display(text) == text
  22  
  23      def test_media_tag_stripped(self):
  24          """Basic MEDIA:<path> tag is removed."""
  25          text = "Here is the image\nMEDIA:/tmp/hermes/image.png"
  26          result = GatewayStreamConsumer._clean_for_display(text)
  27          assert "MEDIA:" not in result
  28          assert "Here is the image" in result
  29  
  30      def test_media_tag_with_space(self):
  31          """MEDIA: tag with space after colon is removed."""
  32          text = "Audio generated\nMEDIA: /home/user/.hermes/audio_cache/voice.mp3"
  33          result = GatewayStreamConsumer._clean_for_display(text)
  34          assert "MEDIA:" not in result
  35          assert "Audio generated" in result
  36  
  37      def test_media_tag_with_quotes(self):
  38          """MEDIA: tags wrapped in quotes or backticks are removed."""
  39          for wrapper in ['`MEDIA:/path/file.png`', '"MEDIA:/path/file.png"', "'MEDIA:/path/file.png'"]:
  40              text = f"Result: {wrapper}"
  41              result = GatewayStreamConsumer._clean_for_display(text)
  42              assert "MEDIA:" not in result, f"Failed for wrapper: {wrapper}"
  43  
  44      def test_audio_as_voice_stripped(self):
  45          """[[audio_as_voice]] directive is removed."""
  46          text = "[[audio_as_voice]]\nMEDIA:/tmp/voice.ogg"
  47          result = GatewayStreamConsumer._clean_for_display(text)
  48          assert "[[audio_as_voice]]" not in result
  49          assert "MEDIA:" not in result
  50  
  51      def test_multiple_media_tags(self):
  52          """Multiple MEDIA: tags are all removed."""
  53          text = "Here are two files:\nMEDIA:/tmp/a.png\nMEDIA:/tmp/b.jpg"
  54          result = GatewayStreamConsumer._clean_for_display(text)
  55          assert "MEDIA:" not in result
  56          assert "Here are two files:" in result
  57  
  58      def test_excessive_newlines_collapsed(self):
  59          """Blank lines left by removed tags are collapsed."""
  60          text = "Before\n\n\nMEDIA:/tmp/file.png\n\n\nAfter"
  61          result = GatewayStreamConsumer._clean_for_display(text)
  62          # Should not have 3+ consecutive newlines
  63          assert "\n\n\n" not in result
  64  
  65      def test_media_only_response(self):
  66          """Response that is entirely MEDIA: tags returns empty/whitespace."""
  67          text = "MEDIA:/tmp/image.png"
  68          result = GatewayStreamConsumer._clean_for_display(text)
  69          assert result.strip() == ""
  70  
  71      def test_media_mid_sentence(self):
  72          """MEDIA: tag embedded in prose is stripped cleanly."""
  73          text = "I generated this image MEDIA:/tmp/art.png for you."
  74          result = GatewayStreamConsumer._clean_for_display(text)
  75          assert "MEDIA:" not in result
  76          assert "generated" in result
  77          assert "for you." in result
  78  
  79      def test_preserves_non_media_colons(self):
  80          """Normal colons and text with 'MEDIA' as a word aren't stripped."""
  81          text = "The media: files are stored in /tmp. Use social MEDIA carefully."
  82          result = GatewayStreamConsumer._clean_for_display(text)
  83          # "MEDIA:" in upper case without a path won't match \S+ (space follows)
  84          # But "media:" is lowercase so won't match either
  85          assert result == text
  86  
  87  
# ── Integration: _send_or_edit finalize gating and MEDIA: stripping ──────
  89  
  90  
  91  class TestFinalizeCapabilityGate:
  92      """Verify REQUIRES_EDIT_FINALIZE gates the redundant final edit.
  93  
  94      Platforms that don't need an explicit finalize signal (Telegram,
  95      Slack, Matrix, …) should skip the redundant final edit when the
  96      mid-stream edit already delivered the final content.  Platforms that
  97      *do* need it (DingTalk AI Cards) must always receive a finalize=True
  98      edit at the end of the stream.
  99      """
 100  
 101      @pytest.mark.asyncio
 102      async def test_identical_text_skip_respects_adapter_flag(self):
 103          """_send_or_edit short-circuits identical-text only when the
 104          adapter doesn't require an explicit finalize signal."""
 105          # Adapter without finalize requirement — should skip identical edit.
 106          plain = MagicMock()
 107          plain.REQUIRES_EDIT_FINALIZE = False
 108          plain.send = AsyncMock(return_value=SimpleNamespace(
 109              success=True, message_id="m1",
 110          ))
 111          plain.edit_message = AsyncMock()
 112          plain.MAX_MESSAGE_LENGTH = 4096
 113          c1 = GatewayStreamConsumer(plain, "chat_1")
 114          await c1._send_or_edit("hello")  # first send
 115          await c1._send_or_edit("hello", finalize=True)  # identical → skip
 116          plain.edit_message.assert_not_called()
 117  
 118          # Adapter that requires finalize — must still fire the edit.
 119          picky = MagicMock()
 120          picky.REQUIRES_EDIT_FINALIZE = True
 121          picky.send = AsyncMock(return_value=SimpleNamespace(
 122              success=True, message_id="m1",
 123          ))
 124          picky.edit_message = AsyncMock(return_value=SimpleNamespace(
 125              success=True, message_id="m1",
 126          ))
 127          picky.MAX_MESSAGE_LENGTH = 4096
 128          c2 = GatewayStreamConsumer(picky, "chat_1")
 129          await c2._send_or_edit("hello")
 130          await c2._send_or_edit("hello", finalize=True)
 131          # Finalize edit must go through even on identical content.
 132          picky.edit_message.assert_called_once()
 133          assert picky.edit_message.call_args[1]["finalize"] is True
 134  
 135  
 136  class TestEditMessageFinalizeSignature:
 137      """Every concrete platform adapter must accept the ``finalize`` kwarg.
 138  
 139      stream_consumer._send_or_edit always passes ``finalize=`` to
 140      ``adapter.edit_message(...)`` (see gateway/stream_consumer.py).  An
 141      adapter that overrides edit_message without accepting finalize raises
 142      TypeError the first time streaming hits a segment break or final edit.
 143      Guard the contract with an explicit signature check so it cannot
 144      silently regress — existing tests use MagicMock which swallows any
 145      kwarg and cannot catch this.
 146      """
 147  
 148      @pytest.mark.parametrize(
 149          "module_path,class_name",
 150          [
 151              ("gateway.platforms.telegram", "TelegramAdapter"),
 152              ("gateway.platforms.discord", "DiscordAdapter"),
 153              ("gateway.platforms.slack", "SlackAdapter"),
 154              ("gateway.platforms.matrix", "MatrixAdapter"),
 155              ("gateway.platforms.mattermost", "MattermostAdapter"),
 156              ("gateway.platforms.feishu", "FeishuAdapter"),
 157              ("gateway.platforms.whatsapp", "WhatsAppAdapter"),
 158              ("gateway.platforms.dingtalk", "DingTalkAdapter"),
 159          ],
 160      )
 161      def test_edit_message_accepts_finalize(self, module_path, class_name):
 162          import inspect
 163  
 164          module = pytest.importorskip(module_path)
 165          cls = getattr(module, class_name)
 166          params = inspect.signature(cls.edit_message).parameters
 167          assert "finalize" in params, (
 168              f"{class_name}.edit_message must accept 'finalize' kwarg; "
 169              f"stream_consumer._send_or_edit passes it unconditionally"
 170          )
 171  
 172  
 173  class TestSendOrEditMediaStripping:
 174      """Verify _send_or_edit strips MEDIA: before sending to the platform."""
 175  
 176      @pytest.mark.asyncio
 177      async def test_first_send_strips_media(self):
 178          """Initial send removes MEDIA: tags from visible text."""
 179          adapter = MagicMock()
 180          send_result = SimpleNamespace(success=True, message_id="msg_1")
 181          adapter.send = AsyncMock(return_value=send_result)
 182          adapter.MAX_MESSAGE_LENGTH = 4096
 183  
 184          consumer = GatewayStreamConsumer(adapter, "chat_123")
 185          await consumer._send_or_edit("Here is your image\nMEDIA:/tmp/test.png")
 186  
 187          adapter.send.assert_called_once()
 188          sent_text = adapter.send.call_args[1]["content"]
 189          assert "MEDIA:" not in sent_text
 190          assert "Here is your image" in sent_text
 191  
 192      @pytest.mark.asyncio
 193      async def test_edit_strips_media(self):
 194          """Edit call removes MEDIA: tags from visible text."""
 195          adapter = MagicMock()
 196          send_result = SimpleNamespace(success=True, message_id="msg_1")
 197          edit_result = SimpleNamespace(success=True)
 198          adapter.send = AsyncMock(return_value=send_result)
 199          adapter.edit_message = AsyncMock(return_value=edit_result)
 200          adapter.MAX_MESSAGE_LENGTH = 4096
 201  
 202          consumer = GatewayStreamConsumer(adapter, "chat_123")
 203          # First send
 204          await consumer._send_or_edit("Starting response...")
 205          # Edit with MEDIA: tag
 206          await consumer._send_or_edit("Here is the result\nMEDIA:/tmp/image.png")
 207  
 208          adapter.edit_message.assert_called_once()
 209          edited_text = adapter.edit_message.call_args[1]["content"]
 210          assert "MEDIA:" not in edited_text
 211  
 212      @pytest.mark.asyncio
 213      async def test_media_only_skips_send(self):
 214          """If text is entirely MEDIA: tags, the send is skipped."""
 215          adapter = MagicMock()
 216          adapter.send = AsyncMock()
 217          adapter.MAX_MESSAGE_LENGTH = 4096
 218  
 219          consumer = GatewayStreamConsumer(adapter, "chat_123")
 220          await consumer._send_or_edit("MEDIA:/tmp/image.png")
 221  
 222          adapter.send.assert_not_called()
 223  
 224      @pytest.mark.asyncio
 225      async def test_cursor_only_update_skips_send(self):
 226          """A bare streaming cursor should not be sent as its own message."""
 227          adapter = MagicMock()
 228          adapter.send = AsyncMock()
 229          adapter.MAX_MESSAGE_LENGTH = 4096
 230  
 231          consumer = GatewayStreamConsumer(
 232              adapter,
 233              "chat_123",
 234              StreamConsumerConfig(cursor=" ▉"),
 235          )
 236          await consumer._send_or_edit(" ▉")
 237  
 238          adapter.send.assert_not_called()
 239  
 240      @pytest.mark.asyncio
 241      async def test_short_text_with_cursor_skips_new_message(self):
 242          """Short text + cursor should not create a standalone new message.
 243  
 244          During rapid tool-calling the model often emits 1-2 tokens before
 245          switching to tool calls.  Sending 'I ▉' as a new message risks
 246          leaving the cursor permanently visible if the follow-up edit is
 247          rate-limited.  The guard should skip the first send and let the
 248          text accumulate into the next segment.
 249          """
 250          adapter = MagicMock()
 251          adapter.send = AsyncMock()
 252          adapter.MAX_MESSAGE_LENGTH = 4096
 253  
 254          consumer = GatewayStreamConsumer(
 255              adapter,
 256              "chat_123",
 257              StreamConsumerConfig(cursor=" ▉"),
 258          )
 259          # No message_id yet (first send) — short text + cursor should be skipped
 260          assert consumer._message_id is None
 261          result = await consumer._send_or_edit("I ▉")
 262          assert result is True
 263          adapter.send.assert_not_called()
 264  
 265          # 3 chars is still under the threshold
 266          result = await consumer._send_or_edit("Hi! ▉")
 267          assert result is True
 268          adapter.send.assert_not_called()
 269  
 270      @pytest.mark.asyncio
 271      async def test_longer_text_with_cursor_sends_new_message(self):
 272          """Text >= 4 visible chars + cursor should create a new message normally."""
 273          adapter = MagicMock()
 274          send_result = SimpleNamespace(success=True, message_id="msg_1")
 275          adapter.send = AsyncMock(return_value=send_result)
 276          adapter.MAX_MESSAGE_LENGTH = 4096
 277  
 278          consumer = GatewayStreamConsumer(
 279              adapter,
 280              "chat_123",
 281              StreamConsumerConfig(cursor=" ▉"),
 282          )
 283          result = await consumer._send_or_edit("Hello ▉")
 284          assert result is True
 285          adapter.send.assert_called_once()
 286  
 287      @pytest.mark.asyncio
 288      async def test_short_text_without_cursor_sends_normally(self):
 289          """Short text without cursor (e.g. final edit) should send normally."""
 290          adapter = MagicMock()
 291          send_result = SimpleNamespace(success=True, message_id="msg_1")
 292          adapter.send = AsyncMock(return_value=send_result)
 293          adapter.MAX_MESSAGE_LENGTH = 4096
 294  
 295          consumer = GatewayStreamConsumer(
 296              adapter,
 297              "chat_123",
 298              StreamConsumerConfig(cursor=" ▉"),
 299          )
 300          # No cursor in text — even short text should be sent
 301          result = await consumer._send_or_edit("OK")
 302          assert result is True
 303          adapter.send.assert_called_once()
 304  
 305      @pytest.mark.asyncio
 306      async def test_short_text_cursor_edit_existing_message_allowed(self):
 307          """Short text + cursor editing an existing message should proceed."""
 308          adapter = MagicMock()
 309          edit_result = SimpleNamespace(success=True)
 310          adapter.edit_message = AsyncMock(return_value=edit_result)
 311          adapter.MAX_MESSAGE_LENGTH = 4096
 312  
 313          consumer = GatewayStreamConsumer(
 314              adapter,
 315              "chat_123",
 316              StreamConsumerConfig(cursor=" ▉"),
 317          )
 318          consumer._message_id = "msg_1"  # Existing message — guard should not fire
 319          consumer._last_sent_text = ""
 320          result = await consumer._send_or_edit("I ▉")
 321          assert result is True
 322          adapter.edit_message.assert_called_once()
 323  
 324  
 325  # ── Integration: full stream run ─────────────────────────────────────────
 326  
 327  
 328  class TestStreamRunMediaStripping:
 329      """End-to-end: deltas with MEDIA: produce clean visible text."""
 330  
 331      @pytest.mark.asyncio
 332      async def test_stream_with_media_tag(self):
 333          """Full stream run strips MEDIA: from the final visible message."""
 334          adapter = MagicMock()
 335          send_result = SimpleNamespace(success=True, message_id="msg_1")
 336          edit_result = SimpleNamespace(success=True)
 337          adapter.send = AsyncMock(return_value=send_result)
 338          adapter.edit_message = AsyncMock(return_value=edit_result)
 339          adapter.MAX_MESSAGE_LENGTH = 4096
 340  
 341          config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
 342          consumer = GatewayStreamConsumer(adapter, "chat_123", config)
 343  
 344          # Feed deltas
 345          consumer.on_delta("Here is your generated image\n")
 346          consumer.on_delta("MEDIA:/home/user/.hermes/cache/images/abc123.png")
 347          consumer.finish()
 348  
 349          await consumer.run()
 350  
 351          # Verify the final text sent/edited doesn't contain MEDIA:
 352          all_calls = []
 353          for call in adapter.send.call_args_list:
 354              all_calls.append(call[1].get("content", ""))
 355          for call in adapter.edit_message.call_args_list:
 356              all_calls.append(call[1].get("content", ""))
 357  
 358          for sent_text in all_calls:
 359              assert "MEDIA:" not in sent_text, f"MEDIA: leaked into display: {sent_text!r}"
 360  
 361          assert consumer.already_sent
 362  
 363  
 364  # ── Segment break (tool boundary) tests ──────────────────────────────────
 365  
 366  
 367  class TestSegmentBreakOnToolBoundary:
 368      """Verify that on_delta(None) finalizes the current message and starts a
 369      new one so the final response appears below tool-progress messages."""
 370  
 371      @pytest.mark.asyncio
 372      async def test_segment_break_creates_new_message(self):
 373          """After a None boundary, next text creates a fresh message."""
 374          adapter = MagicMock()
 375          send_result_1 = SimpleNamespace(success=True, message_id="msg_1")
 376          send_result_2 = SimpleNamespace(success=True, message_id="msg_2")
 377          edit_result = SimpleNamespace(success=True)
 378          adapter.send = AsyncMock(side_effect=[send_result_1, send_result_2])
 379          adapter.edit_message = AsyncMock(return_value=edit_result)
 380          adapter.MAX_MESSAGE_LENGTH = 4096
 381  
 382          config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
 383          consumer = GatewayStreamConsumer(adapter, "chat_123", config)
 384  
 385          # Phase 1: intermediate text before tool calls
 386          consumer.on_delta("Let me search for that...")
 387          # Tool boundary — model is about to call tools
 388          consumer.on_delta(None)
 389          # Phase 2: final response text after tools finished
 390          consumer.on_delta("Here are the results.")
 391          consumer.finish()
 392  
 393          await consumer.run()
 394  
 395          # Should have sent TWO separate messages (two adapter.send calls),
 396          # not just edited the first one.
 397          assert adapter.send.call_count == 2
 398          first_text = adapter.send.call_args_list[0][1]["content"]
 399          second_text = adapter.send.call_args_list[1][1]["content"]
 400          assert "search" in first_text
 401          assert "results" in second_text
 402  
 403      @pytest.mark.asyncio
 404      async def test_segment_break_no_text_before(self):
 405          """A None boundary with no preceding text is a no-op."""
 406          adapter = MagicMock()
 407          send_result = SimpleNamespace(success=True, message_id="msg_1")
 408          adapter.send = AsyncMock(return_value=send_result)
 409          adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
 410          adapter.MAX_MESSAGE_LENGTH = 4096
 411  
 412          config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
 413          consumer = GatewayStreamConsumer(adapter, "chat_123", config)
 414  
 415          # No text before the boundary — model went straight to tool calls
 416          consumer.on_delta(None)
 417          consumer.on_delta("Final answer.")
 418          consumer.finish()
 419  
 420          await consumer.run()
 421  
 422          # Only one send call (the final answer)
 423          assert adapter.send.call_count == 1
 424          assert "Final answer" in adapter.send.call_args_list[0][1]["content"]
 425  
 426      @pytest.mark.asyncio
 427      async def test_segment_break_removes_cursor(self):
 428          """The finalized segment message should not have a cursor."""
 429          adapter = MagicMock()
 430          send_result = SimpleNamespace(success=True, message_id="msg_1")
 431          edit_result = SimpleNamespace(success=True)
 432          adapter.send = AsyncMock(return_value=send_result)
 433          adapter.edit_message = AsyncMock(return_value=edit_result)
 434          adapter.MAX_MESSAGE_LENGTH = 4096
 435  
 436          config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉")
 437          consumer = GatewayStreamConsumer(adapter, "chat_123", config)
 438  
 439          consumer.on_delta("Thinking...")
 440          consumer.on_delta(None)
 441          consumer.on_delta("Done.")
 442          consumer.finish()
 443  
 444          await consumer.run()
 445  
 446          # The first segment should have been finalized without cursor.
 447          # Check all edit_message calls + the initial send for the first segment.
 448          # The last state of msg_1 should NOT have the cursor.
 449          all_texts = []
 450          for call in adapter.send.call_args_list:
 451              all_texts.append(call[1].get("content", ""))
 452          for call in adapter.edit_message.call_args_list:
 453              all_texts.append(call[1].get("content", ""))
 454  
 455          # Find the text(s) that contain "Thinking" — the finalized version
 456          # should not have the cursor.
 457          thinking_texts = [t for t in all_texts if "Thinking" in t]
 458          assert thinking_texts, "Expected at least one message with 'Thinking'"
 459          # The LAST occurrence is the finalized version
 460          assert "▉" not in thinking_texts[-1], (
 461              f"Cursor found in finalized segment: {thinking_texts[-1]!r}"
 462          )
 463  
 464      @pytest.mark.asyncio
 465      async def test_multiple_segment_breaks(self):
 466          """Multiple tool boundaries create multiple message segments."""
 467          adapter = MagicMock()
 468          msg_counter = iter(["msg_1", "msg_2", "msg_3"])
 469          adapter.send = AsyncMock(
 470              side_effect=lambda **kw: SimpleNamespace(success=True, message_id=next(msg_counter))
 471          )
 472          adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
 473          adapter.MAX_MESSAGE_LENGTH = 4096
 474  
 475          config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
 476          consumer = GatewayStreamConsumer(adapter, "chat_123", config)
 477  
 478          consumer.on_delta("Phase 1")
 479          consumer.on_delta(None)  # tool boundary
 480          consumer.on_delta("Phase 2")
 481          consumer.on_delta(None)  # another tool boundary
 482          consumer.on_delta("Phase 3")
 483          consumer.finish()
 484  
 485          await consumer.run()
 486  
 487          # Three separate messages
 488          assert adapter.send.call_count == 3
 489  
 490      @pytest.mark.asyncio
 491      async def test_already_sent_stays_true_after_segment(self):
 492          """already_sent remains True after a segment break."""
 493          adapter = MagicMock()
 494          send_result = SimpleNamespace(success=True, message_id="msg_1")
 495          adapter.send = AsyncMock(return_value=send_result)
 496          adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
 497          adapter.MAX_MESSAGE_LENGTH = 4096
 498  
 499          config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
 500          consumer = GatewayStreamConsumer(adapter, "chat_123", config)
 501  
 502          consumer.on_delta("Text")
 503          consumer.on_delta(None)
 504          consumer.finish()
 505  
 506          await consumer.run()
 507  
 508          assert consumer.already_sent
 509  
 510      @pytest.mark.asyncio
 511      async def test_edit_failure_sends_only_unsent_tail_at_finish(self):
 512          """If an edit fails mid-stream, send only the missing tail once at finish."""
 513          adapter = MagicMock()
 514          send_results = [
 515              SimpleNamespace(success=True, message_id="msg_1"),
 516              SimpleNamespace(success=True, message_id="msg_2"),
 517          ]
 518          adapter.send = AsyncMock(side_effect=send_results)
 519          adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=False, error="flood_control:6"))
 520          adapter.MAX_MESSAGE_LENGTH = 4096
 521  
 522          config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉")
 523          consumer = GatewayStreamConsumer(adapter, "chat_123", config)
 524  
 525          consumer.on_delta("Hello")
 526          task = asyncio.create_task(consumer.run())
 527          await asyncio.sleep(0.08)
 528          consumer.on_delta(" world")
 529          await asyncio.sleep(0.08)
 530          consumer.finish()
 531          await task
 532  
 533          assert adapter.send.call_count == 2
 534          first_text = adapter.send.call_args_list[0][1]["content"]
 535          second_text = adapter.send.call_args_list[1][1]["content"]
 536          assert "Hello" in first_text
 537          assert second_text.strip() == "world"
 538          assert consumer.already_sent
 539  
 540      @pytest.mark.asyncio
 541      async def test_segment_break_clears_failed_edit_fallback_state(self):
 542          """A tool boundary after edit failure must flush the undelivered tail
 543          without duplicating the prefix the user already saw (#8124)."""
 544          adapter = MagicMock()
 545          send_results = [
 546              SimpleNamespace(success=True, message_id="msg_1"),
 547              SimpleNamespace(success=True, message_id="msg_2"),
 548              SimpleNamespace(success=True, message_id="msg_3"),
 549          ]
 550          adapter.send = AsyncMock(side_effect=send_results)
 551          adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=False, error="flood_control:6"))
 552          adapter.MAX_MESSAGE_LENGTH = 4096
 553  
 554          config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉")
 555          consumer = GatewayStreamConsumer(adapter, "chat_123", config)
 556  
 557          consumer.on_delta("Hello")
 558          task = asyncio.create_task(consumer.run())
 559          await asyncio.sleep(0.08)
 560          consumer.on_delta(" world")
 561          await asyncio.sleep(0.08)
 562          consumer.on_delta(None)
 563          consumer.on_delta("Next segment")
 564          consumer.finish()
 565          await task
 566  
 567          sent_texts = [call[1]["content"] for call in adapter.send.call_args_list]
 568          # The undelivered "world" tail must reach the user, and the next
 569          # segment must not duplicate "Hello" that was already visible.
 570          assert sent_texts == ["Hello ▉", "world", "Next segment"]
 571  
 572      @pytest.mark.asyncio
 573      async def test_segment_break_after_mid_stream_edit_failure_preserves_tail(self):
 574          """Regression for #8124: when an earlier edit succeeded but later edits
 575          fail (persistent flood control) and a tool boundary arrives before the
 576          fallback threshold is reached, the pre-boundary tail must still be
 577          delivered — not silently dropped by the segment reset."""
 578          adapter = MagicMock()
 579          # msg_1 for the initial partial, msg_2 for the flushed tail,
 580          # msg_3 for the post-boundary segment.
 581          send_results = [
 582              SimpleNamespace(success=True, message_id="msg_1"),
 583              SimpleNamespace(success=True, message_id="msg_2"),
 584              SimpleNamespace(success=True, message_id="msg_3"),
 585          ]
 586          adapter.send = AsyncMock(side_effect=send_results)
 587  
 588          # First two edits succeed, everything after fails with flood control
 589          # — simulating Telegram's "edit once then get rate-limited" pattern.
 590          edit_results = [
 591              SimpleNamespace(success=True),   # "Hello world ▉"  — succeeds
 592              SimpleNamespace(success=False, error="flood_control:6.0"),  # "Hello world more ▉" — flood triggered
 593              SimpleNamespace(success=False, error="flood_control:6.0"),  # finalize edit at segment break
 594              SimpleNamespace(success=False, error="flood_control:6.0"),  # cursor-strip attempt
 595          ]
 596          adapter.edit_message = AsyncMock(side_effect=edit_results + [edit_results[-1]] * 10)
 597          adapter.MAX_MESSAGE_LENGTH = 4096
 598  
 599          config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉")
 600          consumer = GatewayStreamConsumer(adapter, "chat_123", config)
 601  
 602          consumer.on_delta("Hello")
 603          task = asyncio.create_task(consumer.run())
 604          await asyncio.sleep(0.08)
 605          consumer.on_delta(" world")
 606          await asyncio.sleep(0.08)
 607          consumer.on_delta(" more")
 608          await asyncio.sleep(0.08)
 609          consumer.on_delta(None)  # tool boundary
 610          consumer.on_delta("Here is the tool result.")
 611          consumer.finish()
 612          await task
 613  
 614          sent_texts = [call[1]["content"] for call in adapter.send.call_args_list]
 615          # "more" must have been delivered, not dropped.
 616          all_text = " ".join(sent_texts)
 617          assert "more" in all_text, (
 618              f"Pre-boundary tail 'more' was silently dropped: sends={sent_texts}"
 619          )
 620          # Post-boundary text must also reach the user.
 621          assert "Here is the tool result." in all_text
 622  
 623      @pytest.mark.asyncio
 624      async def test_no_message_id_enters_fallback_mode(self):
 625          """Platform returns success but no message_id (Signal) — must not
 626          re-send on every delta.  Should enter fallback mode and send only
 627          the continuation at finish."""
 628          adapter = MagicMock()
 629          # First send succeeds but returns no message_id (Signal behavior)
 630          send_result_no_id = SimpleNamespace(success=True, message_id=None)
 631          # Fallback final send succeeds
 632          send_result_final = SimpleNamespace(success=True, message_id="msg_final")
 633          adapter.send = AsyncMock(side_effect=[send_result_no_id, send_result_final])
 634          adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
 635          adapter.MAX_MESSAGE_LENGTH = 4096
 636  
 637          config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
 638          consumer = GatewayStreamConsumer(adapter, "chat_123", config)
 639  
 640          consumer.on_delta("Hello")
 641          task = asyncio.create_task(consumer.run())
 642          await asyncio.sleep(0.08)
 643          consumer.on_delta(" world, this is a longer response.")
 644          await asyncio.sleep(0.08)
 645          consumer.finish()
 646          await task
 647  
 648          # Should send exactly 2 messages: initial chunk + fallback continuation
 649          # NOT one message per delta
 650          assert adapter.send.call_count == 2
 651          assert consumer.already_sent
 652          # edit_message should NOT have been called (no valid message_id to edit)
 653          adapter.edit_message.assert_not_called()
 654  
 655      @pytest.mark.asyncio
 656      async def test_no_message_id_single_delta_marks_already_sent(self):
 657          """When the entire response fits in one delta and platform returns no
 658          message_id, already_sent must still be True to prevent the gateway
 659          from re-sending the full response."""
 660          adapter = MagicMock()
 661          send_result = SimpleNamespace(success=True, message_id=None)
 662          adapter.send = AsyncMock(return_value=send_result)
 663          adapter.MAX_MESSAGE_LENGTH = 4096
 664  
 665          config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
 666          consumer = GatewayStreamConsumer(adapter, "chat_123", config)
 667  
 668          consumer.on_delta("Short response.")
 669          consumer.finish()
 670  
 671          await consumer.run()
 672  
 673          assert consumer.already_sent
 674          # Only one send call (the initial message)
 675          assert adapter.send.call_count == 1
 676  
 677      @pytest.mark.asyncio
 678      async def test_no_message_id_segment_breaks_do_not_resend(self):
 679          """On a platform that never returns a message_id (e.g. webhook with
 680          github_comment delivery), tool-call segment breaks must NOT trigger
 681          a new adapter.send() per boundary.  The fix: _message_id == '__no_edit__'
 682          suppresses the reset so all text accumulates and is sent once."""
 683          adapter = MagicMock()
 684          # No message_id on first send, then one more for the fallback final
 685          adapter.send = AsyncMock(side_effect=[
 686              SimpleNamespace(success=True, message_id=None),
 687              SimpleNamespace(success=True, message_id=None),
 688          ])
 689          adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
 690          adapter.MAX_MESSAGE_LENGTH = 4096
 691  
 692          config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
 693          consumer = GatewayStreamConsumer(adapter, "chat_123", config)
 694  
 695          # Simulate: text → tool boundary → text → tool boundary → text (3 segments)
 696          consumer.on_delta("Phase 1 text")
 697          consumer.on_delta(None)   # tool call boundary
 698          consumer.on_delta("Phase 2 text")
 699          consumer.on_delta(None)   # another tool call boundary
 700          consumer.on_delta("Phase 3 text")
 701          consumer.finish()
 702  
 703          await consumer.run()
 704  
 705          # Before the fix this would post 3 comments (one per segment).
 706          # After the fix: only the initial partial + one fallback-final continuation.
 707          assert adapter.send.call_count == 2, (
 708              f"Expected 2 sends (initial + fallback), got {adapter.send.call_count}"
 709          )
 710          assert consumer.already_sent
 711          # The continuation must contain the text from segments 2 and 3
 712          final_text = adapter.send.call_args_list[1][1]["content"]
 713          assert "Phase 2" in final_text
 714          assert "Phase 3" in final_text
 715  
 716      @pytest.mark.asyncio
 717      async def test_fallback_final_splits_long_continuation_without_dropping_text(self):
 718          """Long continuation tails should be chunked when fallback final-send runs."""
 719          adapter = MagicMock()
 720          adapter.send = AsyncMock(side_effect=[
 721              SimpleNamespace(success=True, message_id="msg_1"),
 722              SimpleNamespace(success=True, message_id="msg_2"),
 723              SimpleNamespace(success=True, message_id="msg_3"),
 724          ])
 725          adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=False, error="flood_control:6"))
 726          adapter.MAX_MESSAGE_LENGTH = 610
 727  
 728          config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉")
 729          consumer = GatewayStreamConsumer(adapter, "chat_123", config)
 730  
 731          prefix = "Hello world"
 732          tail = "x" * 620
 733          consumer.on_delta(prefix)
 734          task = asyncio.create_task(consumer.run())
 735          await asyncio.sleep(0.08)
 736          consumer.on_delta(tail)
 737          await asyncio.sleep(0.08)
 738          consumer.finish()
 739          await task
 740  
 741          sent_texts = [call[1]["content"] for call in adapter.send.call_args_list]
 742          assert len(sent_texts) == 3
 743          assert sent_texts[0].startswith(prefix)
 744          assert sum(len(t) for t in sent_texts[1:]) == len(tail)
 745  
 746      @pytest.mark.asyncio
 747      async def test_fallback_final_sends_full_text_at_tool_boundary(self):
 748          """After a tool call, the streamed prefix is stale (from the pre-tool
 749          segment).  _send_fallback_final must still send the post-tool response
 750          even when continuation_text calculates as empty (#10807)."""
 751          adapter = MagicMock()
 752          adapter.send = AsyncMock(
 753              return_value=SimpleNamespace(success=True, message_id="msg_1"),
 754          )
 755          adapter.edit_message = AsyncMock(
 756              return_value=SimpleNamespace(success=True),
 757          )
 758          adapter.MAX_MESSAGE_LENGTH = 4096
 759  
 760          config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
 761          consumer = GatewayStreamConsumer(adapter, "chat_123", config)
 762  
 763          # Simulate a pre-tool streamed segment that becomes the visible prefix
 764          pre_tool_text = "I'll run that code now."
 765          consumer.on_delta(pre_tool_text)
 766          task = asyncio.create_task(consumer.run())
 767          await asyncio.sleep(0.05)
 768  
 769          # After the tool call, the model returns a SHORT final response that
 770          # does NOT start with the pre-tool prefix.  The continuation calculator
 771          # would return empty (no prefix match → full text returned, but if the
 772          # streaming edit already showed pre_tool_text, the prefix-based logic
 773          # wrongly matches).  Simulate this by setting _last_sent_text to the
 774          # pre-tool content, then finishing with different post-tool content.
 775          consumer._last_sent_text = pre_tool_text
 776          post_tool_response = "⏰ Script timed out after 30s and was killed."
 777          consumer.finish()
 778          await task
 779  
 780          # The fallback should send the post-tool response via
 781          # _send_fallback_final.
 782          await consumer._send_fallback_final(post_tool_response)
 783  
 784          # Verify the final text was sent (not silently dropped)
 785          sent = False
 786          for call in adapter.send.call_args_list:
 787              content = call[1].get("content", call[0][0] if call[0] else "")
 788              if "timed out" in str(content):
 789                  sent = True
 790                  break
 791          assert sent, (
 792              "Post-tool timeout response was silently dropped by "
 793              "_send_fallback_final — the #10807 fix should prevent this"
 794          )
 795  
 796  
 797  class TestInterimCommentaryMessages:
 798      @pytest.mark.asyncio
 799      async def test_commentary_message_stays_separate_from_final_stream(self):
 800          adapter = MagicMock()
 801          adapter.send = AsyncMock(side_effect=[
 802              SimpleNamespace(success=True, message_id="msg_1"),
 803              SimpleNamespace(success=True, message_id="msg_2"),
 804          ])
 805          adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
 806          adapter.MAX_MESSAGE_LENGTH = 4096
 807  
 808          consumer = GatewayStreamConsumer(
 809              adapter,
 810              "chat_123",
 811              StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5),
 812          )
 813  
 814          consumer.on_commentary("I'll inspect the repository first.")
 815          consumer.on_delta("Done.")
 816          consumer.finish()
 817  
 818          await consumer.run()
 819  
 820          sent_texts = [call[1]["content"] for call in adapter.send.call_args_list]
 821          assert sent_texts == ["I'll inspect the repository first.", "Done."]
 822          assert consumer.final_response_sent is True
 823  
 824      @pytest.mark.asyncio
 825      async def test_failed_final_send_does_not_mark_final_response_sent(self):
 826          adapter = MagicMock()
 827          adapter.send = AsyncMock(return_value=SimpleNamespace(success=False, message_id=None))
 828          adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
 829          adapter.MAX_MESSAGE_LENGTH = 4096
 830  
 831          consumer = GatewayStreamConsumer(
 832              adapter,
 833              "chat_123",
 834              StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5),
 835          )
 836  
 837          consumer.on_delta("Done.")
 838          consumer.finish()
 839  
 840          await consumer.run()
 841  
 842          assert consumer.final_response_sent is False
 843          assert consumer.already_sent is False
 844  
 845      @pytest.mark.asyncio
 846      async def test_success_without_message_id_marks_visible_and_sends_only_tail(self):
 847          adapter = MagicMock()
 848          adapter.send = AsyncMock(side_effect=[
 849              SimpleNamespace(success=True, message_id=None),
 850              SimpleNamespace(success=True, message_id=None),
 851          ])
 852          adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
 853          adapter.MAX_MESSAGE_LENGTH = 4096
 854  
 855          consumer = GatewayStreamConsumer(
 856              adapter,
 857              "chat_123",
 858              StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉"),
 859          )
 860  
 861          consumer.on_delta("Hello")
 862          task = asyncio.create_task(consumer.run())
 863          await asyncio.sleep(0.08)
 864          consumer.on_delta(" world")
 865          await asyncio.sleep(0.08)
 866          consumer.finish()
 867          await task
 868  
 869          sent_texts = [call[1]["content"] for call in adapter.send.call_args_list]
 870          assert sent_texts == ["Hello ▉", "world"]
 871          assert consumer.already_sent is True
 872          assert consumer.final_response_sent is True
 873  
 874  
 875  class TestCancelledConsumerSetsFlags:
 876      """Cancellation must set final_response_sent when already_sent is True.
 877  
 878      The 5-second stream_task timeout in gateway/run.py can cancel the
 879      consumer while it's still processing.  If final_response_sent stays
 880      False, the gateway falls through to the normal send path and the
 881      user sees a duplicate message.
 882      """
 883  
 884      @pytest.mark.asyncio
 885      async def test_cancelled_with_already_sent_marks_final_response_sent(self):
 886          """Cancelling after content was sent should set final_response_sent."""
 887          adapter = MagicMock()
 888          adapter.send = AsyncMock(
 889              return_value=SimpleNamespace(success=True, message_id="msg_1")
 890          )
 891          adapter.edit_message = AsyncMock(
 892              return_value=SimpleNamespace(success=True)
 893          )
 894          adapter.MAX_MESSAGE_LENGTH = 4096
 895  
 896          consumer = GatewayStreamConsumer(
 897              adapter,
 898              "chat_123",
 899              StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5),
 900          )
 901  
 902          # Stream some text — the consumer sends it and sets already_sent
 903          consumer.on_delta("Hello world")
 904          task = asyncio.create_task(consumer.run())
 905          await asyncio.sleep(0.08)
 906  
 907          assert consumer.already_sent is True
 908  
 909          # Cancel the task (simulates the 5-second timeout in gateway)
 910          task.cancel()
 911          try:
 912              await task
 913          except asyncio.CancelledError:
 914              pass
 915  
 916          # The fix: final_response_sent should be True even though _DONE
 917          # was never processed, preventing a duplicate message.
 918          assert consumer.final_response_sent is True
 919  
 920      @pytest.mark.asyncio
 921      async def test_cancelled_without_any_sends_does_not_mark_final(self):
 922          """Cancelling before anything was sent should NOT set final_response_sent."""
 923          adapter = MagicMock()
 924          adapter.send = AsyncMock(
 925              return_value=SimpleNamespace(success=False, message_id=None)
 926          )
 927          adapter.edit_message = AsyncMock(
 928              return_value=SimpleNamespace(success=True)
 929          )
 930          adapter.MAX_MESSAGE_LENGTH = 4096
 931  
 932          consumer = GatewayStreamConsumer(
 933              adapter,
 934              "chat_123",
 935              StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5),
 936          )
 937  
 938          # Send fails — already_sent stays False
 939          consumer.on_delta("x")
 940          task = asyncio.create_task(consumer.run())
 941          await asyncio.sleep(0.08)
 942  
 943          assert consumer.already_sent is False
 944  
 945          task.cancel()
 946          try:
 947              await task
 948          except asyncio.CancelledError:
 949              pass
 950  
 951          # Without a successful send, final_response_sent should stay False
 952          # so the normal gateway send path can deliver the response.
 953          assert consumer.final_response_sent is False
 954  
 955  
 956  # ── Think-block filtering unit tests ─────────────────────────────────────
 957  
 958  
 959  def _make_consumer() -> GatewayStreamConsumer:
 960      """Create a bare consumer for unit-testing the filter (no adapter needed)."""
 961      adapter = MagicMock()
 962      return GatewayStreamConsumer(adapter, "chat_test")
 963  
 964  
 965  class TestFilterAndAccumulate:
 966      """Unit tests for _filter_and_accumulate think-block suppression."""
 967  
 968      def test_plain_text_passes_through(self):
 969          c = _make_consumer()
 970          c._filter_and_accumulate("Hello world")
 971          assert c._accumulated == "Hello world"
 972  
 973      def test_complete_think_block_stripped(self):
 974          c = _make_consumer()
 975          c._filter_and_accumulate("<think>internal reasoning</think>Answer here")
 976          assert c._accumulated == "Answer here"
 977  
 978      def test_think_block_in_middle(self):
 979          c = _make_consumer()
 980          c._filter_and_accumulate("Prefix\n<think>reasoning</think>\nSuffix")
 981          assert c._accumulated == "Prefix\n\nSuffix"
 982  
 983      def test_think_block_split_across_deltas(self):
 984          c = _make_consumer()
 985          c._filter_and_accumulate("<think>start of")
 986          c._filter_and_accumulate(" reasoning</think>visible text")
 987          assert c._accumulated == "visible text"
 988  
 989      def test_opening_tag_split_across_deltas(self):
 990          c = _make_consumer()
 991          c._filter_and_accumulate("<thi")
 992          # Partial tag held back
 993          assert c._accumulated == ""
 994          c._filter_and_accumulate("nk>hidden</think>shown")
 995          assert c._accumulated == "shown"
 996  
 997      def test_closing_tag_split_across_deltas(self):
 998          c = _make_consumer()
 999          c._filter_and_accumulate("<think>hidden</thi")
1000          assert c._accumulated == ""
1001          c._filter_and_accumulate("nk>shown")
1002          assert c._accumulated == "shown"
1003  
1004      def test_multiple_think_blocks(self):
1005          c = _make_consumer()
1006          # Consecutive blocks with no text between them — both stripped
1007          c._filter_and_accumulate(
1008              "<think>block1</think><think>block2</think>visible"
1009          )
1010          assert c._accumulated == "visible"
1011  
1012      def test_multiple_think_blocks_with_text_between(self):
1013          """Think tag after non-whitespace is NOT a boundary (prose safety)."""
1014          c = _make_consumer()
1015          c._filter_and_accumulate(
1016              "<think>block1</think>A<think>block2</think>B"
1017          )
1018          # Second <think> follows 'A' (not a block boundary) — treated as prose
1019          assert "A" in c._accumulated
1020          assert "B" in c._accumulated
1021  
1022      def test_thinking_tag_variant(self):
1023          c = _make_consumer()
1024          c._filter_and_accumulate("<thinking>deep thought</thinking>Result")
1025          assert c._accumulated == "Result"
1026  
1027      def test_thought_tag_variant(self):
1028          c = _make_consumer()
1029          c._filter_and_accumulate("<thought>Gemma style</thought>Output")
1030          assert c._accumulated == "Output"
1031  
1032      def test_reasoning_scratchpad_variant(self):
1033          c = _make_consumer()
1034          c._filter_and_accumulate(
1035              "<REASONING_SCRATCHPAD>long plan</REASONING_SCRATCHPAD>Done"
1036          )
1037          assert c._accumulated == "Done"
1038  
1039      def test_case_insensitive_THINKING(self):
1040          c = _make_consumer()
1041          c._filter_and_accumulate("<THINKING>caps</THINKING>answer")
1042          assert c._accumulated == "answer"
1043  
1044      def test_prose_mention_not_stripped(self):
1045          """<think> mentioned mid-line in prose should NOT trigger filtering."""
1046          c = _make_consumer()
1047          c._filter_and_accumulate("The <think> tag is used for reasoning")
1048          assert "<think>" in c._accumulated
1049          assert "used for reasoning" in c._accumulated
1050  
1051      def test_prose_mention_after_text(self):
1052          """<think> after non-whitespace on same line is not a block boundary."""
1053          c = _make_consumer()
1054          c._filter_and_accumulate("Try using <think>some content</think> tags")
1055          assert "<think>" in c._accumulated
1056  
1057      def test_think_at_line_start_is_stripped(self):
1058          """<think> at start of a new line IS a block boundary."""
1059          c = _make_consumer()
1060          c._filter_and_accumulate("Previous line\n<think>reasoning</think>Next")
1061          assert "Previous line\nNext" == c._accumulated
1062  
1063      def test_think_with_only_whitespace_before(self):
1064          """<think> preceded by only whitespace on its line is a boundary."""
1065          c = _make_consumer()
1066          c._filter_and_accumulate("  <think>hidden</think>visible")
1067          # Leading whitespace before the tag is emitted, then block is stripped
1068          assert c._accumulated == "  visible"
1069  
1070      def test_flush_think_buffer_on_non_tag(self):
1071          """Partial tag that turns out not to be a tag is flushed."""
1072          c = _make_consumer()
1073          c._filter_and_accumulate("<thi")
1074          assert c._accumulated == ""
1075          # Flush explicitly (simulates stream end)
1076          c._flush_think_buffer()
1077          assert c._accumulated == "<thi"
1078  
1079      def test_flush_think_buffer_when_inside_block(self):
1080          """Flush while inside a think block does NOT emit buffered content."""
1081          c = _make_consumer()
1082          c._filter_and_accumulate("<think>still thinking")
1083          c._flush_think_buffer()
1084          assert c._accumulated == ""
1085  
1086      def test_unclosed_think_block_suppresses(self):
1087          """An unclosed <think> suppresses all subsequent content."""
1088          c = _make_consumer()
1089          c._filter_and_accumulate("Before\n<think>reasoning that never ends...")
1090          assert c._accumulated == "Before\n"
1091  
1092      def test_multiline_think_block(self):
1093          c = _make_consumer()
1094          c._filter_and_accumulate(
1095              "<think>\nLine 1\nLine 2\nLine 3\n</think>Final answer"
1096          )
1097          assert c._accumulated == "Final answer"
1098  
1099      def test_segment_reset_preserves_think_state(self):
1100          """_reset_segment_state should NOT clear think-block filter state."""
1101          c = _make_consumer()
1102          c._filter_and_accumulate("<think>start")
1103          c._reset_segment_state()
1104          # Still inside think block — subsequent text should be suppressed
1105          c._filter_and_accumulate("still hidden</think>visible")
1106          assert c._accumulated == "visible"
1107  
1108  
class TestFilterAndAccumulateIntegration:
    """Integration: verify think blocks don't leak through the full run() path."""

    @pytest.mark.asyncio
    async def test_think_block_not_sent_to_platform(self):
        """Think blocks should be filtered before any platform send or edit.

        Feeds a complete think block followed by visible text, runs the
        consumer as a background task, and inspects every payload passed to
        ``adapter.send`` and ``adapter.edit_message``.
        """
        adapter = MagicMock()
        adapter.send = AsyncMock(
            return_value=SimpleNamespace(success=True, message_id="msg_1")
        )
        adapter.edit_message = AsyncMock(
            return_value=SimpleNamespace(success=True)
        )
        adapter.MAX_MESSAGE_LENGTH = 4096

        consumer = GatewayStreamConsumer(
            adapter,
            "chat_test",
            StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5),
        )

        # Simulate streaming: think block then visible text
        consumer.on_delta("<think>deep reasoning here</think>")
        consumer.on_delta("The answer is 42.")
        consumer.finish()

        task = asyncio.create_task(consumer.run())
        try:
            await asyncio.sleep(0.15)

            all_calls = list(adapter.send.call_args_list) + list(
                adapter.edit_message.call_args_list
            )
            # Guard against a vacuous pass: if nothing reached the platform,
            # the leak checks below would never run.
            assert all_calls, "Consumer never sent or edited any message"

            # No payload handed to the platform may contain the think block.
            for call in all_calls:
                args, kwargs = call
                content = kwargs.get("content") or (args[0] if args else "")
                assert "<think>" not in content, f"Think tag leaked: {content}"
                assert "deep reasoning" not in content
        finally:
            # Always reap the background task, even when an assertion above
            # failed, so no pending task outlives the test.
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
1153  
1154  
1155  # ── buffer_only mode tests ─────────────────────────────────────────────
1156  
1157  
class TestBufferOnlyMode:
    """buffer_only mode: intermediate edits are suppressed and text is only
    flushed at structural boundaries (done, segment break, commentary)."""

    @pytest.mark.asyncio
    async def test_suppresses_intermediate_edits(self):
        """Time-based and size-based edits are skipped; only got_done flushes."""
        platform = MagicMock()
        platform.MAX_MESSAGE_LENGTH = 4096
        platform.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
        platform.send = AsyncMock(
            return_value=SimpleNamespace(success=True, message_id="msg1")
        )

        stream = GatewayStreamConsumer(
            platform,
            "!room:server",
            config=StreamConsumerConfig(
                edit_interval=0.01, buffer_threshold=5, cursor="", buffer_only=True
            ),
        )

        for fragment in ("Hello", " world", ", this", " is", " a", " test"):
            stream.on_delta(fragment)
        stream.finish()

        await stream.run()

        # One send holding the full text, and no edits at all.
        platform.send.assert_called_once()
        platform.edit_message.assert_not_called()
        only_payload = platform.send.call_args_list[0].kwargs["content"]
        assert "Hello world, this is a test" in only_payload

    @pytest.mark.asyncio
    async def test_flushes_on_segment_break(self):
        """A segment break (tool call boundary) flushes accumulated text."""
        platform = MagicMock()
        platform.MAX_MESSAGE_LENGTH = 4096
        platform.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
        platform.send = AsyncMock(
            side_effect=[
                SimpleNamespace(success=True, message_id=msg_id)
                for msg_id in ("msg1", "msg2")
            ]
        )

        stream = GatewayStreamConsumer(
            platform,
            "!room:server",
            config=StreamConsumerConfig(
                edit_interval=0.01, buffer_threshold=5, cursor="", buffer_only=True
            ),
        )

        # None between the two texts marks the segment break.
        for piece in ("Before tool call", None, "After tool call"):
            stream.on_delta(piece)
        stream.finish()

        await stream.run()

        assert platform.send.call_count == 2
        before, after = (
            sent.kwargs["content"] for sent in platform.send.call_args_list[:2]
        )
        assert "Before tool call" in before
        assert "After tool call" in after
        platform.edit_message.assert_not_called()

    @pytest.mark.asyncio
    async def test_flushes_on_commentary(self):
        """An interim commentary message flushes in buffer_only mode."""
        platform = MagicMock()
        platform.MAX_MESSAGE_LENGTH = 4096
        platform.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
        platform.send = AsyncMock(
            side_effect=[
                SimpleNamespace(success=True, message_id=msg_id)
                for msg_id in ("msg1", "msg2", "msg3")
            ]
        )

        stream = GatewayStreamConsumer(
            platform,
            "!room:server",
            config=StreamConsumerConfig(
                edit_interval=0.01, buffer_threshold=5, cursor="", buffer_only=True
            ),
        )

        stream.on_delta("Working on it...")
        stream.on_commentary("I'll search for that first.")
        stream.on_delta("Here are the results.")
        stream.finish()

        await stream.run()

        # Up to three sends are mocked (accumulated text, commentary, final
        # text); this test only requires that at least two happened.
        assert platform.send.call_count >= 2
        platform.edit_message.assert_not_called()

    @pytest.mark.asyncio
    async def test_default_mode_still_triggers_intermediate_edits(self):
        """Regression: buffer_only=False (default) still does progressive edits."""
        platform = MagicMock()
        platform.MAX_MESSAGE_LENGTH = 4096
        platform.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
        platform.send = AsyncMock(
            return_value=SimpleNamespace(success=True, message_id="msg1")
        )

        # buffer_only is left at its default; buffer_threshold=5 means any
        # 5+ chars triggers an early edit.
        stream = GatewayStreamConsumer(
            platform,
            "!room:server",
            config=StreamConsumerConfig(
                edit_interval=0.01, buffer_threshold=5, cursor=""
            ),
        )

        stream.on_delta("Hello world, this is long enough to trigger edits")
        stream.finish()

        await stream.run()

        # The consumer may send and then edit, or just send once at
        # got_done.  The key assertion is that at least one send happened
        # and nothing broke.
        assert platform.send.call_count >= 1
1256  
1257  
1258  # ── Cursor stripping on fallback (#7183) ────────────────────────────────────
1259  
1260  
class TestCursorStrippingOnFallback:
    """Regression: cursor must be stripped when fallback continuation is empty (#7183).

    ``_send_fallback_final`` can be called with nothing new to deliver
    because the visible partial already matches final_text.  The last edit
    may nevertheless still show the cursor character, since fallback mode
    was entered after a failed edit.  Before the fix the message stayed
    frozen with a visible ▉.
    """

    @staticmethod
    def _consumer_showing(adapter, cursor, visible_text):
        """Build a consumer whose existing message "msg-1" displays *visible_text*."""
        consumer = GatewayStreamConsumer(
            adapter, "chat-1",
            config=StreamConsumerConfig(cursor=cursor),
        )
        consumer._message_id = "msg-1"
        consumer._last_sent_text = visible_text
        consumer._fallback_final_send = False
        return consumer

    @pytest.mark.asyncio
    async def test_cursor_stripped_when_continuation_empty(self):
        """_send_fallback_final must attempt a final edit to strip the cursor."""
        adapter = MagicMock()
        adapter.MAX_MESSAGE_LENGTH = 4096
        adapter.edit_message = AsyncMock(
            return_value=SimpleNamespace(success=True, message_id="msg-1")
        )
        consumer = self._consumer_showing(adapter, " ▉", "Hello world ▉")

        await consumer._send_fallback_final("Hello world")

        # Exactly one edit, carrying the text without the cursor.
        adapter.edit_message.assert_called_once()
        assert adapter.edit_message.call_args.kwargs["content"] == "Hello world"
        assert consumer._already_sent is True
        # After a successful strip, _last_sent_text holds the cleaned text.
        assert consumer._last_sent_text == "Hello world"

    @pytest.mark.asyncio
    async def test_cursor_not_stripped_when_no_cursor_configured(self):
        """No edit attempted when cursor is not configured."""
        adapter = MagicMock()
        adapter.MAX_MESSAGE_LENGTH = 4096
        adapter.edit_message = AsyncMock()
        consumer = self._consumer_showing(adapter, "", "Hello world")

        await consumer._send_fallback_final("Hello world")

        adapter.edit_message.assert_not_called()
        assert consumer._already_sent is True

    @pytest.mark.asyncio
    async def test_cursor_strip_edit_failure_handled(self):
        """If the cursor-stripping edit itself fails, it must not crash and
        must not corrupt _last_sent_text."""
        adapter = MagicMock()
        adapter.MAX_MESSAGE_LENGTH = 4096
        adapter.edit_message = AsyncMock(
            return_value=SimpleNamespace(success=False, error="flood_control")
        )
        consumer = self._consumer_showing(adapter, " ▉", "Hello ▉")

        await consumer._send_fallback_final("Hello")

        # already_sent is set even though the strip edit reported failure.
        assert consumer._already_sent is True
        # A failed edit must leave _last_sent_text untouched.
        assert consumer._last_sent_text == "Hello ▉"
1340  
1341  
1342  # ── on_new_message callback (tool-progress linearization) ─────────────
1343  
1344  
class TestOnNewMessageCallback:
    """Behaviour of the consumer's ``on_new_message`` hook.

    The hook is invoked each time a fresh content bubble lands on the
    platform. The gateway relies on it to close the current tool-progress
    bubble, so that the next tool.started opens a new bubble underneath
    the content and the chat stays in chronological order.

    Background: before the hook existed (post PR #7885), content messages
    received their own bubbles after segment breaks while the
    tool-progress task went on editing the ORIGINAL progress bubble above
    all of the new content. Tool lines therefore piled up in the upper
    bubble and content messages lined up below it, which made the
    timeline look scrambled.
    """

    @staticmethod
    def _build_adapter(send=None):
        """Create a mock adapter whose send and edit calls report success.

        ``send`` may be supplied to override the default ``AsyncMock``,
        which always answers with message id ``"msg_1"``.
        """
        adapter = MagicMock()
        if send is None:
            send = AsyncMock(
                return_value=SimpleNamespace(success=True, message_id="msg_1")
            )
        adapter.send = send
        adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
        adapter.MAX_MESSAGE_LENGTH = 4096
        return adapter

    @staticmethod
    def _build_consumer(adapter, **consumer_kwargs):
        """Create a consumer for chat ``"chat"`` with the shared test config.

        Extra keyword arguments (such as ``on_new_message``) are forwarded
        to ``GatewayStreamConsumer`` unchanged.
        """
        config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1)
        return GatewayStreamConsumer(adapter, "chat", config, **consumer_kwargs)

    @pytest.mark.asyncio
    async def test_callback_fires_on_first_send(self):
        """The first send of a new content bubble triggers on_new_message."""
        fired = []
        consumer = self._build_consumer(
            self._build_adapter(),
            on_new_message=lambda: fired.append("reset"),
        )

        consumer.on_delta("Hello")
        consumer.finish()
        await consumer.run()

        assert fired == ["reset"]

    @pytest.mark.asyncio
    async def test_callback_fires_once_per_segment(self):
        """Each first-send following a segment break triggers the callback again."""
        message_ids = iter(["msg_1", "msg_2", "msg_3"])

        def fake_send(**kwargs):
            # Hand out a new message id on every send call.
            return SimpleNamespace(success=True, message_id=next(message_ids))

        fired = []
        consumer = self._build_consumer(
            self._build_adapter(send=AsyncMock(side_effect=fake_send)),
            on_new_message=lambda: fired.append("reset"),
        )

        # Three text pieces with a ``None`` delta between each pair.
        for delta in ("A", None, "B", None, "C"):
            consumer.on_delta(delta)
        consumer.finish()
        await consumer.run()

        # Three content bubbles ⇒ three reset notifications
        assert fired == ["reset"] * 3

    @pytest.mark.asyncio
    async def test_callback_not_fired_on_edit(self):
        """Later edits to an existing bubble must NOT trigger the callback."""
        fired = []
        consumer = self._build_consumer(
            self._build_adapter(),
            on_new_message=lambda: fired.append("reset"),
        )

        consumer.on_delta("Hello")
        runner = asyncio.create_task(consumer.run())
        # Feed further text while the consumer task is running, pausing
        # before each piece and once more before finishing.
        for extra in (" world", " more"):
            await asyncio.sleep(0.05)
            consumer.on_delta(extra)
        await asyncio.sleep(0.05)
        consumer.finish()
        await runner

        # Exactly one first-send took place; the edits did not re-fire.
        assert fired == ["reset"]

    @pytest.mark.asyncio
    async def test_callback_fires_on_commentary(self):
        """A commentary message is a fresh bubble as well, so the callback fires."""
        fired = []
        consumer = self._build_consumer(
            self._build_adapter(),
            on_new_message=lambda: fired.append("reset"),
        )

        consumer.on_commentary("I'll search for that first.")
        consumer.finish()
        await consumer.run()

        assert fired == ["reset"]

    @pytest.mark.asyncio
    async def test_callback_error_swallowed(self):
        """An exception raised inside the callback must not crash the consumer."""

        def explode():
            raise RuntimeError("boom")

        consumer = self._build_consumer(
            self._build_adapter(),
            on_new_message=explode,
        )

        consumer.on_delta("Hello")
        consumer.finish()
        # Must not raise even though the callback does.
        await consumer.run()

        assert consumer.already_sent is True

    @pytest.mark.asyncio
    async def test_no_callback_when_none(self):
        """The consumer works when on_new_message is left at its default."""
        # No on_new_message argument is passed here.
        consumer = self._build_consumer(self._build_adapter())

        consumer.on_delta("Hello")
        consumer.finish()
        await consumer.run()

        assert consumer.already_sent is True