/ tests / gateway / test_keep_typing_timeout.py
test_keep_typing_timeout.py
  1  """Tests for BasePlatformAdapter._keep_typing timeout-per-tick behavior.
  2  
  3  When the gateway is waiting on a long upstream provider response (e.g.
  4  Anthropic/opus-4.7 first-token latency climbing during an upstream blip),
  5  the model-call socket is blocked on the worker thread but the asyncio loop
  6  is still running, and ``_keep_typing`` refreshes the platform typing
  7  indicator every 2 seconds.
  8  
  9  The bug: each ``send_typing`` call is an HTTP round-trip to the platform API
 10  (Telegram/Discord). If the same network instability that's slowing the model
 11  call also makes ``send_typing`` slow (5-30s response time), the refresh loop
 12  stalls inside the ``await self.send_typing(...)`` call. Platform-side typing
 13  expires at ~5s, so the bubble dies and doesn't come back until that stuck
 14  call returns — exactly when the user most needs the "yes, still working"
 15  signal.
 16  
 17  The fix: bound each ``send_typing`` with ``asyncio.wait_for``. If a
 18  send_typing takes longer than the per-tick budget (default 1.5s when
 19  interval=2.0), abandon it and let the next scheduled tick fire a fresh
 20  call. As long as any one of them succeeds within the ~5s platform window,
 21  the bubble stays visible across provider stalls.
 22  """
 23  
 24  import asyncio
 25  from unittest.mock import MagicMock
 26  
 27  import pytest
 28  
 29  from gateway.platforms.base import (
 30      BasePlatformAdapter,
 31      Platform,
 32      PlatformConfig,
 33      SendResult,
 34  )
 35  
 36  
 37  class _StubAdapter(BasePlatformAdapter):
 38      def __init__(self):
 39          super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM)
 40  
 41      async def connect(self) -> bool:
 42          return True
 43  
 44      async def disconnect(self) -> None:
 45          self._mark_disconnected()
 46  
 47      async def send(self, chat_id, content, reply_to=None, metadata=None):
 48          return SendResult(success=True, message_id="m1")
 49  
 50      async def get_chat_info(self, chat_id):
 51          return {"id": chat_id, "type": "dm"}
 52  
 53  
 54  class TestKeepTypingTimeoutPerTick:
 55      @pytest.mark.asyncio
 56      async def test_slow_send_typing_does_not_block_cadence(self, monkeypatch):
 57          """A send_typing that hangs longer than the per-tick budget must be
 58          abandoned so the next scheduled tick can fire a fresh call."""
 59          adapter = _StubAdapter()
 60          call_events = []
 61  
 62          async def slow_send_typing(chat_id, metadata=None):
 63              # Simulate a stuck HTTP round-trip. If _keep_typing awaits this
 64              # unconditionally, the loop stalls for the full duration.
 65              call_events.append("start")
 66              try:
 67                  await asyncio.sleep(10)
 68              finally:
 69                  call_events.append("finish-or-cancel")
 70  
 71          monkeypatch.setattr(adapter, "send_typing", slow_send_typing)
 72          # Avoid stop_typing side-effects in the finally block.
 73          adapter.stop_typing = MagicMock(return_value=asyncio.sleep(0))
 74  
 75          stop_event = asyncio.Event()
 76          # Start the typing loop, let it run ~3s (should fire 2 ticks) then stop.
 77          task = asyncio.create_task(
 78              adapter._keep_typing(
 79                  chat_id="123",
 80                  interval=1.0,
 81                  stop_event=stop_event,
 82              )
 83          )
 84          await asyncio.sleep(3.0)
 85          stop_event.set()
 86          try:
 87              await asyncio.wait_for(task, timeout=2.0)
 88          except asyncio.TimeoutError:
 89              task.cancel()
 90              pytest.fail(
 91                  "_keep_typing did not exit within 2s of stop_event.set() — "
 92                  "it is blocked on a slow send_typing call"
 93              )
 94  
 95          # With per-tick timeout, we should see MULTIPLE send_typing starts
 96          # despite each being slow (abandoned via TimeoutError).  Without the
 97          # fix there would be exactly 1 start (the one still stuck).
 98          starts = [e for e in call_events if e == "start"]
 99          assert len(starts) >= 2, (
100              f"expected at least 2 send_typing ticks across 3s of slow "
101              f"operation, got {len(starts)} — refresh cadence is stalled "
102              f"on a slow send_typing"
103          )
104  
105      @pytest.mark.asyncio
106      async def test_fast_send_typing_still_gets_awaited(self, monkeypatch):
107          """When send_typing is fast (normal case), it must still complete
108          normally — the timeout is only an upper bound, not a cap on
109          successful calls."""
110          adapter = _StubAdapter()
111          completed = []
112  
113          async def fast_send_typing(chat_id, metadata=None):
114              await asyncio.sleep(0.01)  # well under the timeout
115              completed.append(chat_id)
116  
117          monkeypatch.setattr(adapter, "send_typing", fast_send_typing)
118          adapter.stop_typing = MagicMock(return_value=asyncio.sleep(0))
119  
120          stop_event = asyncio.Event()
121          task = asyncio.create_task(
122              adapter._keep_typing(
123                  chat_id="456",
124                  interval=0.5,
125                  stop_event=stop_event,
126              )
127          )
128          await asyncio.sleep(1.2)  # ~3 ticks
129          stop_event.set()
130          await asyncio.wait_for(task, timeout=1.0)
131  
132          assert len(completed) >= 2, (
133              f"expected multiple completed send_typing calls, got "
134              f"{len(completed)}"
135          )
136          assert all(c == "456" for c in completed)
137  
138      @pytest.mark.asyncio
139      async def test_send_typing_exception_does_not_kill_loop(self, monkeypatch):
140          """A send_typing that raises (e.g. transient HTTP 500) must be
141          caught so the loop continues refreshing on schedule."""
142          adapter = _StubAdapter()
143          tick_count = {"n": 0}
144  
145          async def flaky_send_typing(chat_id, metadata=None):
146              tick_count["n"] += 1
147              if tick_count["n"] == 1:
148                  raise RuntimeError("transient upstream error")
149              # Subsequent calls succeed.
150  
151          monkeypatch.setattr(adapter, "send_typing", flaky_send_typing)
152          adapter.stop_typing = MagicMock(return_value=asyncio.sleep(0))
153  
154          stop_event = asyncio.Event()
155          task = asyncio.create_task(
156              adapter._keep_typing(
157                  chat_id="789",
158                  interval=0.3,
159                  stop_event=stop_event,
160              )
161          )
162          await asyncio.sleep(1.0)
163          stop_event.set()
164          await asyncio.wait_for(task, timeout=1.0)
165  
166          assert tick_count["n"] >= 2, (
167              f"loop exited after first send_typing exception; expected it to "
168              f"keep ticking (got {tick_count['n']} ticks)"
169          )
170  
171      @pytest.mark.asyncio
172      async def test_paused_chat_skips_send_typing(self, monkeypatch):
173          """When a chat is in _typing_paused (e.g. awaiting approval), the
174          loop must not call send_typing at all. Regression guard — existing
175          behavior, preserved through the timeout change."""
176          adapter = _StubAdapter()
177          calls = []
178  
179          async def recording_send_typing(chat_id, metadata=None):
180              calls.append(chat_id)
181  
182          monkeypatch.setattr(adapter, "send_typing", recording_send_typing)
183          adapter.stop_typing = MagicMock(return_value=asyncio.sleep(0))
184          adapter._typing_paused.add("paused-chat")
185  
186          stop_event = asyncio.Event()
187          task = asyncio.create_task(
188              adapter._keep_typing(
189                  chat_id="paused-chat",
190                  interval=0.3,
191                  stop_event=stop_event,
192              )
193          )
194          await asyncio.sleep(1.0)
195          stop_event.set()
196          await asyncio.wait_for(task, timeout=1.0)
197  
198          assert calls == [], (
199              f"send_typing was called on a paused chat: {calls}"
200          )