# test_agent_cache.py
"""Integration tests for gateway AIAgent caching.

Verifies that the agent cache correctly:
- Reuses agents across messages (same config → same instance)
- Rebuilds agents when config changes (model, provider, toolsets)
- Updates reasoning_config in-place without rebuilding
- Evicts on session reset
- Evicts on fallback activation
- Preserves frozen system prompt across turns
"""

import hashlib
import json
import threading
from unittest.mock import MagicMock, patch

import pytest


def _make_runner():
    """Create a minimal GatewayRunner with just the cache infrastructure."""
    from gateway.run import GatewayRunner

    # Bypass __init__ so no network/config setup runs; only the cache
    # fields exercised by these tests are populated.
    runner = GatewayRunner.__new__(GatewayRunner)
    runner._agent_cache = {}
    runner._agent_cache_lock = threading.Lock()
    return runner


class TestAgentConfigSignature:
    """Config signature produces stable, distinct keys."""

    def test_same_config_same_signature(self):
        from gateway.run import GatewayRunner

        runtime = {"api_key": "sk-test12345678", "base_url": "https://openrouter.ai/api/v1",
                   "provider": "openrouter", "api_mode": "chat_completions"}
        sig1 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-telegram"], "")
        sig2 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-telegram"], "")
        assert sig1 == sig2

    def test_model_change_different_signature(self):
        from gateway.run import GatewayRunner

        runtime = {"api_key": "sk-test12345678", "base_url": "https://openrouter.ai/api/v1",
                   "provider": "openrouter"}
        sig1 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-telegram"], "")
        sig2 = GatewayRunner._agent_config_signature("claude-opus-4.6", runtime, ["hermes-telegram"], "")
        assert sig1 != sig2

    def test_same_token_prefix_different_full_token_changes_signature(self):
        """Tokens sharing a JWT-style prefix must not collide."""
        from gateway.run import GatewayRunner

        rt1 = {
            "api_key": "eyJhbGci.token-for-account-a",
            "base_url": "https://chatgpt.com/backend-api/codex",
            "provider": "openai-codex",
            "api_mode": "codex_responses",
        }
        rt2 = {
            "api_key": "eyJhbGci.token-for-account-b",
            "base_url": "https://chatgpt.com/backend-api/codex",
            "provider": "openai-codex",
            "api_mode": "codex_responses",
        }

        # Sanity: the two tokens deliberately share their first 8 chars.
        assert rt1["api_key"][:8] == rt2["api_key"][:8]
        sig1 = GatewayRunner._agent_config_signature("gpt-5.3-codex", rt1, ["hermes-telegram"], "")
        sig2 = GatewayRunner._agent_config_signature("gpt-5.3-codex", rt2, ["hermes-telegram"], "")
        assert sig1 != sig2

    def test_provider_change_different_signature(self):
        from gateway.run import GatewayRunner

        rt1 = {"api_key": "sk-test12345678", "base_url": "https://openrouter.ai/api/v1", "provider": "openrouter"}
        rt2 = {"api_key": "sk-test12345678", "base_url": "https://api.anthropic.com", "provider": "anthropic"}
        sig1 = GatewayRunner._agent_config_signature("claude-sonnet-4", rt1, ["hermes-telegram"], "")
        sig2 = GatewayRunner._agent_config_signature("claude-sonnet-4", rt2, ["hermes-telegram"], "")
        assert sig1 != sig2

    def test_toolset_change_different_signature(self):
        from gateway.run import GatewayRunner

        runtime = {"api_key": "sk-test12345678", "base_url": "https://openrouter.ai/api/v1", "provider": "openrouter"}
        sig1 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-telegram"], "")
        sig2 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-discord"], "")
        assert sig1 != sig2

    def test_reasoning_not_in_signature(self):
        """Reasoning config is set per-message, not part of the signature."""
        from gateway.run import GatewayRunner

        runtime = {"api_key": "sk-test12345678", "base_url": "https://openrouter.ai/api/v1", "provider": "openrouter"}
        # Same config — signature should be identical regardless of what
        # reasoning_config the caller might have (it's not passed in)
        sig1 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-telegram"], "")
        sig2 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-telegram"], "")
        assert sig1 == sig2

    # ---------------------------------------------------------------
    # cache_keys (compression/context config cache-busting)
    # ---------------------------------------------------------------

    def test_cache_keys_default_omitted_matches_empty(self):
        """Omitted cache_keys must produce the same signature as empty {}."""
        from gateway.run import GatewayRunner

        runtime = {"api_key": "k", "base_url": "u", "provider": "p"}
        sig_omitted = GatewayRunner._agent_config_signature("m", runtime, [], "")
        sig_empty = GatewayRunner._agent_config_signature("m", runtime, [], "", cache_keys={})
        sig_none = GatewayRunner._agent_config_signature("m", runtime, [], "", cache_keys=None)
        assert sig_omitted == sig_empty == sig_none

    def test_context_length_change_busts_cache(self):
        """Editing model.context_length in config must produce a new signature."""
        from gateway.run import GatewayRunner

        runtime = {"api_key": "k", "base_url": "u", "provider": "p"}
        sig1 = GatewayRunner._agent_config_signature(
            "m", runtime, [], "",
            cache_keys={"model.context_length": 200_000},
        )
        sig2 = GatewayRunner._agent_config_signature(
            "m", runtime, [], "",
            cache_keys={"model.context_length": 400_000},
        )
        assert sig1 != sig2

    def test_compression_threshold_change_busts_cache(self):
        from gateway.run import GatewayRunner

        runtime = {"api_key": "k", "base_url": "u", "provider": "p"}
        sig1 = GatewayRunner._agent_config_signature(
            "m", runtime, [], "",
            cache_keys={"compression.threshold": 0.50},
        )
        sig2 = GatewayRunner._agent_config_signature(
            "m", runtime, [], "",
            cache_keys={"compression.threshold": 0.75},
        )
        assert sig1 != sig2

    def test_compression_enabled_toggle_busts_cache(self):
        from gateway.run import GatewayRunner

        runtime = {"api_key": "k", "base_url": "u", "provider": "p"}
        sig_on = GatewayRunner._agent_config_signature(
            "m", runtime, [], "",
            cache_keys={"compression.enabled": True},
        )
        sig_off = GatewayRunner._agent_config_signature(
            "m", runtime, [], "",
            cache_keys={"compression.enabled": False},
        )
        assert sig_on != sig_off

    def test_cache_keys_key_order_does_not_matter(self):
        """Signature must be stable regardless of dict key insertion order."""
        from gateway.run import GatewayRunner

        runtime = {"api_key": "k", "base_url": "u", "provider": "p"}
        sig_a = GatewayRunner._agent_config_signature(
            "m", runtime, [], "",
            cache_keys={"model.context_length": 200_000, "compression.threshold": 0.5},
        )
        sig_b = GatewayRunner._agent_config_signature(
            "m", runtime, [], "",
            cache_keys={"compression.threshold": 0.5, "model.context_length": 200_000},
        )
        assert sig_a == sig_b

    def test_tool_registry_generation_change_busts_cache(self):
        """MCP reloads mutate the tool registry, so cached agents must rebuild."""
        from gateway.run import GatewayRunner

        runtime = {"api_key": "k", "base_url": "u", "provider": "p"}
        sig_before = GatewayRunner._agent_config_signature(
            "m", runtime, ["telegram"], "",
            cache_keys={"tools.registry_generation": 10},
        )
        sig_after = GatewayRunner._agent_config_signature(
            "m", runtime, ["telegram"], "",
            cache_keys={"tools.registry_generation": 11},
        )

        assert sig_before != sig_after


