# test_telegram_text_batching.py
"""Tests for Telegram text message aggregation.

Telegram clients split a long user message into several updates. These tests
check that the TelegramAdapter buffers rapid successive text messages from
the same session and dispatches them as a single aggregated message.
"""

import asyncio
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType, SessionSource

# Timings shared by the tests, in seconds.
_SPLIT_GAP = 0.02  # pause between chunks; shorter than the 0.1 s batch delay
_FLUSH_WAIT = 0.2  # longer than the 0.1 s batch delay set in _make_adapter


def _make_adapter():
    """Build a bare TelegramAdapter carrying only the state these tests set.

    The instance is allocated with ``object.__new__`` so ``__init__`` never
    runs, and ``handle_message`` is replaced by an ``AsyncMock`` so each test
    can inspect what was dispatched.
    """
    from gateway.platforms.telegram import TelegramAdapter

    adapter = object.__new__(TelegramAdapter)
    adapter._platform = Platform.TELEGRAM
    adapter.config = PlatformConfig(enabled=True, token="test-token")

    # Batching state, with a short delay so the tests stay fast.
    adapter._pending_text_batches = {}
    adapter._pending_text_batch_tasks = {}
    adapter._text_batch_delay_seconds = 0.1

    adapter._active_sessions = {}
    adapter._pending_messages = {}

    adapter._message_handler = AsyncMock()
    adapter.handle_message = AsyncMock()
    return adapter


def _make_event(text: str, chat_id: str = "12345") -> MessageEvent:
    """Return a TEXT MessageEvent from a Telegram "dm" chat with *chat_id*."""
    source = SessionSource(
        platform=Platform.TELEGRAM,
        chat_id=chat_id,
        chat_type="dm",
    )
    return MessageEvent(text=text, message_type=MessageType.TEXT, source=source)


def _dispatched_event(adapter):
    """Return the first positional argument of the last handle_message call."""
    return adapter.handle_message.call_args[0][0]


class TestTextBatching:
    """Exercise ``_enqueue_text_event`` with single, split and mixed input."""

    @pytest.mark.asyncio
    async def test_single_message_dispatched_after_delay(self):
        """A lone message is held back, then dispatched after the wait."""
        adapter = _make_adapter()

        adapter._enqueue_text_event(_make_event("hello world"))

        # Enqueueing on its own must not dispatch anything.
        adapter.handle_message.assert_not_called()

        await asyncio.sleep(_FLUSH_WAIT)

        adapter.handle_message.assert_called_once()
        assert _dispatched_event(adapter).text == "hello world"

    @pytest.mark.asyncio
    async def test_split_messages_aggregated(self):
        """Two rapid messages from the same chat should be merged."""
        adapter = _make_adapter()

        adapter._enqueue_text_event(_make_event("This is part one of a long"))
        await asyncio.sleep(_SPLIT_GAP)  # still inside the batch window
        adapter._enqueue_text_event(_make_event("message that was split by Telegram."))

        # The second chunk arrived before any flush, so nothing is out yet.
        adapter.handle_message.assert_not_called()

        await asyncio.sleep(_FLUSH_WAIT)

        adapter.handle_message.assert_called_once()
        merged = _dispatched_event(adapter).text
        for fragment in ("part one", "split by Telegram"):
            assert fragment in merged

    @pytest.mark.asyncio
    async def test_three_way_split_aggregated(self):
        """Three rapid messages should all merge."""
        adapter = _make_adapter()
        chunks = ("chunk 1", "chunk 2", "chunk 3")

        for position, chunk in enumerate(chunks):
            if position:
                # Short pause between chunks, none before the first.
                await asyncio.sleep(_SPLIT_GAP)
            adapter._enqueue_text_event(_make_event(chunk))

        await asyncio.sleep(_FLUSH_WAIT)

        adapter.handle_message.assert_called_once()
        merged = _dispatched_event(adapter).text
        for chunk in chunks:
            assert chunk in merged

    @pytest.mark.asyncio
    async def test_different_chats_not_merged(self):
        """Messages from different chats should be separate batches."""
        adapter = _make_adapter()

        for text, chat in (("from user A", "111"), ("from user B", "222")):
            adapter._enqueue_text_event(_make_event(text, chat_id=chat))

        await asyncio.sleep(_FLUSH_WAIT)

        assert adapter.handle_message.call_count == 2

    @pytest.mark.asyncio
    async def test_batch_cleans_up_after_flush(self):
        """After flushing, internal state should be clean."""
        adapter = _make_adapter()

        adapter._enqueue_text_event(_make_event("test"))
        await asyncio.sleep(_FLUSH_WAIT)

        for registry in (
            adapter._pending_text_batches,
            adapter._pending_text_batch_tasks,
        ):
            assert len(registry) == 0