/ tests / gateway / test_shutdown_cache_cleanup.py
test_shutdown_cache_cleanup.py
  1  """Regression tests for gateway shutdown cleaning up cached agent memory providers (issue #11205).
  2  
Before the fix, ``stop()`` called ``_finalize_shutdown_agents()`` on gateway
shutdown, which only drained agents in ``_running_agents``.  Idle agents
sitting in ``_agent_cache`` (LRU cache) were never cleaned up, so their
``MemoryProvider.on_session_end()`` hooks never fired.
  7  
  8  The fix adds an explicit sweep of ``_agent_cache`` after
  9  ``_finalize_shutdown_agents`` in the ``_stop_impl`` coroutine.
 10  """
 11  
 12  import asyncio
 13  import threading
 14  from collections import OrderedDict
 15  from unittest.mock import MagicMock, patch
 16  
 17  import pytest
 18  
 19  # Import the module (not the class) to reach stop() and helpers
 20  import gateway.run as gw_mod
 21  
 22  
 23  # ---------------------------------------------------------------------------
 24  # Helpers
 25  # ---------------------------------------------------------------------------
 26  
 27  class _FakeGateway:
 28      """Minimal stand-in with just enough state for ``stop()`` to run."""
 29  
 30      def __init__(self):
 31          self._running = True
 32          self._draining = False
 33          self._restart_requested = False
 34          self._restart_detached = False
 35          self._restart_via_service = False
 36          self._stop_task = None
 37          self._exit_cleanly = False
 38          self._exit_with_failure = False
 39          self._exit_reason = None
 40          self._exit_code = None
 41          self._restart_drain_timeout = 0.01
 42          self._running_agents = {}
 43          self._running_agents_ts = {}
 44          self._agent_cache = OrderedDict()
 45          self._agent_cache_lock = threading.Lock()
 46          self.adapters = {}
 47          self._background_tasks = set()
 48          self._failed_platforms = []
 49          self._shutdown_event = asyncio.Event()
 50          self._pending_messages = {}
 51          self._pending_approvals = {}
 52          self._busy_ack_ts = {}
 53  
 54      def _running_agent_count(self):
 55          return len(self._running_agents)
 56  
 57      def _update_runtime_status(self, *_a, **_kw):
 58          pass
 59  
 60      async def _notify_active_sessions_of_shutdown(self):
 61          pass
 62  
 63      async def _drain_active_agents(self, timeout):
 64          return {}, False
 65  
 66      def _finalize_shutdown_agents(self, agents):
 67          for agent in agents.values():
 68              self._cleanup_agent_resources(agent)
 69  
 70      def _cleanup_agent_resources(self, agent):
 71          if agent is None:
 72              return
 73          try:
 74              if hasattr(agent, "shutdown_memory_provider"):
 75                  agent.shutdown_memory_provider()
 76          except Exception:
 77              pass
 78          try:
 79              if hasattr(agent, "close"):
 80                  agent.close()
 81          except Exception:
 82              pass
 83  
 84      def _evict_cached_agent(self, key):
 85          pass
 86  
 87  
 88  def _make_mock_agent():
 89      a = MagicMock()
 90      a.shutdown_memory_provider = MagicMock()
 91      a.close = MagicMock()
 92      return a
 93  
 94  
 95  # ---------------------------------------------------------------------------
 96  # Tests
 97  # ---------------------------------------------------------------------------
 98  
 99  class TestCachedAgentCleanupOnShutdown:
100      """Verify that ``stop()`` calls ``_cleanup_agent_resources`` on idle
101      cached agents, triggering ``shutdown_memory_provider()`` (which calls
102      ``on_session_end``)."""
103  
104      @pytest.mark.asyncio
105      async def test_cached_agent_memory_provider_shut_down(self):
106          """A cached agent's shutdown_memory_provider is called during gateway stop."""
107          gw = _FakeGateway()
108          agent = _make_mock_agent()
109          gw._agent_cache["session-1"] = (agent, "sig-123")
110  
111          # Call the real stop() from GatewayRunner
112          await gw_mod.GatewayRunner.stop(gw)
113  
114          agent.shutdown_memory_provider.assert_called_once()
115  
116      @pytest.mark.asyncio
117      async def test_cache_cleared_after_shutdown(self):
118          """The _agent_cache dict is cleared after stop."""
119          gw = _FakeGateway()
120          agent = _make_mock_agent()
121          gw._agent_cache["s1"] = (agent, "sig1")
122  
123          await gw_mod.GatewayRunner.stop(gw)
124  
125          assert len(gw._agent_cache) == 0
126  
127      @pytest.mark.asyncio
128      async def test_no_cached_agents_no_error(self):
129          """stop() works fine when _agent_cache is empty."""
130          gw = _FakeGateway()
131  
132          await gw_mod.GatewayRunner.stop(gw)  # Should not raise
133  
134          assert len(gw._agent_cache) == 0
135  
136      @pytest.mark.asyncio
137      async def test_multiple_cached_agents_all_cleaned(self):
138          """All cached agents get cleaned up."""
139          gw = _FakeGateway()
140          agents = []
141          for i in range(5):
142              a = _make_mock_agent()
143              agents.append(a)
144              gw._agent_cache[f"s{i}"] = (a, f"sig{i}")
145  
146          await gw_mod.GatewayRunner.stop(gw)
147  
148          for a in agents:
149              a.shutdown_memory_provider.assert_called_once()
150  
151      @pytest.mark.asyncio
152      async def test_cleanup_survives_agent_exception(self):
153          """An exception from one agent's shutdown doesn't prevent others."""
154          gw = _FakeGateway()
155  
156          bad = _make_mock_agent()
157          bad.shutdown_memory_provider.side_effect = RuntimeError("boom")
158          bad.close.side_effect = RuntimeError("boom")
159  
160          good = _make_mock_agent()
161  
162          gw._agent_cache["bad"] = (bad, "sig-bad")
163          gw._agent_cache["good"] = (good, "sig-good")
164  
165          await gw_mod.GatewayRunner.stop(gw)
166  
167          # The good agent should still be cleaned up
168          good.shutdown_memory_provider.assert_called_once()
169  
170      @pytest.mark.asyncio
171      async def test_plain_agent_not_tuple(self):
172          """Cache entries that aren't tuples (just bare agents) are also cleaned."""
173          gw = _FakeGateway()
174          agent = _make_mock_agent()
175          gw._agent_cache["s1"] = agent  # Not a tuple
176  
177          await gw_mod.GatewayRunner.stop(gw)
178  
179          agent.shutdown_memory_provider.assert_called_once()
180          assert len(gw._agent_cache) == 0
181  
182      @pytest.mark.asyncio
183      async def test_none_entry_skipped(self):
184          """A None cache entry doesn't cause errors."""
185          gw = _FakeGateway()
186          gw._agent_cache["s1"] = None
187  
188          await gw_mod.GatewayRunner.stop(gw)
189  
190          assert len(gw._agent_cache) == 0
191  
192  
class TestRunningAgentsNotDoubleCleaned:
    """Cover agents tracked in both ``_running_agents`` and ``_agent_cache``.

    Despite the class name, the test below only pins a lower bound: the
    shared agent must be shut down at least once.  It does not assert that
    the agent is shut down exactly once.
    """

    @pytest.mark.asyncio
    async def test_running_and_cached_agent_cleaned_at_least_once(self):
        """shutdown_memory_provider runs at least once for an agent that is
        both running and cached."""
        gw = _FakeGateway()
        agent = _make_mock_agent()

        # Register the very same object under one session key in both maps.
        session_key = "s1"
        gw._running_agents[session_key] = agent
        gw._agent_cache[session_key] = (agent, "sig1")

        await gw_mod.GatewayRunner.stop(gw)

        # The running-agent finalizer, the cache sweep, or both may have done
        # the shutdown; the test only requires that at least one of them did.
        assert agent.shutdown_memory_provider.called
209          # or from the cache sweep (or both)
210          assert shared.shutdown_memory_provider.call_count >= 1