class TestExtractCacheBustingConfig:
    """Verify _extract_cache_busting_config pulls the documented subset of
    config values that must invalidate the cached agent on change."""

    def test_reads_model_context_length(self):
        from gateway.run import GatewayRunner

        out = GatewayRunner._extract_cache_busting_config(
            {"model": {"context_length": 272_000, "provider": "openrouter"}}
        )
        assert out["model.context_length"] == 272_000

    def test_reads_compression_subkeys(self):
        from gateway.run import GatewayRunner

        out = GatewayRunner._extract_cache_busting_config(
            {
                "compression": {
                    "enabled": False,
                    "threshold": 0.6,
                    "target_ratio": 0.3,
                    "protect_last_n": 25,
                    "some_other_key": "ignored",
                }
            }
        )
        assert out["compression.enabled"] is False
        assert out["compression.threshold"] == 0.6
        assert out["compression.target_ratio"] == 0.3
        assert out["compression.protect_last_n"] == 25

    def test_missing_keys_yield_none(self):
        """Absent config keys must produce None values (still contribute to signature)."""
        from gateway.run import GatewayRunner

        out = GatewayRunner._extract_cache_busting_config({})
        # Every documented cache-busting key must be present, even if None
        for section, key in GatewayRunner._CACHE_BUSTING_CONFIG_KEYS:
            assert f"{section}.{key}" in out
            assert out[f"{section}.{key}"] is None

    def test_non_dict_section_treated_as_missing(self):
        from gateway.run import GatewayRunner

        # compression is a string — should not crash, all compression.* keys None
        out = GatewayRunner._extract_cache_busting_config(
            {"compression": "broken", "model": {"context_length": 100_000}}
        )
        assert out["compression.enabled"] is None
        assert out["compression.threshold"] is None
        assert out["model.context_length"] == 100_000

    def test_none_config_is_safe(self):
        from gateway.run import GatewayRunner

        out = GatewayRunner._extract_cache_busting_config(None)
        for section, key in GatewayRunner._CACHE_BUSTING_CONFIG_KEYS:
            assert out[f"{section}.{key}"] is None
        assert "tools.registry_generation" in out

    def test_extract_includes_live_tool_registry_generation(self, monkeypatch):
        from gateway.run import GatewayRunner
        from tools.registry import registry

        monkeypatch.setattr(registry, "_generation", 12345)

        out = GatewayRunner._extract_cache_busting_config({})

        assert out["tools.registry_generation"] == 12345

    def test_full_round_trip_busts_cache_on_real_edit(self):
        """End-to-end: simulate a config edit on main and verify the
        extracted cache_keys change produces a new signature."""
        from gateway.run import GatewayRunner

        runtime = {"api_key": "k", "base_url": "u", "provider": "p"}
        cfg_before = {
            "model": {"context_length": 200_000},
            "compression": {"threshold": 0.50, "enabled": True},
        }
        cfg_after = {
            "model": {"context_length": 200_000},
            "compression": {"threshold": 0.75, "enabled": True},  # user raised threshold
        }

        sig_before = GatewayRunner._agent_config_signature(
            "m", runtime, [], "",
            cache_keys=GatewayRunner._extract_cache_busting_config(cfg_before),
        )
        sig_after = GatewayRunner._agent_config_signature(
            "m", runtime, [], "",
            cache_keys=GatewayRunner._extract_cache_busting_config(cfg_after),
        )
        assert sig_before != sig_after, (
            "Editing compression.threshold in config.yaml must bust the "
            "gateway's cached agent so the new threshold takes effect."
        )


