"""Regression test for #17758 — chained pending-message drains must not
grow the call stack.

Before the fix, ``_process_message_background`` finished a turn, found a
pending follow-up, and drained it via ``await
self._process_message_background(pending_event, session_key)``. Each
queued follow-up added a frame to the call stack instead of starting
fresh, so under sustained pending-queue activity the C stack would
exhaust at ~2000 nested frames and the process would crash with
SIGSEGV.

After the fix, the in-band drain spawns a fresh task (mirroring the
late-arrival drain pattern), so the stack stays bounded regardless of
chain length.

We assert the invariant directly: count nested
``_process_message_background`` frames at handler entry across a chain
of N follow-ups. Recursion makes depth grow linearly (1, 2, 3, …, N);
task spawning keeps it constant (1 every time).
"""

import asyncio
import sys
from unittest.mock import AsyncMock

import pytest

from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
    BasePlatformAdapter,
    MessageEvent,
    MessageType,
)
from gateway.session import SessionSource, build_session_key


class _StubAdapter(BasePlatformAdapter):
    """Minimal concrete adapter: every abstract transport hook is a no-op."""

    async def connect(self):
        pass

    async def disconnect(self):
        pass

    async def send(self, chat_id, text, **kwargs):
        return None

    async def get_chat_info(self, chat_id):
        return {}


def _make_adapter():
    """Build a stub Telegram adapter with outbound retries mocked out."""
    adapter = _StubAdapter(PlatformConfig(enabled=True, token="t"), Platform.TELEGRAM)
    adapter._send_with_retry = AsyncMock(return_value=None)
    return adapter


