# test_base_topic_sessions.py
"""Tests for BasePlatformAdapter topic-aware session handling."""

import asyncio
from types import SimpleNamespace

import pytest

from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, ProcessingOutcome, SendResult
from gateway.session import SessionSource, build_session_key


class DummyTelegramAdapter(BasePlatformAdapter):
    """In-memory adapter that records what the base class asks it to do.

    Nothing is sent anywhere: outgoing messages, typing notifications and
    processing hooks are appended to lists so tests can assert on them.
    """

    def __init__(self):
        super().__init__(PlatformConfig(enabled=True, token="fake-token"), Platform.TELEGRAM)
        # Recorded calls, in invocation order.
        self.sent = []
        self.typing = []
        self.processing_hooks = []

    async def connect(self) -> bool:
        """Pretend the connection always succeeds."""
        return True

    async def disconnect(self) -> None:
        """No-op disconnect."""
        return None

    async def send(self, chat_id, content, reply_to=None, metadata=None) -> SendResult:
        """Record the outgoing message and report a successful send."""
        self.sent.append(
            {
                "chat_id": chat_id,
                "content": content,
                "reply_to": reply_to,
                "metadata": metadata,
            }
        )
        return SendResult(success=True, message_id="1")

    async def send_typing(self, chat_id: str, metadata=None) -> None:
        """Record a typing notification."""
        self.typing.append({"chat_id": chat_id, "metadata": metadata})
        return None

    async def get_chat_info(self, chat_id: str):
        """Return a minimal chat-info mapping echoing the chat id."""
        return {"id": chat_id}

    async def on_processing_start(self, event: MessageEvent) -> None:
        """Record that processing started for ``event``."""
        self.processing_hooks.append(("start", event.message_id))

    async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
        """Record that processing finished for ``event`` with ``outcome``."""
        self.processing_hooks.append(("complete", event.message_id, outcome))


def _make_event(chat_id: str, thread_id: str, message_id: str = "1") -> MessageEvent:
    """Build a Telegram group ``MessageEvent`` with text ``"hello"``.

    Args:
        chat_id: Chat identifier placed on the event source.
        thread_id: Topic/thread identifier placed on the event source.
        message_id: Identifier of the message itself (defaults to ``"1"``).
    """
    return MessageEvent(
        text="hello",
        source=SessionSource(
            platform=Platform.TELEGRAM,
            chat_id=chat_id,
            chat_type="group",
            thread_id=thread_id,
        ),
        message_id=message_id,
    )


