tests/gateway/test_pending_drain_no_recursion.py
"""Regression test for #17758 — chained pending-message drains must not
grow the call stack.

Before the fix, ``_process_message_background`` finished a turn, found a
pending follow-up, and drained it via ``await
self._process_message_background(pending_event, session_key)``.  Each
queued follow-up added a frame to the call stack instead of starting
fresh, so under sustained pending-queue activity the C stack would be
exhausted at ~2000 nested frames and the process would crash with
SIGSEGV.

After the fix, the in-band drain spawns a fresh task (mirroring the
late-arrival drain pattern), so the stack stays bounded regardless of
chain length.

We assert the invariant directly: count nested
``_process_message_background`` frames at handler entry across a chain
of N follow-ups.  Recursion makes depth grow linearly (1, 2, 3, …, N);
task spawning keeps it constant (1 every time).
"""
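
# Illustrative shape of the two drain strategies (a sketch for orientation
# only, inferred from the docstring above; not the adapter's actual code):
#
#   # pre-fix: in-band drain recurses, one extra frame per queued follow-up
#   await self._process_message_background(pending_event, session_key)
#
#   # post-fix: the drain runs in a fresh task, so the caller's stack unwinds
#   drain_task = asyncio.create_task(
#       self._process_message_background(pending_event, session_key)
#   )
#   self._session_tasks[session_key] = drain_task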

import asyncio
import sys
from unittest.mock import AsyncMock

import pytest

from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
    BasePlatformAdapter,
    MessageEvent,
    MessageType,
)
from gateway.session import SessionSource, build_session_key


class _StubAdapter(BasePlatformAdapter):
    """Minimal inert adapter: no real connections, sends, or chat lookups."""

    async def connect(self):
        pass

    async def disconnect(self):
        pass

    async def send(self, chat_id, text, **kwargs):
        return None

    async def get_chat_info(self, chat_id):
        return {}


def _make_adapter():
    adapter = _StubAdapter(PlatformConfig(enabled=True, token="t"), Platform.TELEGRAM)
    # Mock outbound delivery so handler results never trigger real sends.
    adapter._send_with_retry = AsyncMock(return_value=None)
    return adapter


def _make_event(text="hi", chat_id="42"):
    return MessageEvent(
        text=text,
        message_type=MessageType.TEXT,
        source=SessionSource(platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm"),
    )


def _sk(chat_id="42"):
    return build_session_key(
        SessionSource(platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm")
    )


def _count_pmb_frames() -> int:
    """Walk the current call stack and count nested
    ``_process_message_background`` frames.  Used to detect recursive
    in-band drains."""
    f = sys._getframe()
    n = 0
    while f is not None:
        if f.f_code.co_name == "_process_message_background":
            n += 1
        f = f.f_back
    return n


@pytest.mark.asyncio
async def test_in_band_drain_does_not_grow_stack():
    """Issue #17758: chained pending-message drains must not recurse.

    Queue a fresh pending message inside each handler invocation so the
    in-band drain block fires for every turn in the chain.  After N
    turns, the recorded stack depth at handler entry must stay bounded.
    Pre-fix, depths would be 1, 2, 3, …, N; post-fix, depths are 1
    every time because each drain runs in its own task.
    """
    N = 12
    adapter = _make_adapter()
    sk = _sk()

    depths: list[int] = []
    next_index = [1]

    async def handler(event):
        depths.append(_count_pmb_frames())
        if next_index[0] < N:
            adapter._pending_messages[sk] = _make_event(text=f"M{next_index[0]}")
            next_index[0] += 1
        return "ok"

    adapter._message_handler = handler

    await adapter.handle_message(_make_event(text="M0"))

    # Drain the chain.  Each turn schedules the next via the in-band
    # drain block, so we wait until N handler runs have completed and
    # the session has been released.
    for _ in range(400):
        if len(depths) >= N and sk not in adapter._active_sessions:
            break
        await asyncio.sleep(0.01)

    await adapter.cancel_background_tasks()

    assert len(depths) == N, (
        f"expected {N} handler runs in the chain, got {len(depths)}: depths={depths!r}"
    )
    max_depth = max(depths)
    assert max_depth <= 2, (
        f"in-band drain is recursing instead of spawning a fresh task — "
        f"stack depth grew with chain length: {depths!r}"
    )


@pytest.mark.asyncio
async def test_in_band_drain_preserves_active_session_guard():
    """The original task must NOT release ``_active_sessions[session_key]``
    after handing off to the drain task.

    When the in-band drain spawns ``drain_task`` and transfers ownership
    via ``_session_tasks[session_key] = drain_task``, the original task
    still unwinds through the ``finally`` block.  The drain task picks
    up the same ``interrupt_event`` in its own
    ``_process_message_background`` entry, so a naive
    ``_release_session_guard(session_key, guard=interrupt_event)`` in
    the unwind matches and deletes ``_active_sessions[session_key]``.
    That briefly reopens the Level-1 guard between the original task's
    finally and the drain task's first await — a concurrent inbound
    arriving in that window passes the guard and spawns a second
    handler for the same session.

    Invariant: ``_active_sessions[sk]`` must hold the SAME interrupt
    Event identity at every handler entry across an in-band drain
    chain.  Pre-fix, the original task's finally deletes the entry, so
    the drain task falls through to the ``or asyncio.Event()`` branch
    in ``_process_message_background`` and installs a *new* Event —
    the identity diverges.  Post-fix, the entry is preserved across
    handoff and the drain task reuses the original Event.
    """
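    # Illustrative shape of the guard lookup at handler entry (a sketch only,
    # inferred from the docstring above; not the adapter's actual code):
    #
    #   interrupt_event = self._active_sessions.get(session_key) or asyncio.Event()
    #   self._active_sessions[session_key] = interrupt_event
    #
    # If the original task's finally deleted the entry during handoff, the
    # drain task installs a *new* Event here and the identities diverge.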
    adapter = _make_adapter()
    sk = _sk()

    seen_guards: list = []

    async def handler(event):
        seen_guards.append(adapter._active_sessions.get(sk))
        if len(seen_guards) == 1:
            adapter._pending_messages[sk] = _make_event(text="M1")
        return "ok"

    adapter._message_handler = handler

    await adapter.handle_message(_make_event(text="M0"))

    for _ in range(400):
        if len(seen_guards) >= 2 and sk not in adapter._active_sessions:
            break
        await asyncio.sleep(0.01)

    await adapter.cancel_background_tasks()

    assert len(seen_guards) == 2, f"expected 2 handler runs, got {len(seen_guards)}"
    assert seen_guards[0] is not None, "M0 saw no active-session guard"
    assert seen_guards[1] is not None, "M1 saw no active-session guard"
    assert seen_guards[0] is seen_guards[1], (
        "in-band drain handoff replaced the active-session guard — the "
        "original task's finally deleted _active_sessions[sk] and the "
        "drain task installed a new Event.  Concurrent inbounds during "
        "the handoff window would bypass the Level-1 guard and spawn a "
        "second handler for the same session."
    )


# ---------------------------------------------------------------------------
# Follow-up guardrails (belt-and-suspenders on top of the #17758 fix).
#
# The in-band drain hand-off changed cleanup semantics in three subtle ways
# that the original fix reasoned about but didn't test directly.  These
# tests pin each invariant so future refactors can't silently regress them.
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_normal_path_releases_session_guard():
    """The common path — one message, nothing queued — must still
    fully release ``_active_sessions[sk]`` and ``_session_tasks[sk]``
    through the end-of-finally block.

    The #17758 fix moved ``_release_session_guard(...)`` under an
    ``if current_task is self._session_tasks.get(session_key)``
    conditional.  For the overwhelmingly common case (no pending
    message, no handoff) ``current_task`` IS the stored task, so the
    guard must still fire.  This test would fail if the conditional
    were ever tightened in a way that dropped the normal path."""
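    # Illustrative shape of the ownership-conditional release in ``finally``
    # (a sketch only, inferred from the docstring above; not the adapter's
    # actual code):
    #
    #   if asyncio.current_task() is self._session_tasks.get(session_key):
    #       self._release_session_guard(session_key, guard=interrupt_event)
    #
    # With no handoff the stored task IS the current task, so the release
    # must still fire on this normal path.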
    adapter = _make_adapter()
    sk = _sk()

    async def handler(event):
        return "ok"

    adapter._message_handler = handler

    await adapter.handle_message(_make_event(text="solo"))

    # Wait for the single-shot handler to fully unwind.
    for _ in range(200):
        if sk not in adapter._active_sessions and sk not in adapter._session_tasks:
            break
        await asyncio.sleep(0.01)

    await adapter.cancel_background_tasks()

    assert sk not in adapter._active_sessions, (
        "normal-path unwind left _active_sessions[sk] populated — future "
        "messages would take the busy-handler path forever"
    )
    assert sk not in adapter._session_tasks, (
        "normal-path unwind left _session_tasks[sk] populated — "
        "stale-lock detection will treat a dead task as alive"
    )


@pytest.mark.asyncio
async def test_drain_task_cancellation_releases_session():
    """If the in-band drain task is cancelled (e.g. user sent ``/stop``
    mid-drain), the session guard and task registry must still get
    cleaned up — the cancelled drain task's own ``finally`` runs and
    fires ``_release_session_guard``.

    The #17758 fix transfers ownership of ``_session_tasks[sk]`` to
    the drain task; the drain task's ``except asyncio.CancelledError``
    branch must then own the cleanup.  Without this test a future
    refactor could move cancellation handling in a way that leaves
    the session permanently pinned as busy after a cancel."""
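    # Illustrative shape of the cleanup ownership after handoff (a sketch only,
    # inferred from the docstring above; not the adapter's actual code):
    #
    #   try:
    #       ...  # run the handler for this turn
    #   except asyncio.CancelledError:
    #       raise  # unwind so the drain task's own finally still runs
    #   finally:
    #       if asyncio.current_task() is self._session_tasks.get(session_key):
    #           self._release_session_guard(session_key, guard=interrupt_event)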
    adapter = _make_adapter()
    sk = _sk()

    turn_started = asyncio.Event()
    drain_hit_handler = asyncio.Event()

    async def handler(event):
        if event.text == "M0":
            # Queue a pending follow-up so an in-band drain task gets spawned.
            adapter._pending_messages[sk] = _make_event(text="M1")
            turn_started.set()
            return "ok"
        # M1 is the drained follow-up — hang so we can cancel the drain task.
        drain_hit_handler.set()
        try:
            await asyncio.sleep(10)
        except asyncio.CancelledError:
            raise

    adapter._message_handler = handler

    await adapter.handle_message(_make_event(text="M0"))

    # Wait for the drain task to actually start running M1.
    await asyncio.wait_for(drain_hit_handler.wait(), timeout=2)

    # Cancel the drain task mid-handler.
    drain_task = adapter._session_tasks.get(sk)
    assert drain_task is not None, "in-band drain did not install a drain task"
    assert not drain_task.done(), "drain task finished before we could cancel"
    drain_task.cancel()

    # Drain task's finally must release both registries.
    for _ in range(200):
        if sk not in adapter._active_sessions and sk not in adapter._session_tasks:
            break
        await asyncio.sleep(0.01)

    await adapter.cancel_background_tasks()

    assert sk not in adapter._active_sessions, (
        "cancelled drain task did not release _active_sessions[sk] — "
        "the session stays permanently pinned as busy after a /stop mid-drain"
    )
    assert sk not in adapter._session_tasks, (
        "cancelled drain task did not release _session_tasks[sk] — "
        "stale-lock detection will treat the dead task as alive"
    )


@pytest.mark.asyncio
async def test_late_arrival_drain_still_fires_when_no_in_band_drain():
    """The late-arrival drain in ``finally`` must still spawn a fresh
    task when no in-band drain preceded it.

    Pre-#17758 this path already existed; the #17758 follow-up guard
    only re-queues when ``_session_tasks[sk] is not current_task``.
    For a late-arrival with no in-band drain, ``_session_tasks[sk]``
    IS the current task, so the ``else`` branch must fire and spawn
    a drain task for the queued message.

    Queue a pending message *after* the first message's handler returns
    (so the in-band drain block sees nothing) but *before* ``finally``
    runs the late-arrival check — we do this by hooking ``stop_typing``,
    which runs in finally before the late-arrival check."""
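    # Illustrative shape of the late-arrival branch in ``finally`` (a sketch
    # only, inferred from the docstrings in this module; not the adapter's
    # actual code):
    #
    #   if self._session_tasks.get(session_key) is current_task:
    #       # no handoff happened; spawn a fresh drain task for the late message
    #       drain_task = asyncio.create_task(
    #           self._process_message_background(pending_event, session_key)
    #       )
    #       self._session_tasks[session_key] = drain_task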
    adapter = _make_adapter()
    sk = _sk()

    results: list[str] = []
    original_stop_typing = getattr(adapter, "stop_typing", None)
    injected = False

    async def injecting_stop_typing(chat_id):
        # Simulate a message landing during the cleanup awaits.  Inject only
        # once so the drained follow-up's own cleanup doesn't re-queue "late"
        # forever and keep the session busy past the wait loop below.
        nonlocal injected
        if not injected:
            injected = True
            adapter._pending_messages[sk] = _make_event(text="late")
        if original_stop_typing:
            await original_stop_typing(chat_id)

    adapter.stop_typing = injecting_stop_typing

    async def handler(event):
        results.append(event.text)
        return "ok"

    adapter._message_handler = handler

    await adapter.handle_message(_make_event(text="first"))

    # Wait for the late-arrival drain task to finish the second event.
    for _ in range(400):
        if "late" in results and sk not in adapter._active_sessions:
            break
        await asyncio.sleep(0.01)

    await adapter.cancel_background_tasks()

    assert "first" in results, "original message handler did not run"
    assert "late" in results, (
        "late-arrival drain did not spawn a drain task — a message that "
        "landed during cleanup awaits was silently dropped"
    )