# test_keep_typing_timeout.py
"""Tests for BasePlatformAdapter._keep_typing timeout-per-tick behavior.

When the gateway is waiting on a long upstream provider response (e.g.
Anthropic/opus-4.7 first-token latency climbing during an upstream blip),
the model-call socket is blocked on the worker thread but the asyncio loop
is still running, and ``_keep_typing`` refreshes the platform typing
indicator every 2 seconds.

The bug: each ``send_typing`` call is an HTTP round-trip to the platform API
(Telegram/Discord). If the same network instability that's slowing the model
call also makes ``send_typing`` slow (5-30s response time), the refresh loop
stalls inside the ``await self.send_typing(...)`` call. Platform-side typing
expires at ~5s, so the bubble dies and doesn't come back until that stuck
call returns — exactly when the user most needs the "yes, still working"
signal.

The fix: bound each ``send_typing`` with ``asyncio.wait_for``. If a
send_typing takes longer than the per-tick budget (default 1.5s when
interval=2.0), abandon it and let the next scheduled tick fire a fresh
call. As long as any one of them succeeds within the ~5s platform window,
the bubble stays visible across provider stalls.
"""

import asyncio
from unittest.mock import AsyncMock, MagicMock

import pytest

from gateway.platforms.base import (
    BasePlatformAdapter,
    Platform,
    PlatformConfig,
    SendResult,
)


class _StubAdapter(BasePlatformAdapter):
    """Minimal Telegram adapter whose network-facing methods are no-ops."""

    def __init__(self):
        super().__init__(
            PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM
        )

    async def connect(self) -> bool:
        return True

    async def disconnect(self) -> None:
        self._mark_disconnected()

    async def send(self, chat_id, content, reply_to=None, metadata=None):
        return SendResult(success=True, message_id="m1")

    async def get_chat_info(self, chat_id):
        return {"id": chat_id, "type": "dm"}


def _make_adapter(monkeypatch, send_typing):
    """Build a stub adapter with ``send_typing`` replaced and ``stop_typing`` mocked.

    ``stop_typing`` is replaced to avoid side effects in ``_keep_typing``'s
    finally block. ``AsyncMock`` returns a fresh awaitable on every call.
    The previous ``MagicMock(return_value=asyncio.sleep(0))`` handed every
    caller the SAME coroutine object: a second ``await`` raised
    ``RuntimeError``, and leaving it un-awaited emitted a
    "coroutine was never awaited" RuntimeWarning.
    """
    adapter = _StubAdapter()
    monkeypatch.setattr(adapter, "send_typing", send_typing)
    adapter.stop_typing = AsyncMock()
    return adapter


async def _run_keep_typing(adapter, *, chat_id, interval, run_for, stop_timeout):
    """Run ``adapter._keep_typing`` for ``run_for`` seconds, then stop it.

    Sets the stop event and waits up to ``stop_timeout`` seconds for the
    loop to exit. Fails the test with a descriptive message if it does not.
    Any exception raised by ``_keep_typing`` itself is re-raised. The
    background task is always cancelled on the way out, so a failing test
    never leaks a pending task into the next one.
    """
    stop_event = asyncio.Event()
    task = asyncio.create_task(
        adapter._keep_typing(
            chat_id=chat_id,
            interval=interval,
            stop_event=stop_event,
        )
    )
    try:
        await asyncio.sleep(run_for)
        stop_event.set()
        # asyncio.wait (unlike wait_for) never raises TimeoutError itself, so a
        # TimeoutError escaping _keep_typing can't be mistaken for "didn't exit".
        done, _ = await asyncio.wait({task}, timeout=stop_timeout)
        if task not in done:
            pytest.fail(
                f"_keep_typing did not exit within {stop_timeout:g}s of "
                "stop_event.set() — it is likely blocked on a send_typing call"
            )
        # Surface any exception _keep_typing raised instead of hiding it.
        task.result()
    finally:
        if not task.done():
            task.cancel()
            # Bounded so a loop that swallows CancelledError can't hang the suite.
            await asyncio.wait({task}, timeout=1.0)


