/ tests / gateway / test_base_topic_sessions.py
test_base_topic_sessions.py
  1  """Tests for BasePlatformAdapter topic-aware session handling."""
  2  
  3  import asyncio
  4  from types import SimpleNamespace
  5  
  6  import pytest
  7  
  8  from gateway.config import Platform, PlatformConfig
  9  from gateway.platforms.base import BasePlatformAdapter, MessageEvent, ProcessingOutcome, SendResult
 10  from gateway.session import SessionSource, build_session_key
 11  
 12  
 13  class DummyTelegramAdapter(BasePlatformAdapter):
 14      def __init__(self):
 15          super().__init__(PlatformConfig(enabled=True, token="fake-token"), Platform.TELEGRAM)
 16          self.sent = []
 17          self.typing = []
 18          self.processing_hooks = []
 19  
 20      async def connect(self) -> bool:
 21          return True
 22  
 23      async def disconnect(self) -> None:
 24          return None
 25  
 26      async def send(self, chat_id, content, reply_to=None, metadata=None) -> SendResult:
 27          self.sent.append(
 28              {
 29                  "chat_id": chat_id,
 30                  "content": content,
 31                  "reply_to": reply_to,
 32                  "metadata": metadata,
 33              }
 34          )
 35          return SendResult(success=True, message_id="1")
 36  
 37      async def send_typing(self, chat_id: str, metadata=None) -> None:
 38          self.typing.append({"chat_id": chat_id, "metadata": metadata})
 39          return None
 40  
 41      async def get_chat_info(self, chat_id: str):
 42          return {"id": chat_id}
 43  
 44      async def on_processing_start(self, event: MessageEvent) -> None:
 45          self.processing_hooks.append(("start", event.message_id))
 46  
 47      async def on_processing_complete(self, event: MessageEvent, outcome: ProcessingOutcome) -> None:
 48          self.processing_hooks.append(("complete", event.message_id, outcome))
 49  
 50  
 51  def _make_event(chat_id: str, thread_id: str, message_id: str = "1") -> MessageEvent:
 52      return MessageEvent(
 53          text="hello",
 54          source=SessionSource(
 55              platform=Platform.TELEGRAM,
 56              chat_id=chat_id,
 57              chat_type="group",
 58              thread_id=thread_id,
 59          ),
 60          message_id=message_id,
 61      )
 62  
 63  
 64  class TestBasePlatformTopicSessions:
 65      @pytest.mark.asyncio
 66      async def test_handle_message_does_not_interrupt_different_topic(self, monkeypatch):
 67          adapter = DummyTelegramAdapter()
 68          adapter.set_message_handler(lambda event: asyncio.sleep(0, result=None))
 69  
 70          active_event = _make_event("-1001", "10")
 71          adapter._active_sessions[build_session_key(active_event.source)] = asyncio.Event()
 72  
 73          scheduled = []
 74  
 75          def fake_create_task(coro):
 76              scheduled.append(coro)
 77              coro.close()
 78              return SimpleNamespace()
 79  
 80          monkeypatch.setattr(asyncio, "create_task", fake_create_task)
 81  
 82          await adapter.handle_message(_make_event("-1001", "11"))
 83  
 84          assert len(scheduled) == 1
 85          assert adapter._pending_messages == {}
 86  
 87      @pytest.mark.asyncio
 88      async def test_handle_message_interrupts_same_topic(self, monkeypatch):
 89          adapter = DummyTelegramAdapter()
 90          adapter.set_message_handler(lambda event: asyncio.sleep(0, result=None))
 91  
 92          active_event = _make_event("-1001", "10")
 93          adapter._active_sessions[build_session_key(active_event.source)] = asyncio.Event()
 94  
 95          scheduled = []
 96  
 97          def fake_create_task(coro):
 98              scheduled.append(coro)
 99              coro.close()