def _make_event(text="hi", chat_id="42"):
    """Build a DM text MessageEvent for the given chat."""
    return MessageEvent(
        text=text,
        message_type=MessageType.TEXT,
        source=SessionSource(platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm"),
    )


def _sk(chat_id="42"):
    """Session key for the DM chat used throughout these tests."""
    return build_session_key(
        SessionSource(platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm")
    )


def _count_pmb_frames() -> int:
    """Walk the current call stack and count nested
    ``_process_message_background`` frames. Used to detect recursive
    in-band drains."""
    f = sys._getframe()
    n = 0
    while f is not None:
        if f.f_code.co_name == "_process_message_background":
            n += 1
        f = f.f_back
    return n


async def _poll_until(predicate, *, attempts=400, interval=0.01):
    """Yield to the event loop until *predicate* is true or the attempt
    budget runs out.

    The tests below all need to wait for background drain tasks to
    unwind; this replaces four copy-pasted poll loops. Deliberately does
    NOT raise on timeout — each test's own asserts report the failure
    with a domain-specific message.
    """
    for _ in range(attempts):
        if predicate():
            return
        await asyncio.sleep(interval)


@pytest.mark.asyncio
async def test_in_band_drain_does_not_grow_stack():
    """Issue #17758: chained pending-message drains must not recurse.

    Queue a fresh pending message inside each handler invocation so the
    in-band drain block fires for every turn in the chain. After N
    turns, the recorded stack depth at handler entry must stay bounded.
    Pre-fix, depths would be 1, 2, 3, …, N; post-fix, depths are 1
    every time because each drain runs in its own task.
    """
    N = 12
    adapter = _make_adapter()
    sk = _sk()

    depths: list[int] = []
    turn = 1  # index of the next follow-up message to queue

    async def handler(event):
        nonlocal turn
        depths.append(_count_pmb_frames())
        if turn < N:
            adapter._pending_messages[sk] = _make_event(text=f"M{turn}")
            turn += 1
        return "ok"

    adapter._message_handler = handler

    await adapter.handle_message(_make_event(text="M0"))

    # Drain the chain. Each turn schedules the next via the in-band
    # drain block, so we wait until N handler runs have completed and
    # the session has been released.
    await _poll_until(lambda: len(depths) >= N and sk not in adapter._active_sessions)

    await adapter.cancel_background_tasks()

    assert len(depths) == N, (
        f"expected {N} handler runs in the chain, got {len(depths)}: depths={depths!r}"
    )
    max_depth = max(depths)
    assert max_depth <= 2, (
        f"in-band drain is recursing instead of spawning a fresh task — "
        f"stack depth grew with chain length: {depths!r}"
    )


@pytest.mark.asyncio
async def test_in_band_drain_preserves_active_session_guard():
    """The original task must NOT release ``_active_sessions[session_key]``
    after handing off to the drain task.

    When the in-band drain spawns ``drain_task`` and transfers ownership
    via ``_session_tasks[session_key] = drain_task``, the original task
    still unwinds through the ``finally`` block. The drain task picks
    up the same ``interrupt_event`` in its own
    ``_process_message_background`` entry, so a naive
    ``_release_session_guard(session_key, guard=interrupt_event)`` in
    the unwind matches and deletes ``_active_sessions[session_key]``.
    That briefly reopens the Level-1 guard between the original task's
    finally and the drain task's first await — a concurrent inbound
    arriving in that window passes the guard and spawns a second
    handler for the same session.

    Invariant: ``_active_sessions[sk]`` must hold the SAME interrupt
    Event identity at every handler entry across an in-band drain
    chain. Pre-fix, the original task's finally deletes the entry, so
    the drain task falls through to the ``or asyncio.Event()`` branch
    in ``_process_message_background`` and installs a *new* Event —
    the identity diverges. Post-fix, the entry is preserved across
    handoff and the drain task reuses the original Event.
    """
    adapter = _make_adapter()
    sk = _sk()

    seen_guards: list = []

    async def handler(event):
        seen_guards.append(adapter._active_sessions.get(sk))
        if len(seen_guards) == 1:
            adapter._pending_messages[sk] = _make_event(text="M1")
        return "ok"

    adapter._message_handler = handler

    await adapter.handle_message(_make_event(text="M0"))

    await _poll_until(
        lambda: len(seen_guards) >= 2 and sk not in adapter._active_sessions
    )

    await adapter.cancel_background_tasks()

    assert len(seen_guards) == 2, f"expected 2 handler runs, got {len(seen_guards)}"
    assert seen_guards[0] is not None, "M0 saw no active-session guard"
    assert seen_guards[1] is not None, "M1 saw no active-session guard"
    assert seen_guards[0] is seen_guards[1], (
        "in-band drain handoff replaced the active-session guard — the "
        "original task's finally deleted _active_sessions[sk] and the "
        "drain task installed a new Event. Concurrent inbounds during "
        "the handoff window would bypass the Level-1 guard and spawn a "
        "second handler for the same session."
    )


# ---------------------------------------------------------------------------
# Follow-up guardrails (belt-and-suspenders on top of the #17758 fix).
#
# The in-band drain hand-off changed cleanup semantics in three subtle ways
# that the original fix reasoned about but didn't test directly. These
# tests pin each invariant so future refactors can't silently regress them.
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_normal_path_releases_session_guard():
    """The common path — one message, nothing queued — must still
    fully release ``_active_sessions[sk]`` and ``_session_tasks[sk]``
    through the end-of-finally block.

    The #17758 fix moved ``_release_session_guard(...)`` under an
    ``if current_task is self._session_tasks.get(session_key)``
    conditional. For the 99%-common case (no pending message, no
    handoff) ``current_task`` IS the stored task, so the guard must
    still fire. This test would fail if the conditional were ever
    tightened in a way that dropped the normal path."""
    adapter = _make_adapter()
    sk = _sk()

    async def handler(event):
        return "ok"

    adapter._message_handler = handler

    await adapter.handle_message(_make_event(text="solo"))

    # Wait for the single-shot handler to fully unwind.
    await _poll_until(
        lambda: sk not in adapter._active_sessions and sk not in adapter._session_tasks,
        attempts=200,
    )

    await adapter.cancel_background_tasks()

    assert sk not in adapter._active_sessions, (
        "normal-path unwind left _active_sessions[sk] populated — future "
        "messages would take the busy-handler path forever"
    )
    assert sk not in adapter._session_tasks, (
        "normal-path unwind left _session_tasks[sk] populated — "
        "stale-lock detection will treat a dead task as alive"
    )


@pytest.mark.asyncio
async def test_drain_task_cancellation_releases_session():
    """If the in-band drain task is cancelled (e.g. user sent ``/stop``
    mid-drain), the session guard and task registry must still get
    cleaned up — the cancelled drain task's own ``finally`` runs and
    fires ``_release_session_guard``.

    The #17758 fix transfers ownership of ``_session_tasks[sk]`` to
    the drain task; the drain task's ``except asyncio.CancelledError``
    branch must then own the cleanup. Without this test a future
    refactor could move cancellation handling in a way that leaves
    the session permanently pinned as busy after a cancel."""
    adapter = _make_adapter()
    sk = _sk()

    turn_started = asyncio.Event()
    drain_hit_handler = asyncio.Event()

    async def handler(event):
        if event.text == "M0":
            # Queue a pending follow-up so an in-band drain task gets spawned.
            adapter._pending_messages[sk] = _make_event(text="M1")
            turn_started.set()
            return "ok"
        # M1 is the drained follow-up — hang so we can cancel the drain
        # task; CancelledError propagates naturally out of the sleep.
        drain_hit_handler.set()
        await asyncio.sleep(10)

    adapter._message_handler = handler

    await adapter.handle_message(_make_event(text="M0"))

    # Wait for the drain task to actually start running M1.
    await asyncio.wait_for(drain_hit_handler.wait(), timeout=2)

    # Cancel the drain task mid-handler.
    drain_task = adapter._session_tasks.get(sk)
    assert drain_task is not None, "in-band drain did not install a drain task"
    assert not drain_task.done(), "drain task finished before we could cancel"
    drain_task.cancel()

    # Drain task's finally must release both registries.
    await _poll_until(
        lambda: sk not in adapter._active_sessions and sk not in adapter._session_tasks,
        attempts=200,
    )

    await adapter.cancel_background_tasks()

    assert sk not in adapter._active_sessions, (
        "cancelled drain task did not release _active_sessions[sk] — "
        "the session stays permanently pinned as busy after a /stop mid-drain"
    )
    assert sk not in adapter._session_tasks, (
        "cancelled drain task did not release _session_tasks[sk] — "
        "stale-lock detection will treat the dead task as alive"
    )


@pytest.mark.asyncio
async def test_late_arrival_drain_still_fires_when_no_in_band_drain():
    """The late-arrival drain in ``finally`` must still spawn a fresh
    task when no in-band drain preceded it.

    Pre-#17758 this path already existed; the #17758 follow-up guard
    only re-queues when ``_session_tasks[sk] is not current_task``.
    For a late-arrival with no in-band drain, ``_session_tasks[sk]``
    IS the current task, so the ``else`` branch must fire and spawn
    a drain task for the queued message.

    Queue a pending message *after* M0's handler returns (so the
    in-band drain block sees nothing) but *before* ``finally`` runs
    the late-arrival check — we do this by hooking ``stop_typing``,
    which runs in finally before the late-arrival check."""
    adapter = _make_adapter()
    sk = _sk()

    results: list[str] = []
    original_stop_typing = getattr(adapter, "stop_typing", None)

    async def injecting_stop_typing(chat_id):
        # Simulate a message landing during the cleanup awaits.
        adapter._pending_messages[sk] = _make_event(text="late")
        if original_stop_typing:
            await original_stop_typing(chat_id)

    adapter.stop_typing = injecting_stop_typing

    async def handler(event):
        results.append(event.text)
        return "ok"

    adapter._message_handler = handler

    await adapter.handle_message(_make_event(text="first"))

    # Wait for the late-arrival drain task to finish the second event.
    await _poll_until(
        lambda: "late" in results and sk not in adapter._active_sessions
    )

    await adapter.cancel_background_tasks()

    assert "first" in results, "original message handler did not run"
    assert "late" in results, (
        "late-arrival drain did not spawn a drain task — a message that "
        "landed during cleanup awaits was silently dropped"
    )