class TestKeepTypingTimeoutPerTick:
    @pytest.mark.asyncio
    async def test_slow_send_typing_does_not_block_cadence(self, monkeypatch):
        """A send_typing that hangs longer than the per-tick budget must be
        abandoned so the next scheduled tick can fire a fresh call."""
        call_events = []

        async def slow_send_typing(chat_id, metadata=None):
            # Simulate a stuck HTTP round-trip. If _keep_typing awaits this
            # unconditionally, the loop stalls for the full duration.
            call_events.append("start")
            try:
                await asyncio.sleep(10)
            finally:
                call_events.append("finish-or-cancel")

        adapter = _make_adapter(monkeypatch, slow_send_typing)

        # Run the loop for ~3s at interval=1.0, then stop it. The assertion
        # below only demands 2 ticks so it tolerates scheduler jitter.
        await _run_keep_typing(
            adapter, chat_id="123", interval=1.0, run_for=3.0, stop_timeout=2.0
        )

        # With per-tick timeout, we should see MULTIPLE send_typing starts
        # despite each being slow (abandoned via TimeoutError). Without the
        # fix there would be exactly 1 start (the one still stuck).
        starts = call_events.count("start")
        assert starts >= 2, (
            f"expected at least 2 send_typing ticks across 3s of slow "
            f"operation, got {starts} — refresh cadence is stalled "
            f"on a slow send_typing"
        )

    @pytest.mark.asyncio
    async def test_fast_send_typing_still_gets_awaited(self, monkeypatch):
        """When send_typing is fast (normal case), it must still complete
        normally — the timeout is only an upper bound, not a cap on
        successful calls."""
        completed = []

        async def fast_send_typing(chat_id, metadata=None):
            await asyncio.sleep(0.01)  # well under the timeout
            completed.append(chat_id)

        adapter = _make_adapter(monkeypatch, fast_send_typing)

        await _run_keep_typing(
            adapter, chat_id="456", interval=0.5, run_for=1.2, stop_timeout=1.0
        )

        assert len(completed) >= 2, (
            f"expected multiple completed send_typing calls, got "
            f"{len(completed)}"
        )
        assert all(c == "456" for c in completed)

    @pytest.mark.asyncio
    async def test_send_typing_exception_does_not_kill_loop(self, monkeypatch):
        """A send_typing that raises (e.g. transient HTTP 500) must be
        caught so the loop continues refreshing on schedule."""
        tick_count = {"n": 0}

        async def flaky_send_typing(chat_id, metadata=None):
            tick_count["n"] += 1
            if tick_count["n"] == 1:
                raise RuntimeError("transient upstream error")
            # Subsequent calls succeed.

        adapter = _make_adapter(monkeypatch, flaky_send_typing)

        await _run_keep_typing(
            adapter, chat_id="789", interval=0.3, run_for=1.0, stop_timeout=1.0
        )

        assert tick_count["n"] >= 2, (
            f"loop exited after first send_typing exception; expected it to "
            f"keep ticking (got {tick_count['n']} ticks)"
        )

    @pytest.mark.asyncio
    async def test_paused_chat_skips_send_typing(self, monkeypatch):
        """When a chat is in _typing_paused (e.g. awaiting approval), the
        loop must not call send_typing at all. Regression guard — existing
        behavior, preserved through the timeout change."""
        calls = []

        async def recording_send_typing(chat_id, metadata=None):
            calls.append(chat_id)

        adapter = _make_adapter(monkeypatch, recording_send_typing)
        adapter._typing_paused.add("paused-chat")

        await _run_keep_typing(
            adapter,
            chat_id="paused-chat",
            interval=0.3,
            run_for=1.0,
            stop_timeout=1.0,
        )

        assert calls == [], (
            f"send_typing was called on a paused chat: {calls}"
        )