# test_shutdown_cache_cleanup.py
"""Regression tests for gateway shutdown cleaning up cached agent memory providers (issue #11205).

When the gateway shuts down, ``stop()`` called ``_finalize_shutdown_agents()``
which only drained agents in ``_running_agents``.  Idle agents sitting in
``_agent_cache`` (LRU cache) were never cleaned up, so their
``MemoryProvider.on_session_end()`` hooks never fired.

The fix adds an explicit sweep of ``_agent_cache`` after
``_finalize_shutdown_agents`` in the ``_stop_impl`` coroutine.
"""

import asyncio
import threading
from collections import OrderedDict
from unittest.mock import MagicMock, patch

import pytest

# Import the module (not the class) to reach stop() and helpers
import gateway.run as gw_mod


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

class _FakeGateway:
    """Minimal stand-in with just enough state for ``stop()`` to run.

    The tests pass an instance of this class as ``self`` to the unbound
    ``GatewayRunner.stop``, so every attribute and helper that ``stop()``
    touches must exist here.
    """

    def __init__(self):
        # Lifecycle / restart flags.
        self._running = True
        self._draining = False
        self._restart_requested = False
        self._restart_detached = False
        self._restart_via_service = False
        self._stop_task = None
        self._exit_cleanly = False
        self._exit_with_failure = False
        self._exit_reason = None
        self._exit_code = None
        # Tiny timeout so a drain wait (if any) cannot slow the tests down.
        self._restart_drain_timeout = 0.01

        # Agent bookkeeping: active agents plus the idle-agent cache that
        # these regression tests are about.
        self._running_agents = {}
        self._running_agents_ts = {}
        self._agent_cache = OrderedDict()
        self._agent_cache_lock = threading.Lock()

        # Remaining state; all empty so the fake has nothing else to stop.
        self.adapters = {}
        self._background_tasks = set()
        self._failed_platforms = []
        self._shutdown_event = asyncio.Event()
        self._pending_messages = {}
        self._pending_approvals = {}
        self._busy_ack_ts = {}

    def _running_agent_count(self):
        """Return the number of entries in ``_running_agents``."""
        return len(self._running_agents)

    def _update_runtime_status(self, *_a, **_kw):
        """No-op: accepts any arguments and does nothing."""
        pass

    async def _notify_active_sessions_of_shutdown(self):
        """No-op: the fake has no sessions to notify."""
        pass

    async def _drain_active_agents(self, timeout):
        """Return an empty dict and ``False`` without waiting.

        NOTE(review): presumably ``(snapshot_of_agents, timed_out)`` --
        confirm against ``GatewayRunner._drain_active_agents``.
        """
        return {}, False

    def _finalize_shutdown_agents(self, agents):
        """Run ``_cleanup_agent_resources`` on every value of *agents*."""
        for agent in agents.values():
            self._cleanup_agent_resources(agent)

    def _cleanup_agent_resources(self, agent):
        """Best-effort cleanup of a single agent; never raises ``Exception``.

        Calls ``agent.shutdown_memory_provider()`` and then ``agent.close()``
        when those attributes exist.  Each call is guarded separately so a
        failure in the first does not skip the second.  ``None`` is ignored.
        """
        if agent is None:
            return
        try:
            if hasattr(agent, "shutdown_memory_provider"):
                agent.shutdown_memory_provider()
        except Exception:
            # Deliberately swallowed: cleanup is best-effort in this fake.
            pass
        try:
            if hasattr(agent, "close"):
                agent.close()
        except Exception:
            # Deliberately swallowed: cleanup is best-effort in this fake.
            pass

    def _evict_cached_agent(self, key):
        """No-op: the fake never evicts individual cache entries."""
        pass


def _make_mock_agent():
    """Return a ``MagicMock`` agent with explicit ``shutdown_memory_provider``
    and ``close`` mocks so tests can assert on both calls."""
    a = MagicMock()
    a.shutdown_memory_provider = MagicMock()
    a.close = MagicMock()
    return a


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------