100              return SimpleNamespace()
101  
102          monkeypatch.setattr(asyncio, "create_task", fake_create_task)
103  
104          pending_event = _make_event("-1001", "10", message_id="2")
105          await adapter.handle_message(pending_event)
106  
107          assert scheduled == []
108          assert adapter.get_pending_message(build_session_key(pending_event.source)) == pending_event
109  
110      @pytest.mark.asyncio
111      async def test_process_message_background_replies_in_same_topic(self):
112          adapter = DummyTelegramAdapter()
113          typing_calls = []
114  
115          async def handler(_event):
116              await asyncio.sleep(0)
117              return "ack"
118  
119          async def hold_typing(_chat_id, interval=2.0, metadata=None):
120              typing_calls.append({"chat_id": _chat_id, "metadata": metadata})
121              await asyncio.Event().wait()
122  
123          adapter.set_message_handler(handler)
124          adapter._keep_typing = hold_typing
125  
126          event = _make_event("-1001", "17585")
127          await adapter._process_message_background(event, build_session_key(event.source))
128  
129          assert adapter.sent == [
130              {
131                  "chat_id": "-1001",
132                  "content": "ack",
133                  "reply_to": "1",
134                  "metadata": {"thread_id": "17585"},
135              }
136          ]
137          assert typing_calls == [
138              {
139                  "chat_id": "-1001",
140                  "metadata": {"thread_id": "17585"},
141              }
142          ]
143          assert adapter.processing_hooks == [
144              ("start", "1"),
145              ("complete", "1", ProcessingOutcome.SUCCESS),
146          ]
147  
148      @pytest.mark.asyncio
149      async def test_process_message_background_marks_total_send_failure_unsuccessful(self):
150          adapter = DummyTelegramAdapter()
151  
152          async def handler(_event):
153              await asyncio.sleep(0)
154              return "ack"
155  
156          async def failing_send(*_args, **_kwargs):
157              return SendResult(success=False, error="send failed")
158  
159          async def hold_typing(_chat_id, interval=2.0, metadata=None):
160              await asyncio.Event().wait()
161  
162          adapter.set_message_handler(handler)
163          adapter.send = failing_send
164          adapter._keep_typing = hold_typing
165  
166          event = _make_event("-1001", "17585")
167          await adapter._process_message_background(event, build_session_key(event.source))
168  
169          assert adapter.processing_hooks == [
170              ("start", "1"),
171              ("complete", "1", ProcessingOutcome.FAILURE),
172          ]
173  
174      @pytest.mark.asyncio
175      async def test_process_message_background_marks_exception_unsuccessful(self):
176          adapter = DummyTelegramAdapter()
177  
178          async def handler(_event):
179              await asyncio.sleep(0)
180              raise RuntimeError("boom")
181  
182          async def hold_typing(_chat_id, interval=2.0, metadata=None):
183              await asyncio.Event().wait()
184  
185          adapter.set_message_handler(handler)
186          adapter._keep_typing = hold_typing
187  
188          event = _make_event("-1001", "17585")
189          await adapter._process_message_background(event, build_session_key(event.source))
190  
191          assert adapter.processing_hooks == [
192              ("start", "1"),
193              ("complete", "1", ProcessingOutcome.FAILURE),
194          ]
195  
196      @pytest.mark.asyncio
197      async def test_process_message_background_marks_cancellation_unsuccessful(self):
198          adapter = DummyTelegramAdapter()
199          release = asyncio.Event()
200  
201          async def handler(_event):
202              await release.wait()
203              return "ack"
204  
205          async def hold_typing(_chat_id, interval=2.0, metadata=None):
206              await asyncio.Event().wait()
207  
208          adapter.set_message_handler(handler)
209          adapter._keep_typing = hold_typing
210  
211          event = _make_event("-1001", "17585")
212          task = asyncio.create_task(adapter._process_message_background(event, build_session_key(event.source)))
213          await asyncio.sleep(0)
214          task.cancel()
215  
216          with pytest.raises(asyncio.CancelledError):
217              await task
218  
219          assert adapter.processing_hooks == [
220              ("start", "1"),
221              ("complete", "1", ProcessingOutcome.FAILURE),
222          ]
223  
224      @pytest.mark.asyncio
225      async def test_cancel_background_tasks_marks_expected_cancellation_cancelled(self):
226          adapter = DummyTelegramAdapter()
227          release = asyncio.Event()
228  
229          async def handler(_event):
230              await release.wait()
231              return "ack"
232  
233          async def hold_typing(_chat_id, interval=2.0, metadata=None):
234              await asyncio.Event().wait()
235  
236          adapter.set_message_handler(handler)
237          adapter._keep_typing = hold_typing
238  
239          event = _make_event("-1001", "17585")
240          await adapter.handle_message(event)
241          await asyncio.sleep(0)
242  
243          await adapter.cancel_background_tasks()
244  
245          assert adapter.processing_hooks == [
246              ("start", "1"),
247              ("complete", "1", ProcessingOutcome.CANCELLED),
248          ]