# tests/gateway/test_telegram_text_batching.py
  1  """Tests for Telegram text message aggregation.
  2  
  3  When a user sends a long message, Telegram clients split it into multiple
  4  updates.  The TelegramAdapter should buffer rapid successive text messages
  5  from the same session and aggregate them before dispatching.
  6  """
  7  
  8  import asyncio
  9  from unittest.mock import AsyncMock, MagicMock, patch
 10  
 11  import pytest
 12  
 13  from gateway.config import Platform, PlatformConfig
 14  from gateway.platforms.base import MessageEvent, MessageType, SessionSource
 15  
 16  
 17  def _make_adapter():
 18      """Create a minimal TelegramAdapter for testing text batching."""
 19      from gateway.platforms.telegram import TelegramAdapter
 20  
 21      config = PlatformConfig(enabled=True, token="test-token")
 22      adapter = object.__new__(TelegramAdapter)
 23      adapter._platform = Platform.TELEGRAM
 24      adapter.config = config
 25      adapter._pending_text_batches = {}
 26      adapter._pending_text_batch_tasks = {}
 27      adapter._text_batch_delay_seconds = 0.1  # fast for tests
 28      adapter._active_sessions = {}
 29      adapter._pending_messages = {}
 30      adapter._message_handler = AsyncMock()
 31      adapter.handle_message = AsyncMock()
 32      return adapter
 33  
 34  
 35  def _make_event(text: str, chat_id: str = "12345") -> MessageEvent:
 36      return MessageEvent(
 37          text=text,
 38          message_type=MessageType.TEXT,
 39          source=SessionSource(platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm"),
 40      )
 41  
 42  
 43  class TestTextBatching:
 44      @pytest.mark.asyncio
 45      async def test_single_message_dispatched_after_delay(self):
 46          adapter = _make_adapter()
 47          event = _make_event("hello world")
 48  
 49          adapter._enqueue_text_event(event)
 50  
 51          # Not dispatched yet
 52          adapter.handle_message.assert_not_called()
 53  
 54          # Wait for flush
 55          await asyncio.sleep(0.2)
 56  
 57          adapter.handle_message.assert_called_once()
 58          dispatched = adapter.handle_message.call_args[0][0]
 59          assert dispatched.text == "hello world"
 60  
 61      @pytest.mark.asyncio
 62      async def test_split_messages_aggregated(self):
 63          """Two rapid messages from the same chat should be merged."""
 64          adapter = _make_adapter()
 65  
 66          adapter._enqueue_text_event(_make_event("This is part one of a long"))
 67          await asyncio.sleep(0.02)  # small gap, within batch window
 68          adapter._enqueue_text_event(_make_event("message that was split by Telegram."))
 69  
 70          # Not dispatched yet (timer restarted)
 71          adapter.handle_message.assert_not_called()
 72  
 73          # Wait for flush
 74          await asyncio.sleep(0.2)
 75  
 76          adapter.handle_message.assert_called_once()
 77          dispatched = adapter.handle_message.call_args[0][0]
 78          assert "part one" in dispatched.text
 79          assert "split by Telegram" in dispatched.text
 80  
 81      @pytest.mark.asyncio
 82      async def test_three_way_split_aggregated(self):
 83          """Three rapid messages should all merge."""
 84          adapter = _make_adapter()
 85  
 86          adapter._enqueue_text_event(_make_event("chunk 1"))
 87          await asyncio.sleep(0.02)
 88          adapter._enqueue_text_event(_make_event("chunk 2"))
 89          await asyncio.sleep(0.02)
 90          adapter._enqueue_text_event(_make_event("chunk 3"))
 91  
 92          await asyncio.sleep(0.2)
 93  
 94          adapter.handle_message.assert_called_once()
 95          text = adapter.handle_message.call_args[0][0].text
 96          assert "chunk 1" in text
 97          assert "chunk 2" in text
 98          assert "chunk 3" in text
 99  
100      @pytest.mark.asyncio
101      async def test_different_chats_not_merged(self):
102          """Messages from different chats should be separate batches."""
103          adapter = _make_adapter()
104  
105          adapter._enqueue_text_event(_make_event("from user A", chat_id="111"))
106          adapter._enqueue_text_event(_make_event("from user B", chat_id="222"))
107  
108          await asyncio.sleep(0.2)
109  
110          assert adapter.handle_message.call_count == 2
111  
112      @pytest.mark.asyncio
113      async def test_batch_cleans_up_after_flush(self):
114          """After flushing, internal state should be clean."""
115          adapter = _make_adapter()
116  
117          adapter._enqueue_text_event(_make_event("test"))
118          await asyncio.sleep(0.2)
119  
120          assert len(adapter._pending_text_batches) == 0
121          assert len(adapter._pending_text_batch_tasks) == 0