class TestBasePlatformTopicSessions:
    """Session behaviour when events share a chat but differ (or not) by thread."""

    @pytest.mark.asyncio
    async def test_handle_message_does_not_interrupt_different_topic(self, monkeypatch):
        """A message in thread 11 is scheduled even though thread 10 is active."""
        adapter = DummyTelegramAdapter()
        adapter.set_message_handler(lambda event: asyncio.sleep(0, result=None))

        # Mark the session for thread "10" of the same chat as active.
        active_event = _make_event("-1001", "10")
        adapter._active_sessions[build_session_key(active_event.source)] = asyncio.Event()

        scheduled = []

        def fake_create_task(coro):
            # Capture the coroutine instead of running it; close it so it is
            # not reported as "never awaited".
            scheduled.append(coro)
            coro.close()
            return SimpleNamespace()

        monkeypatch.setattr(asyncio, "create_task", fake_create_task)

        await adapter.handle_message(_make_event("-1001", "11"))

        assert len(scheduled) == 1
        assert adapter._pending_messages == {}

    @pytest.mark.asyncio
    async def test_handle_message_interrupts_same_topic(self, monkeypatch):
        """A message in the already-active thread is queued as pending, not scheduled."""
        adapter = DummyTelegramAdapter()
        adapter.set_message_handler(lambda event: asyncio.sleep(0, result=None))

        # Mark the session for thread "10" as active.
        active_event = _make_event("-1001", "10")
        adapter._active_sessions[build_session_key(active_event.source)] = asyncio.Event()

        scheduled = []

        def fake_create_task(coro):
            # Capture the coroutine instead of running it; close it so it is
            # not reported as "never awaited".
            scheduled.append(coro)
            coro.close()
            return SimpleNamespace()

        monkeypatch.setattr(asyncio, "create_task", fake_create_task)

        pending_event = _make_event("-1001", "10", message_id="2")
        await adapter.handle_message(pending_event)

        assert scheduled == []
        assert adapter.get_pending_message(build_session_key(pending_event.source)) == pending_event

    @pytest.mark.asyncio
    async def test_process_message_background_replies_in_same_topic(self):
        """The reply and the typing indicator both carry the event's thread id."""
        adapter = DummyTelegramAdapter()
        typing_calls = []

        async def handler(_event):
            await asyncio.sleep(0)
            return "ack"

        async def hold_typing(_chat_id, interval=2.0, metadata=None):
            # Record the call, then wait on an event that is never set so the
            # coroutine only ends when it is cancelled.
            typing_calls.append({"chat_id": _chat_id, "metadata": metadata})
            await asyncio.Event().wait()

        adapter.set_message_handler(handler)
        adapter._keep_typing = hold_typing

        event = _make_event("-1001", "17585")
        await adapter._process_message_background(event, build_session_key(event.source))

        assert adapter.sent == [
            {
                "chat_id": "-1001",
                "content": "ack",
                "reply_to": "1",
                "metadata": {"thread_id": "17585"},
            }
        ]
        assert typing_calls == [
            {
                "chat_id": "-1001",
                "metadata": {"thread_id": "17585"},
            }
        ]
        assert adapter.processing_hooks == [
            ("start", "1"),
            ("complete", "1", ProcessingOutcome.SUCCESS),
        ]

    @pytest.mark.asyncio
    async def test_process_message_background_marks_total_send_failure_unsuccessful(self):
        """When every send reports failure the completion hook gets FAILURE."""
        adapter = DummyTelegramAdapter()

        async def handler(_event):
            await asyncio.sleep(0)
            return "ack"

        async def failing_send(*_args, **_kwargs):
            return SendResult(success=False, error="send failed")

        async def hold_typing(_chat_id, interval=2.0, metadata=None):
            # Never-set event: runs until cancelled.
            await asyncio.Event().wait()

        adapter.set_message_handler(handler)
        adapter.send = failing_send
        adapter._keep_typing = hold_typing

        event = _make_event("-1001", "17585")
        await adapter._process_message_background(event, build_session_key(event.source))

        assert adapter.processing_hooks == [
            ("start", "1"),
            ("complete", "1", ProcessingOutcome.FAILURE),
        ]

    @pytest.mark.asyncio
    async def test_process_message_background_marks_exception_unsuccessful(self):
        """A handler that raises results in a FAILURE completion hook."""
        adapter = DummyTelegramAdapter()

        async def handler(_event):
            await asyncio.sleep(0)
            raise RuntimeError("boom")

        async def hold_typing(_chat_id, interval=2.0, metadata=None):
            # Never-set event: runs until cancelled.
            await asyncio.Event().wait()

        adapter.set_message_handler(handler)
        adapter._keep_typing = hold_typing

        event = _make_event("-1001", "17585")
        await adapter._process_message_background(event, build_session_key(event.source))

        assert adapter.processing_hooks == [
            ("start", "1"),
            ("complete", "1", ProcessingOutcome.FAILURE),
        ]

    @pytest.mark.asyncio
    async def test_process_message_background_marks_cancellation_unsuccessful(self):
        """Cancelling the processing task directly is reported as FAILURE."""
        adapter = DummyTelegramAdapter()
        # Never set: keeps the handler blocked until the task is cancelled.
        release = asyncio.Event()

        async def handler(_event):
            await release.wait()
            return "ack"

        async def hold_typing(_chat_id, interval=2.0, metadata=None):
            # Never-set event: runs until cancelled.
            await asyncio.Event().wait()

        adapter.set_message_handler(handler)
        adapter._keep_typing = hold_typing

        event = _make_event("-1001", "17585")
        task = asyncio.create_task(adapter._process_message_background(event, build_session_key(event.source)))
        # Yield once so the task gets to start before it is cancelled.
        await asyncio.sleep(0)
        task.cancel()

        with pytest.raises(asyncio.CancelledError):
            await task

        assert adapter.processing_hooks == [
            ("start", "1"),
            ("complete", "1", ProcessingOutcome.FAILURE),
        ]

    @pytest.mark.asyncio
    async def test_cancel_background_tasks_marks_expected_cancellation_cancelled(self):
        """Cancellation via ``cancel_background_tasks`` is reported as CANCELLED."""
        adapter = DummyTelegramAdapter()
        # Never set: keeps the handler blocked until the task is cancelled.
        release = asyncio.Event()

        async def handler(_event):
            await release.wait()
            return "ack"

        async def hold_typing(_chat_id, interval=2.0, metadata=None):
            # Never-set event: runs until cancelled.
            await asyncio.Event().wait()

        adapter.set_message_handler(handler)
        adapter._keep_typing = hold_typing

        event = _make_event("-1001", "17585")
        await adapter.handle_message(event)
        # Yield once so the background task gets to start before cancellation.
        await asyncio.sleep(0)

        await adapter.cancel_background_tasks()

        assert adapter.processing_hooks == [
            ("start", "1"),
            ("complete", "1", ProcessingOutcome.CANCELLED),
        ]