class TestAgentCacheLifecycle:
    """End-to-end cache behavior with real AIAgent construction."""

    def test_cache_hit_returns_same_agent(self):
        """Second message with same config reuses the cached agent instance."""
        from run_agent import AIAgent

        runner = _make_runner()
        session_key = "telegram:12345"
        runtime = {"api_key": "test", "base_url": "https://openrouter.ai/api/v1",
                   "provider": "openrouter", "api_mode": "chat_completions"}
        sig = runner._agent_config_signature("anthropic/claude-sonnet-4", runtime, ["hermes-telegram"], "")

        # First message — create and cache
        agent1 = AIAgent(
            model="anthropic/claude-sonnet-4", api_key="test",
            base_url="https://openrouter.ai/api/v1", provider="openrouter",
            max_iterations=5, quiet_mode=True, skip_context_files=True,
            skip_memory=True, platform="telegram",
        )
        with runner._agent_cache_lock:
            runner._agent_cache[session_key] = (agent1, sig)

        # Second message — cache hit
        with runner._agent_cache_lock:
            cached = runner._agent_cache.get(session_key)
            assert cached is not None
            assert cached[1] == sig
            assert cached[0] is agent1  # same instance

    def test_cache_miss_on_model_change(self):
        """Model change produces different signature → cache miss."""
        from run_agent import AIAgent

        runner = _make_runner()
        session_key = "telegram:12345"
        runtime = {"api_key": "test", "base_url": "https://openrouter.ai/api/v1",
                   "provider": "openrouter", "api_mode": "chat_completions"}

        old_sig = runner._agent_config_signature("anthropic/claude-sonnet-4", runtime, ["hermes-telegram"], "")
        agent1 = AIAgent(
            model="anthropic/claude-sonnet-4", api_key="test",
            base_url="https://openrouter.ai/api/v1", provider="openrouter",
            max_iterations=5, quiet_mode=True, skip_context_files=True,
            skip_memory=True, platform="telegram",
        )
        with runner._agent_cache_lock:
            runner._agent_cache[session_key] = (agent1, old_sig)

        # New model → different signature
        new_sig = runner._agent_config_signature("anthropic/claude-opus-4.6", runtime, ["hermes-telegram"], "")
        assert new_sig != old_sig

        with runner._agent_cache_lock:
            cached = runner._agent_cache.get(session_key)
            assert cached[1] != new_sig  # signature mismatch → would create new agent

    def test_evict_on_session_reset(self):
        """_evict_cached_agent removes the entry."""
        from run_agent import AIAgent

        runner = _make_runner()
        session_key = "telegram:12345"

        agent = AIAgent(
            model="anthropic/claude-sonnet-4", api_key="test",
            base_url="https://openrouter.ai/api/v1", provider="openrouter",
            max_iterations=5, quiet_mode=True, skip_context_files=True,
            skip_memory=True,
        )
        with runner._agent_cache_lock:
            runner._agent_cache[session_key] = (agent, "sig123")

        runner._evict_cached_agent(session_key)

        with runner._agent_cache_lock:
            assert session_key not in runner._agent_cache

    def test_evict_does_not_affect_other_sessions(self):
        """Evicting one session leaves other sessions cached."""
        runner = _make_runner()
        with runner._agent_cache_lock:
            runner._agent_cache["session-A"] = ("agent-A", "sig-A")
            runner._agent_cache["session-B"] = ("agent-B", "sig-B")

        runner._evict_cached_agent("session-A")

        with runner._agent_cache_lock:
            assert "session-A" not in runner._agent_cache
            assert "session-B" in runner._agent_cache

    def test_reasoning_config_updates_in_place(self):
        """Reasoning config can be set on a cached agent without eviction."""
        from run_agent import AIAgent

        agent = AIAgent(
            model="anthropic/claude-sonnet-4", api_key="test",
            base_url="https://openrouter.ai/api/v1", provider="openrouter",
            max_iterations=5, quiet_mode=True, skip_context_files=True,
            skip_memory=True,
            reasoning_config={"enabled": True, "effort": "medium"},
        )

        # Simulate per-message reasoning update
        agent.reasoning_config = {"enabled": True, "effort": "high"}
        assert agent.reasoning_config["effort"] == "high"

        # System prompt should not be affected by reasoning change
        prompt1 = agent._build_system_prompt()
        agent._cached_system_prompt = prompt1  # simulate run_conversation caching
        agent.reasoning_config = {"enabled": True, "effort": "low"}
        prompt2 = agent._cached_system_prompt
        assert prompt1 is prompt2  # same object — not invalidated by reasoning change

    def test_system_prompt_frozen_across_cache_reuse(self):
        """The cached agent's system prompt stays identical across turns."""
        from run_agent import AIAgent

        agent = AIAgent(
            model="anthropic/claude-sonnet-4", api_key="test",
            base_url="https://openrouter.ai/api/v1", provider="openrouter",
            max_iterations=5, quiet_mode=True, skip_context_files=True,
            skip_memory=True, platform="telegram",
        )

        # Build system prompt (simulates first run_conversation)
        prompt1 = agent._build_system_prompt()
        agent._cached_system_prompt = prompt1

        # Simulate second turn — prompt should be frozen
        prompt2 = agent._cached_system_prompt
        assert prompt1 is prompt2  # same object, not rebuilt

    def test_callbacks_update_without_cache_eviction(self):
        """Per-message callbacks can be set on cached agent."""
        from run_agent import AIAgent

        agent = AIAgent(
            model="anthropic/claude-sonnet-4", api_key="test",
            base_url="https://openrouter.ai/api/v1", provider="openrouter",
            max_iterations=5, quiet_mode=True, skip_context_files=True,
            skip_memory=True,
        )

        # Set callbacks like the gateway does per-message
        cb1 = lambda *a: None
        cb2 = lambda *a: None
        agent.tool_progress_callback = cb1
        agent.step_callback = cb2
        agent.stream_delta_callback = None
        agent.status_callback = None

        assert agent.tool_progress_callback is cb1
        assert agent.step_callback is cb2

        # Update for next message
        cb3 = lambda *a: None
        agent.tool_progress_callback = cb3
        assert agent.tool_progress_callback is cb3