class TestCachedAgentCleanupOnShutdown:
    """Verify that ``stop()`` calls ``_cleanup_agent_resources`` on idle
    cached agents, triggering ``shutdown_memory_provider()`` (which calls
    ``on_session_end``)."""

    @pytest.mark.asyncio
    async def test_cached_agent_memory_provider_shut_down(self):
        """A cached agent's shutdown_memory_provider is called during gateway stop."""
        gw = _FakeGateway()
        agent = _make_mock_agent()
        gw._agent_cache["session-1"] = (agent, "sig-123")

        # Call the real stop() from GatewayRunner
        await gw_mod.GatewayRunner.stop(gw)

        agent.shutdown_memory_provider.assert_called_once()

    @pytest.mark.asyncio
    async def test_cache_cleared_after_shutdown(self):
        """The _agent_cache dict is cleared after stop."""
        gw = _FakeGateway()
        agent = _make_mock_agent()
        gw._agent_cache["s1"] = (agent, "sig1")

        await gw_mod.GatewayRunner.stop(gw)

        assert len(gw._agent_cache) == 0

    @pytest.mark.asyncio
    async def test_no_cached_agents_no_error(self):
        """stop() works fine when _agent_cache is empty."""
        gw = _FakeGateway()

        await gw_mod.GatewayRunner.stop(gw)  # Should not raise

        assert len(gw._agent_cache) == 0

    @pytest.mark.asyncio
    async def test_multiple_cached_agents_all_cleaned(self):
        """All cached agents get cleaned up."""
        gw = _FakeGateway()
        agents = [_make_mock_agent() for _ in range(5)]
        for i, a in enumerate(agents):
            gw._agent_cache[f"s{i}"] = (a, f"sig{i}")

        await gw_mod.GatewayRunner.stop(gw)

        for a in agents:
            a.shutdown_memory_provider.assert_called_once()

    @pytest.mark.asyncio
    async def test_cleanup_survives_agent_exception(self):
        """An exception from one agent's shutdown doesn't prevent others."""
        gw = _FakeGateway()

        bad = _make_mock_agent()
        bad.shutdown_memory_provider.side_effect = RuntimeError("boom")
        bad.close.side_effect = RuntimeError("boom")

        good = _make_mock_agent()

        # ``bad`` is inserted first so that, in cache order, its failure
        # happens before ``good`` is reached.
        gw._agent_cache["bad"] = (bad, "sig-bad")
        gw._agent_cache["good"] = (good, "sig-good")

        await gw_mod.GatewayRunner.stop(gw)

        # The failing agent must actually have been attempted; otherwise this
        # test would pass without ever exercising the exception path.
        bad.shutdown_memory_provider.assert_called_once()
        # The good agent should still be cleaned up
        good.shutdown_memory_provider.assert_called_once()

    @pytest.mark.asyncio
    async def test_plain_agent_not_tuple(self):
        """Cache entries that aren't tuples (just bare agents) are also cleaned."""
        gw = _FakeGateway()
        agent = _make_mock_agent()
        gw._agent_cache["s1"] = agent  # Not a tuple

        await gw_mod.GatewayRunner.stop(gw)

        agent.shutdown_memory_provider.assert_called_once()
        assert len(gw._agent_cache) == 0

    @pytest.mark.asyncio
    async def test_none_entry_skipped(self):
        """A None cache entry doesn't cause errors."""
        gw = _FakeGateway()
        gw._agent_cache["s1"] = None

        await gw_mod.GatewayRunner.stop(gw)

        assert len(gw._agent_cache) == 0


class TestRunningAgentsNotDoubleCleaned:
    """Verify behavior when agents appear in both _running_agents and _agent_cache.

    NOTE(review): despite the class name, the test below only asserts that
    cleanup happens at least once; it does not forbid a second call.
    """

    @pytest.mark.asyncio
    async def test_running_and_cached_agent_cleaned_at_least_once(self):
        """An agent in both _running_agents and _agent_cache gets
        shutdown_memory_provider called at least once."""
        gw = _FakeGateway()
        shared = _make_mock_agent()

        gw._running_agents["s1"] = shared
        gw._agent_cache["s1"] = (shared, "sig1")

        await gw_mod.GatewayRunner.stop(gw)

        # Called at least once — either from _finalize_shutdown_agents
        # or from the cache sweep (or both)
        assert shared.shutdown_memory_provider.call_count >= 1