class TestAgentCacheBoundedGrowth:
    """LRU cap and idle-TTL eviction prevent unbounded cache growth."""

    def _bounded_runner(self):
        """Runner with an OrderedDict cache (matches real gateway init)."""
        from collections import OrderedDict
        from gateway.run import GatewayRunner

        runner = GatewayRunner.__new__(GatewayRunner)
        runner._agent_cache = OrderedDict()
        runner._agent_cache_lock = threading.Lock()
        return runner

    def _fake_agent(self, last_activity: float | None = None):
        """Lightweight stand-in; real AIAgent is heavy to construct."""
        m = MagicMock()
        if last_activity is not None:
            m._last_activity_ts = last_activity
        else:
            import time as _t
            m._last_activity_ts = _t.time()
        return m

    def test_cap_evicts_lru_when_exceeded(self, monkeypatch):
        """Inserting past _AGENT_CACHE_MAX_SIZE pops the oldest entry."""
        from gateway import run as gw_run

        monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 3)
        runner = self._bounded_runner()
        runner._cleanup_agent_resources = MagicMock()

        for i in range(3):
            runner._agent_cache[f"s{i}"] = (self._fake_agent(), f"sig{i}")

        # Insert a 4th — oldest (s0) must be evicted.
        with runner._agent_cache_lock:
            runner._agent_cache["s3"] = (self._fake_agent(), "sig3")
            runner._enforce_agent_cache_cap()

        assert "s0" not in runner._agent_cache
        assert "s3" in runner._agent_cache
        assert len(runner._agent_cache) == 3

    def test_cap_respects_move_to_end(self, monkeypatch):
        """Entries refreshed via move_to_end are NOT evicted as 'oldest'."""
        from gateway import run as gw_run

        monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 3)
        runner = self._bounded_runner()
        runner._cleanup_agent_resources = MagicMock()

        for i in range(3):
            runner._agent_cache[f"s{i}"] = (self._fake_agent(), f"sig{i}")

        # Touch s0 — it is now MRU, so s1 becomes LRU.
        runner._agent_cache.move_to_end("s0")

        with runner._agent_cache_lock:
            runner._agent_cache["s3"] = (self._fake_agent(), "sig3")
            runner._enforce_agent_cache_cap()

        assert "s0" in runner._agent_cache   # rescued by move_to_end
        assert "s1" not in runner._agent_cache  # now oldest → evicted
        assert "s3" in runner._agent_cache

    def test_cap_triggers_cleanup_thread(self, monkeypatch):
        """Evicted agent has release_clients() called for it (soft cleanup).

        Uses the soft path (_release_evicted_agent_soft), NOT the hard
        _cleanup_agent_resources — cache eviction must not tear down
        per-task state (terminal/browser/bg procs).
        """
        from gateway import run as gw_run

        monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1)
        runner = self._bounded_runner()

        release_calls: list = []
        cleanup_calls: list = []

        # Intercept both paths; only release_clients path should fire.
        def _soft(agent):
            release_calls.append(agent)

        runner._release_evicted_agent_soft = _soft
        runner._cleanup_agent_resources = lambda a: cleanup_calls.append(a)

        old_agent = self._fake_agent()
        new_agent = self._fake_agent()
        with runner._agent_cache_lock:
            runner._agent_cache["old"] = (old_agent, "sig_old")
            runner._agent_cache["new"] = (new_agent, "sig_new")
            runner._enforce_agent_cache_cap()

        # Cleanup is dispatched to a daemon thread; join briefly to observe.
        import time as _t
        deadline = _t.time() + 2.0
        while _t.time() < deadline and not release_calls:
            _t.sleep(0.02)
        assert old_agent in release_calls
        assert new_agent not in release_calls
        # Hard-cleanup path must NOT have fired — that's for session expiry only.
        assert cleanup_calls == []

    def test_idle_ttl_sweep_evicts_stale_agents(self, monkeypatch):
        """_sweep_idle_cached_agents removes agents idle past the TTL."""
        from gateway import run as gw_run

        monkeypatch.setattr(gw_run, "_AGENT_CACHE_IDLE_TTL_SECS", 0.05)
        runner = self._bounded_runner()
        runner._cleanup_agent_resources = MagicMock()

        import time as _t
        fresh = self._fake_agent(last_activity=_t.time())
        stale = self._fake_agent(last_activity=_t.time() - 10.0)
        runner._agent_cache["fresh"] = (fresh, "s1")
        runner._agent_cache["stale"] = (stale, "s2")

        evicted = runner._sweep_idle_cached_agents()
        assert evicted == 1
        assert "stale" not in runner._agent_cache
        assert "fresh" in runner._agent_cache

    def test_idle_sweep_skips_agents_without_activity_ts(self, monkeypatch):
        """Agents missing _last_activity_ts are left alone (defensive)."""
        from gateway import run as gw_run

        monkeypatch.setattr(gw_run, "_AGENT_CACHE_IDLE_TTL_SECS", 0.01)
        runner = self._bounded_runner()
        runner._cleanup_agent_resources = MagicMock()

        no_ts = MagicMock(spec=[])  # no _last_activity_ts attribute
        runner._agent_cache["s"] = (no_ts, "sig")

        assert runner._sweep_idle_cached_agents() == 0
        assert "s" in runner._agent_cache

    def test_plain_dict_cache_is_tolerated(self):
        """Test fixtures using plain {} don't crash _enforce_agent_cache_cap."""
        from gateway.run import GatewayRunner

        runner = GatewayRunner.__new__(GatewayRunner)
        runner._agent_cache = {}  # plain dict, not OrderedDict
        runner._agent_cache_lock = threading.Lock()
        runner._cleanup_agent_resources = MagicMock()

        # Should be a no-op rather than raising.
        with runner._agent_cache_lock:
            for i in range(200):
                runner._agent_cache[f"s{i}"] = (MagicMock(), f"sig{i}")
            runner._enforce_agent_cache_cap()  # no crash, no eviction

        assert len(runner._agent_cache) == 200

    def test_main_lookup_updates_lru_order(self, monkeypatch):
        """Cache hit via the main-lookup path refreshes LRU position."""
        runner = self._bounded_runner()

        a0 = self._fake_agent()
        a1 = self._fake_agent()
        a2 = self._fake_agent()
        runner._agent_cache["s0"] = (a0, "sig0")
        runner._agent_cache["s1"] = (a1, "sig1")
        runner._agent_cache["s2"] = (a2, "sig2")

        # Simulate what _process_message_background does on a cache hit
        # (minus the agent-state reset which isn't relevant here).
        with runner._agent_cache_lock:
            cached = runner._agent_cache.get("s0")
            if cached and hasattr(runner._agent_cache, "move_to_end"):
                runner._agent_cache.move_to_end("s0")

        # After the hit, insertion order should be s1, s2, s0.
        assert list(runner._agent_cache.keys()) == ["s1", "s2", "s0"]


class TestAgentCacheActiveSafety:
    """Safety: eviction must not tear down agents currently mid-turn.

    AIAgent.close() kills process_registry entries for the task, cleans
    the terminal sandbox, closes the OpenAI client, and cascades
    .close() into active child subagents. Calling it while the agent
    is still processing would crash the in-flight request. These tests
    pin that eviction skips any agent present in _running_agents.
    """

    def _runner(self):
        from collections import OrderedDict
        from gateway.run import GatewayRunner

        runner = GatewayRunner.__new__(GatewayRunner)
        runner._agent_cache = OrderedDict()
        runner._agent_cache_lock = threading.Lock()
        runner._running_agents = {}
        return runner

    def _fake_agent(self, idle_seconds: float = 0.0):
        import time as _t
        m = MagicMock()
        m._last_activity_ts = _t.time() - idle_seconds
        return m

    def test_cap_skips_active_lru_entry(self, monkeypatch):
        """Active LRU entry is skipped; cache stays over cap rather than
        compensating by evicting a newer entry.

        Rationale: evicting a more-recent entry just because the oldest
        slot is temporarily locked would punish the most recently-
        inserted session (which has no cache to preserve) to protect
        one that happens to be mid-turn. Better to let the cache stay
        transiently over cap and re-check on the next insert.
        """
        from gateway import run as gw_run

        monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 2)
        runner = self._runner()
        runner._cleanup_agent_resources = MagicMock()

        active = self._fake_agent()
        idle_a = self._fake_agent()
        idle_b = self._fake_agent()

        # Insertion order: active (oldest), idle_a, idle_b.
        runner._agent_cache["session-active"] = (active, "sig")
        runner._agent_cache["session-idle-a"] = (idle_a, "sig")
        runner._agent_cache["session-idle-b"] = (idle_b, "sig")

        # Mark `active` as mid-turn — it's LRU, but protected.
        runner._running_agents["session-active"] = active

        with runner._agent_cache_lock:
            runner._enforce_agent_cache_cap()

        # All three remain; no eviction ran, no cleanup dispatched.
        assert "session-active" in runner._agent_cache
        assert "session-idle-a" in runner._agent_cache
        assert "session-idle-b" in runner._agent_cache
        assert runner._cleanup_agent_resources.call_count == 0

    def test_cap_evicts_when_multiple_excess_and_some_inactive(self, monkeypatch):
        """Mixed active/idle in the LRU excess window: only the idle ones go.

        With CAP=2 and 4 entries, excess=2 (the two oldest). If the
        oldest is active and the next is idle, we evict exactly one.
        Cache ends at CAP+1, which is still better than unbounded.
        """
        from gateway import run as gw_run

        monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 2)
        runner = self._runner()
        runner._cleanup_agent_resources = MagicMock()

        oldest_active = self._fake_agent()
        idle_second = self._fake_agent()
        idle_third = self._fake_agent()
        idle_fourth = self._fake_agent()

        runner._agent_cache["s1"] = (oldest_active, "sig")
        runner._agent_cache["s2"] = (idle_second, "sig")  # in excess window, idle
        runner._agent_cache["s3"] = (idle_third, "sig")
        runner._agent_cache["s4"] = (idle_fourth, "sig")

        runner._running_agents["s1"] = oldest_active  # oldest is mid-turn

        with runner._agent_cache_lock:
            runner._enforce_agent_cache_cap()

        # s1 protected (active), s2 evicted (idle + in excess window),
        # s3 and s4 untouched (outside excess window).
        assert "s1" in runner._agent_cache
        assert "s2" not in runner._agent_cache
        assert "s3" in runner._agent_cache
        assert "s4" in runner._agent_cache

    def test_cap_leaves_cache_over_limit_if_all_active(self, monkeypatch, caplog):
        """If every over-cap entry is mid-turn, the cache stays over cap.

        Better to temporarily exceed the cap than to crash an in-flight
        turn by tearing down its clients.
        """
        from gateway import run as gw_run
        import logging as _logging

        monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1)
        runner = self._runner()
        runner._cleanup_agent_resources = MagicMock()

        a1 = self._fake_agent()
        a2 = self._fake_agent()
        a3 = self._fake_agent()
        runner._agent_cache["s1"] = (a1, "sig")
        runner._agent_cache["s2"] = (a2, "sig")
        runner._agent_cache["s3"] = (a3, "sig")

        # All three are mid-turn.
        runner._running_agents["s1"] = a1
        runner._running_agents["s2"] = a2
        runner._running_agents["s3"] = a3

        with caplog.at_level(_logging.WARNING, logger="gateway.run"):
            with runner._agent_cache_lock:
                runner._enforce_agent_cache_cap()

        # Cache unchanged because eviction had to skip every candidate.
        assert len(runner._agent_cache) == 3
        # _cleanup_agent_resources must NOT have been scheduled.
        assert runner._cleanup_agent_resources.call_count == 0
        # And we logged a warning so operators can see the condition.
        assert any("mid-turn" in r.message for r in caplog.records)

    def test_cap_pending_sentinel_does_not_block_eviction(self, monkeypatch):
        """_AGENT_PENDING_SENTINEL in _running_agents is treated as 'not active'.

        The sentinel is set while an agent is being CONSTRUCTED, before the
        real AIAgent instance exists. Cached agents from other sessions
        can still be evicted safely.
        """
        from gateway import run as gw_run
        from gateway.run import _AGENT_PENDING_SENTINEL

        monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1)
        runner = self._runner()
        runner._cleanup_agent_resources = MagicMock()

        a1 = self._fake_agent()
        a2 = self._fake_agent()
        runner._agent_cache["s1"] = (a1, "sig")
        runner._agent_cache["s2"] = (a2, "sig")
        # Another session is mid-creation — sentinel, no real agent yet.
        runner._running_agents["s3-being-created"] = _AGENT_PENDING_SENTINEL

        with runner._agent_cache_lock:
            runner._enforce_agent_cache_cap()

        assert "s1" not in runner._agent_cache  # evicted normally
        assert "s2" in runner._agent_cache

    def test_idle_sweep_skips_active_agent(self, monkeypatch):
        """Idle-TTL sweep must not tear down an active agent even if 'stale'."""
        from gateway import run as gw_run

        monkeypatch.setattr(gw_run, "_AGENT_CACHE_IDLE_TTL_SECS", 0.01)
        runner = self._runner()
        runner._cleanup_agent_resources = MagicMock()

        old_but_active = self._fake_agent(idle_seconds=10.0)
        runner._agent_cache["s1"] = (old_but_active, "sig")
        runner._running_agents["s1"] = old_but_active

        evicted = runner._sweep_idle_cached_agents()

        assert evicted == 0
        assert "s1" in runner._agent_cache
        assert runner._cleanup_agent_resources.call_count == 0

    def test_eviction_does_not_close_active_agent_client(self, monkeypatch):
        """Live test: evicting an active agent does NOT null its .client.

        This reproduces the original concern — if eviction fired while an
        agent was mid-turn, `agent.close()` would set `self.client = None`
        and the next API call inside the loop would crash. With the
        active-agent skip, the client stays intact.
        """
        from gateway import run as gw_run

        monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1)
        runner = self._runner()

        # Build a proper fake agent whose close() matches AIAgent's contract.
        active = MagicMock()
        active._last_activity_ts = __import__("time").time()
        active.client = MagicMock()  # simulate an OpenAI client

        def _real_close():
            active.client = None  # mirrors run_agent.py:3299

        active.close = _real_close
        active.shutdown_memory_provider = MagicMock()

        idle = self._fake_agent()

        runner._agent_cache["active-session"] = (active, "sig")
        runner._agent_cache["idle-session"] = (idle, "sig")
        runner._running_agents["active-session"] = active

        # Real cleanup function, not mocked — we want to see whether close()
        # runs on the active agent. (It shouldn't.)
        with runner._agent_cache_lock:
            runner._enforce_agent_cache_cap()

        # Let any eviction cleanup threads drain.
        import time as _t
        _t.sleep(0.2)

        # The ACTIVE agent's client must still be usable.
        assert active.client is not None, (
            "Active agent's client was closed by eviction — "
            "running turn would crash on its next API call."
        )


class TestAgentCacheSpilloverLive:
    """Live E2E: fill cache with real AIAgent instances and stress it."""

    def _runner(self):
        from collections import OrderedDict
        from gateway.run import GatewayRunner

        runner = GatewayRunner.__new__(GatewayRunner)
        runner._agent_cache = OrderedDict()
        runner._agent_cache_lock = threading.Lock()
        runner._running_agents = {}
        return runner

    def _real_agent(self):
        """A genuine AIAgent; no API calls are made during these tests."""
        from run_agent import AIAgent
        return AIAgent(
            model="anthropic/claude-sonnet-4", api_key="test",
            base_url="https://openrouter.ai/api/v1", provider="openrouter",
            max_iterations=5, quiet_mode=True,
            skip_context_files=True, skip_memory=True,
            platform="telegram",
        )

    def test_fill_to_cap_then_spillover(self, monkeypatch):
        """Fill to cap with real agents, insert one more, oldest evicted."""
        from gateway import run as gw_run

        CAP = 8
        monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", CAP)
        runner = self._runner()

        agents = [self._real_agent() for _ in range(CAP)]
        for i, a in enumerate(agents):
            with runner._agent_cache_lock:
                runner._agent_cache[f"s{i}"] = (a, "sig")
                runner._enforce_agent_cache_cap()
        assert len(runner._agent_cache) == CAP

        # Spillover insertion.
        newcomer = self._real_agent()
        with runner._agent_cache_lock:
            runner._agent_cache["new"] = (newcomer, "sig")
            runner._enforce_agent_cache_cap()

        # Oldest (s0) evicted, cap still CAP.
        assert "s0" not in runner._agent_cache
        assert "new" in runner._agent_cache
        assert len(runner._agent_cache) == CAP

        # Clean up so pytest doesn't leak resources.
        for a in agents + [newcomer]:
            try:
                a.close()
            except Exception:
                pass

    def test_spillover_all_active_keeps_cache_over_cap(self, monkeypatch, caplog):
        """Every slot active: cache goes over cap, no one gets torn down."""
        from gateway import run as gw_run
        import logging as _logging

        CAP = 4
        monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", CAP)
        runner = self._runner()

        agents = [self._real_agent() for _ in range(CAP)]
        for i, a in enumerate(agents):
            runner._agent_cache[f"s{i}"] = (a, "sig")
            runner._running_agents[f"s{i}"] = a  # every session mid-turn

        newcomer = self._real_agent()
        with caplog.at_level(_logging.WARNING, logger="gateway.run"):
            with runner._agent_cache_lock:
                runner._agent_cache["new"] = (newcomer, "sig")
                runner._enforce_agent_cache_cap()

        assert len(runner._agent_cache) == CAP + 1  # temporarily over cap
        # All existing agents still usable.
        for i, a in enumerate(agents):
            assert a.client is not None, f"s{i} got closed while active!"
        # And we warned operators.
        assert any("mid-turn" in r.message for r in caplog.records)

        for a in agents + [newcomer]:
            try:
                a.close()
            except Exception:
                pass

    def test_concurrent_inserts_settle_at_cap(self, monkeypatch):
        """Many threads inserting in parallel end with len(cache) == CAP."""
        from gateway import run as gw_run

        CAP = 16
        monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", CAP)
        runner = self._runner()

        N_THREADS = 8
        PER_THREAD = 20  # 8 * 20 = 160 inserts into a 16-slot cache

        def worker(tid: int):
            for j in range(PER_THREAD):
                a = self._real_agent()
                key = f"t{tid}-s{j}"
                with runner._agent_cache_lock:
                    runner._agent_cache[key] = (a, "sig")
                    runner._enforce_agent_cache_cap()

        threads = [
            threading.Thread(target=worker, args=(t,), daemon=True)
            for t in range(N_THREADS)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join(timeout=30)
            assert not t.is_alive(), "Worker thread hung — possible deadlock?"

        # Let daemon cleanup threads settle.
        import time as _t
        _t.sleep(0.5)

        assert len(runner._agent_cache) == CAP, (
            f"Expected exactly {CAP} entries after concurrent inserts, "
            f"got {len(runner._agent_cache)}."
        )

    def test_evicted_session_next_turn_gets_fresh_agent(self, monkeypatch):
        """After eviction, the same session_key can insert a fresh agent.

        Simulates the real spillover flow: evicted session sends another
        message, which builds a new AIAgent and re-enters the cache.
        """
        from gateway import run as gw_run

        CAP = 2
        monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", CAP)
        runner = self._runner()

        a0 = self._real_agent()
        a1 = self._real_agent()
        runner._agent_cache["sA"] = (a0, "sig")
        runner._agent_cache["sB"] = (a1, "sig")

        # 3rd session forces sA (oldest) out.
        # NOTE(review): the reviewed source was truncated at this point —
        # the remainder of this test body was not visible and is not
        # reconstructed here; restore it from version control.
993 a2 = self._real_agent() 994 with runner._agent_cache_lock: 995 runner._agent_cache["sC"] = (a2, "sig") 996 runner._enforce_agent_cache_cap() 997 assert "sA" not in runner._agent_cache 998 999 # Let the eviction cleanup thread run. 1000 import time as _t 1001 _t.sleep(0.3) 1002 1003 # Now sA's user sends another message → a fresh agent goes in. 1004 a0_new = self._real_agent() 1005 with runner._agent_cache_lock: 1006 runner._agent_cache["sA"] = (a0_new, "sig") 1007 runner._enforce_agent_cache_cap() 1008 1009 assert "sA" in runner._agent_cache 1010 assert runner._agent_cache["sA"][0] is a0_new # the new one, not stale 1011 # Fresh agent is usable. 1012 assert a0_new.client is not None 1013 1014 for a in (a0, a1, a2, a0_new): 1015 try: 1016 a.close() 1017 except Exception: 1018 pass 1019 1020 1021 class TestAgentCacheIdleResume: 1022 """End-to-end: idle-TTL-evicted session resumes cleanly with task state. 1023 1024 Real-world scenario: user leaves a Telegram session open for 2+ hours. 1025 Idle-TTL evicts their cached agent. They come back and send a message. 1026 The new agent built for the same session_id must inherit: 1027 - Conversation history (from SessionStore — outside cache concern) 1028 - Terminal sandbox (same task_id → same _active_environments entry) 1029 - Browser daemon (same task_id → same browser session) 1030 - Background processes (same task_id → same process_registry entries) 1031 The ONLY thing that should reset is the LLM client pool (rebuilt fresh). 
1032 """ 1033 1034 def _runner(self): 1035 from collections import OrderedDict 1036 from gateway.run import GatewayRunner 1037 1038 runner = GatewayRunner.__new__(GatewayRunner) 1039 runner._agent_cache = OrderedDict() 1040 runner._agent_cache_lock = threading.Lock() 1041 runner._running_agents = {} 1042 return runner 1043 1044 def test_release_clients_does_not_touch_process_registry(self, monkeypatch): 1045 """release_clients must not call process_registry.kill_all for task_id.""" 1046 from run_agent import AIAgent 1047 1048 agent = AIAgent( 1049 model="anthropic/claude-sonnet-4", api_key="test", 1050 base_url="https://openrouter.ai/api/v1", provider="openrouter", 1051 max_iterations=5, quiet_mode=True, 1052 skip_context_files=True, skip_memory=True, 1053 session_id="idle-resume-test-session", 1054 ) 1055 1056 # Spy on process_registry.kill_all — it MUST NOT be called. 1057 from tools import process_registry as _pr 1058 kill_all_calls: list = [] 1059 original_kill_all = _pr.process_registry.kill_all 1060 _pr.process_registry.kill_all = lambda **kw: kill_all_calls.append(kw) 1061 try: 1062 agent.release_clients() 1063 finally: 1064 _pr.process_registry.kill_all = original_kill_all 1065 try: 1066 agent.close() 1067 except Exception: 1068 pass 1069 1070 assert kill_all_calls == [], ( 1071 f"release_clients() called process_registry.kill_all — would " 1072 f"kill user's bg processes on cache eviction. 
Calls: {kill_all_calls}" 1073 ) 1074 1075 def test_release_clients_does_not_touch_terminal_or_browser(self, monkeypatch): 1076 """release_clients must not call cleanup_vm or cleanup_browser.""" 1077 from run_agent import AIAgent 1078 from tools import terminal_tool as _tt 1079 from tools import browser_tool as _bt 1080 1081 agent = AIAgent( 1082 model="anthropic/claude-sonnet-4", api_key="test", 1083 base_url="https://openrouter.ai/api/v1", provider="openrouter", 1084 max_iterations=5, quiet_mode=True, 1085 skip_context_files=True, skip_memory=True, 1086 session_id="idle-resume-test-2", 1087 ) 1088 1089 vm_calls: list = [] 1090 browser_calls: list = [] 1091 original_vm = _tt.cleanup_vm 1092 original_browser = _bt.cleanup_browser 1093 _tt.cleanup_vm = lambda tid: vm_calls.append(tid) 1094 _bt.cleanup_browser = lambda tid: browser_calls.append(tid) 1095 try: 1096 agent.release_clients() 1097 finally: 1098 _tt.cleanup_vm = original_vm 1099 _bt.cleanup_browser = original_browser 1100 try: 1101 agent.close() 1102 except Exception: 1103 pass 1104 1105 assert vm_calls == [], ( 1106 f"release_clients() tore down terminal sandbox — user's cwd, " 1107 f"env, and bg shells would be gone on resume. Calls: {vm_calls}" 1108 ) 1109 assert browser_calls == [], ( 1110 f"release_clients() tore down browser session — user's open " 1111 f"tabs and cookies gone on resume. Calls: {browser_calls}" 1112 ) 1113 1114 def test_release_clients_closes_llm_client(self): 1115 """release_clients IS expected to close the OpenAI/httpx client.""" 1116 from run_agent import AIAgent 1117 1118 agent = AIAgent( 1119 model="anthropic/claude-sonnet-4", api_key="test", 1120 base_url="https://openrouter.ai/api/v1", provider="openrouter", 1121 max_iterations=5, quiet_mode=True, 1122 skip_context_files=True, skip_memory=True, 1123 ) 1124 # Clients are lazy-built; force one to exist so we can verify close. 
1125 assert agent.client is not None # __init__ builds it 1126 1127 agent.release_clients() 1128 1129 # Post-release: client reference is dropped (memory freed). 1130 assert agent.client is None 1131 1132 def test_close_vs_release_full_teardown_difference(self, monkeypatch): 1133 """close() tears down task state; release_clients() does not. 1134 1135 This pins the semantic contract: session-expiry path uses close() 1136 (full teardown — session is done), cache-eviction path uses 1137 release_clients() (soft — session may resume). 1138 """ 1139 from run_agent import AIAgent 1140 import run_agent as _ra 1141 1142 # Agent A: evicted from cache (soft) — terminal survives. 1143 # Agent B: session expired (hard) — terminal torn down. 1144 agent_a = AIAgent( 1145 model="anthropic/claude-sonnet-4", api_key="test", 1146 base_url="https://openrouter.ai/api/v1", provider="openrouter", 1147 max_iterations=5, quiet_mode=True, 1148 skip_context_files=True, skip_memory=True, 1149 session_id="soft-session", 1150 ) 1151 agent_b = AIAgent( 1152 model="anthropic/claude-sonnet-4", api_key="test", 1153 base_url="https://openrouter.ai/api/v1", provider="openrouter", 1154 max_iterations=5, quiet_mode=True, 1155 skip_context_files=True, skip_memory=True, 1156 session_id="hard-session", 1157 ) 1158 1159 vm_calls: list = [] 1160 # AIAgent.close() calls the ``cleanup_vm`` name bound into 1161 # ``run_agent`` at import time, not ``tools.terminal_tool.cleanup_vm`` 1162 # directly — so patch the ``run_agent`` reference. 1163 original_vm = _ra.cleanup_vm 1164 _ra.cleanup_vm = lambda tid: vm_calls.append(tid) 1165 try: 1166 agent_a.release_clients() # cache eviction 1167 agent_b.close() # session expiry 1168 finally: 1169 _ra.cleanup_vm = original_vm 1170 try: 1171 agent_a.close() 1172 except Exception: 1173 pass 1174 1175 # Only agent_b's task_id should appear in cleanup calls. 
1176 assert "hard-session" in vm_calls 1177 assert "soft-session" not in vm_calls 1178 1179 def test_idle_evicted_session_rebuild_inherits_task_id(self, monkeypatch): 1180 """After idle-TTL eviction, a fresh agent with the same session_id 1181 gets the same task_id — so tool state (terminal/browser/bg procs) 1182 that persisted across eviction is reachable via the new agent. 1183 """ 1184 from gateway import run as gw_run 1185 from run_agent import AIAgent 1186 1187 monkeypatch.setattr(gw_run, "_AGENT_CACHE_IDLE_TTL_SECS", 0.01) 1188 runner = self._runner() 1189 1190 # Build an agent representing a stale (idle) session. 1191 SESSION_ID = "long-lived-user-session" 1192 old = AIAgent( 1193 model="anthropic/claude-sonnet-4", api_key="test", 1194 base_url="https://openrouter.ai/api/v1", provider="openrouter", 1195 max_iterations=5, quiet_mode=True, 1196 skip_context_files=True, skip_memory=True, 1197 session_id=SESSION_ID, 1198 ) 1199 old._last_activity_ts = 0.0 # force idle 1200 runner._agent_cache["sKey"] = (old, "sig") 1201 1202 # Simulate the idle-TTL sweep firing. 1203 runner._sweep_idle_cached_agents() 1204 assert "sKey" not in runner._agent_cache 1205 1206 # Wait for the daemon thread doing release_clients() to finish. 1207 import time as _t 1208 _t.sleep(0.3) 1209 1210 # Old agent's client is gone (soft cleanup fired). 1211 assert old.client is None 1212 1213 # User comes back — new agent built for the SAME session_id. 1214 new_agent = AIAgent( 1215 model="anthropic/claude-sonnet-4", api_key="test", 1216 base_url="https://openrouter.ai/api/v1", provider="openrouter", 1217 max_iterations=5, quiet_mode=True, 1218 skip_context_files=True, skip_memory=True, 1219 session_id=SESSION_ID, 1220 ) 1221 1222 # Same session_id means same task_id routed to tools. The new 1223 # agent inherits any per-task state (terminal sandbox etc.) that 1224 # was preserved across eviction. 
1225 assert new_agent.session_id == old.session_id == SESSION_ID 1226 # And it has a fresh working client. 1227 assert new_agent.client is not None 1228 1229 try: 1230 new_agent.close() 1231 except Exception: 1232 pass 1233 1234 1235 _FAKE_NOW = 10_000.0 # Fixed epoch for deterministic time assertions 1236 1237 1238 class TestCachedAgentInactivityReset: 1239 """Inactivity-clock reset must be gated on _interrupt_depth == 0. 1240 1241 On interrupt-recursive turns (_interrupt_depth > 0) the clock must 1242 keep accumulating so the inactivity watchdog can fire when a turn is 1243 stuck in an interrupt loop. Resetting unconditionally prevented the 1244 30-min timeout from triggering (#15654). The depth-0 reset is still 1245 needed: a session idle for 29 min must not trip the watchdog before 1246 the new turn makes its first API call (#9051). 1247 """ 1248 1249 def _fake_agent(self, stale_seconds: float = 1800.0): 1250 m = MagicMock() 1251 m._last_activity_ts = _FAKE_NOW - stale_seconds 1252 m._api_call_count = 10 1253 m._last_activity_desc = "previous turn activity" 1254 return m 1255 1256 def test_fresh_turn_resets_idle_clock(self): 1257 """interrupt_depth=0: clock resets so a post-idle turn gets a 1258 fresh 30-min inactivity window (guard for #9051).""" 1259 from gateway.run import GatewayRunner 1260 1261 agent = self._fake_agent(stale_seconds=1800.0) 1262 old_ts = agent._last_activity_ts 1263 1264 with patch("gateway.run.time") as mock_time: 1265 mock_time.time.return_value = _FAKE_NOW 1266 GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=0) 1267 1268 assert agent._last_activity_ts == _FAKE_NOW, ( 1269 "_last_activity_ts was not reset on a fresh turn (interrupt_depth=0)" 1270 ) 1271 assert agent._last_activity_ts > old_ts, ( 1272 "Stale idle time should be cleared so the new turn gets a fresh window" 1273 ) 1274 1275 def test_fresh_turn_resets_desc(self): 1276 """interrupt_depth=0: description is updated to reflect the new turn.""" 1277 from 
gateway.run import GatewayRunner 1278 1279 agent = self._fake_agent() 1280 1281 with patch("gateway.run.time") as mock_time: 1282 mock_time.time.return_value = _FAKE_NOW 1283 GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=0) 1284 1285 assert agent._last_activity_desc == "starting new turn (cached)" 1286 1287 def test_interrupt_turn_preserves_idle_clock(self): 1288 """interrupt_depth=1: clock preserved so accumulated stuck-turn 1289 idle time is not discarded by an interrupt-recursive re-entry (#15654).""" 1290 from gateway.run import GatewayRunner 1291 1292 agent = self._fake_agent(stale_seconds=1200.0) 1293 old_ts = agent._last_activity_ts 1294 1295 GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=1) 1296 1297 assert agent._last_activity_ts == old_ts, ( 1298 "_last_activity_ts must not be reset on interrupt-recursive turns " 1299 "(interrupt_depth>0) — the watchdog needs the accumulated idle time" 1300 ) 1301 1302 def test_interrupt_turn_preserves_desc(self): 1303 """interrupt_depth=1: desc preserved — it is semantically paired with ts.""" 1304 from gateway.run import GatewayRunner 1305 1306 agent = self._fake_agent(stale_seconds=1200.0) 1307 1308 GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=1) 1309 1310 assert agent._last_activity_desc == "previous turn activity", ( 1311 "_last_activity_desc must not change on interrupt-recursive turns; " 1312 "it describes the activity *at* _last_activity_ts" 1313 ) 1314 1315 def test_deep_interrupt_recursion_preserves_idle_clock(self): 1316 """interrupt_depth=MAX-1: clock still preserved at any non-zero depth.""" 1317 from gateway.run import GatewayRunner 1318 1319 agent = self._fake_agent(stale_seconds=600.0) 1320 old_ts = agent._last_activity_ts 1321 1322 GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=4) 1323 1324 assert agent._last_activity_ts == old_ts 1325 1326 def test_api_call_count_reset_regardless_of_depth(self): 1327 """_api_call_count is always 
reset to 0 for the new turn, at any depth.""" 1328 from gateway.run import GatewayRunner 1329 1330 agent_fresh = self._fake_agent() 1331 agent_interrupted = self._fake_agent() 1332 1333 with patch("gateway.run.time") as mock_time: 1334 mock_time.time.return_value = _FAKE_NOW 1335 GatewayRunner._init_cached_agent_for_turn(agent_fresh, interrupt_depth=0) 1336 GatewayRunner._init_cached_agent_for_turn(agent_interrupted, interrupt_depth=1) 1337 1338 assert agent_fresh._api_call_count == 0 1339 assert agent_interrupted._api_call_count == 0 1340 1341 def test_watchdog_accumulation_across_recursive_turns(self): 1342 """Scenario: stuck turn + user interrupt → recursive turn. 1343 1344 The idle time seen by the watchdog must reflect the full stuck 1345 duration, not restart from zero on the recursive re-entry. 1346 """ 1347 from gateway.run import GatewayRunner 1348 1349 STUCK_FOR = 1750.0 1350 agent = self._fake_agent(stale_seconds=STUCK_FOR) 1351 1352 # Simulate: user sees "Still working..." and sends another message. 1353 # That triggers an interrupt → _run_agent recurses at depth=1. 1354 GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=1) 1355 1356 # Watchdog sees time.time() - _last_activity_ts ≥ STUCK_FOR. 1357 idle_secs = _FAKE_NOW - agent._last_activity_ts 1358 assert idle_secs >= STUCK_FOR - 1.0, ( 1359 f"Watchdog would see {idle_secs:.0f}s idle, expected ~{STUCK_FOR}s. " 1360 "Inactivity timeout could not fire for a stuck interrupted turn." 1361 )