# tests/gateway/test_agent_cache.py
   1  """Integration tests for gateway AIAgent caching.
   2  
   3  Verifies that the agent cache correctly:
   4  - Reuses agents across messages (same config → same instance)
   5  - Rebuilds agents when config changes (model, provider, toolsets)
   6  - Updates reasoning_config in-place without rebuilding
   7  - Evicts on session reset
   8  - Evicts on fallback activation
   9  - Preserves frozen system prompt across turns
  10  """
  11  
  12  import hashlib
  13  import json
  14  import threading
  15  from unittest.mock import MagicMock, patch
  16  
  17  import pytest
  18  
  19  
  20  def _make_runner():
  21      """Create a minimal GatewayRunner with just the cache infrastructure."""
  22      from gateway.run import GatewayRunner
  23  
  24      runner = GatewayRunner.__new__(GatewayRunner)
  25      runner._agent_cache = {}
  26      runner._agent_cache_lock = threading.Lock()
  27      return runner
  28  
  29  
  30  class TestAgentConfigSignature:
  31      """Config signature produces stable, distinct keys."""
  32  
  33      def test_same_config_same_signature(self):
  34          from gateway.run import GatewayRunner
  35  
  36          runtime = {"api_key": "sk-test12345678", "base_url": "https://openrouter.ai/api/v1",
  37                      "provider": "openrouter", "api_mode": "chat_completions"}
  38          sig1 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-telegram"], "")
  39          sig2 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-telegram"], "")
  40          assert sig1 == sig2
  41  
  42      def test_model_change_different_signature(self):
  43          from gateway.run import GatewayRunner
  44  
  45          runtime = {"api_key": "sk-test12345678", "base_url": "https://openrouter.ai/api/v1",
  46                      "provider": "openrouter"}
  47          sig1 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-telegram"], "")
  48          sig2 = GatewayRunner._agent_config_signature("claude-opus-4.6", runtime, ["hermes-telegram"], "")
  49          assert sig1 != sig2
  50  
  51      def test_same_token_prefix_different_full_token_changes_signature(self):
  52          """Tokens sharing a JWT-style prefix must not collide."""
  53          from gateway.run import GatewayRunner
  54  
  55          rt1 = {
  56              "api_key": "eyJhbGci.token-for-account-a",
  57              "base_url": "https://chatgpt.com/backend-api/codex",
  58              "provider": "openai-codex",
  59              "api_mode": "codex_responses",
  60          }
  61          rt2 = {
  62              "api_key": "eyJhbGci.token-for-account-b",
  63              "base_url": "https://chatgpt.com/backend-api/codex",
  64              "provider": "openai-codex",
  65              "api_mode": "codex_responses",
  66          }
  67  
  68          assert rt1["api_key"][:8] == rt2["api_key"][:8]
  69          sig1 = GatewayRunner._agent_config_signature("gpt-5.3-codex", rt1, ["hermes-telegram"], "")
  70          sig2 = GatewayRunner._agent_config_signature("gpt-5.3-codex", rt2, ["hermes-telegram"], "")
  71          assert sig1 != sig2
  72  
  73      def test_provider_change_different_signature(self):
  74          from gateway.run import GatewayRunner
  75  
  76          rt1 = {"api_key": "sk-test12345678", "base_url": "https://openrouter.ai/api/v1", "provider": "openrouter"}
  77          rt2 = {"api_key": "sk-test12345678", "base_url": "https://api.anthropic.com", "provider": "anthropic"}
  78          sig1 = GatewayRunner._agent_config_signature("claude-sonnet-4", rt1, ["hermes-telegram"], "")
  79          sig2 = GatewayRunner._agent_config_signature("claude-sonnet-4", rt2, ["hermes-telegram"], "")
  80          assert sig1 != sig2
  81  
  82      def test_toolset_change_different_signature(self):
  83          from gateway.run import GatewayRunner
  84  
  85          runtime = {"api_key": "sk-test12345678", "base_url": "https://openrouter.ai/api/v1", "provider": "openrouter"}
  86          sig1 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-telegram"], "")
  87          sig2 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-discord"], "")
  88          assert sig1 != sig2
  89  
  90      def test_reasoning_not_in_signature(self):
  91          """Reasoning config is set per-message, not part of the signature."""
  92          from gateway.run import GatewayRunner
  93  
  94          runtime = {"api_key": "sk-test12345678", "base_url": "https://openrouter.ai/api/v1", "provider": "openrouter"}
  95          # Same config — signature should be identical regardless of what
  96          # reasoning_config the caller might have (it's not passed in)
  97          sig1 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-telegram"], "")
  98          sig2 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-telegram"], "")
  99          assert sig1 == sig2
 100  
 101      # ---------------------------------------------------------------
 102      # cache_keys (compression/context config cache-busting)
 103      # ---------------------------------------------------------------
 104  
 105      def test_cache_keys_default_omitted_matches_empty(self):
 106          """Omitted cache_keys must produce the same signature as empty {}."""
 107          from gateway.run import GatewayRunner
 108  
 109          runtime = {"api_key": "k", "base_url": "u", "provider": "p"}
 110          sig_omitted = GatewayRunner._agent_config_signature("m", runtime, [], "")
 111          sig_empty = GatewayRunner._agent_config_signature("m", runtime, [], "", cache_keys={})
 112          sig_none = GatewayRunner._agent_config_signature("m", runtime, [], "", cache_keys=None)
 113          assert sig_omitted == sig_empty == sig_none
 114  
 115      def test_context_length_change_busts_cache(self):
 116          """Editing model.context_length in config must produce a new signature."""
 117          from gateway.run import GatewayRunner
 118  
 119          runtime = {"api_key": "k", "base_url": "u", "provider": "p"}
 120          sig1 = GatewayRunner._agent_config_signature(
 121              "m", runtime, [], "",
 122              cache_keys={"model.context_length": 200_000},
 123          )
 124          sig2 = GatewayRunner._agent_config_signature(
 125              "m", runtime, [], "",
 126              cache_keys={"model.context_length": 400_000},
 127          )
 128          assert sig1 != sig2
 129  
 130      def test_compression_threshold_change_busts_cache(self):
 131          from gateway.run import GatewayRunner
 132  
 133          runtime = {"api_key": "k", "base_url": "u", "provider": "p"}
 134          sig1 = GatewayRunner._agent_config_signature(
 135              "m", runtime, [], "",
 136              cache_keys={"compression.threshold": 0.50},
 137          )
 138          sig2 = GatewayRunner._agent_config_signature(
 139              "m", runtime, [], "",
 140              cache_keys={"compression.threshold": 0.75},
 141          )
 142          assert sig1 != sig2
 143  
 144      def test_compression_enabled_toggle_busts_cache(self):
 145          from gateway.run import GatewayRunner
 146  
 147          runtime = {"api_key": "k", "base_url": "u", "provider": "p"}
 148          sig_on = GatewayRunner._agent_config_signature(
 149              "m", runtime, [], "",
 150              cache_keys={"compression.enabled": True},
 151          )
 152          sig_off = GatewayRunner._agent_config_signature(
 153              "m", runtime, [], "",
 154              cache_keys={"compression.enabled": False},
 155          )
 156          assert sig_on != sig_off
 157  
 158      def test_cache_keys_key_order_does_not_matter(self):
 159          """Signature must be stable regardless of dict key insertion order."""
 160          from gateway.run import GatewayRunner
 161  
 162          runtime = {"api_key": "k", "base_url": "u", "provider": "p"}
 163          sig_a = GatewayRunner._agent_config_signature(
 164              "m", runtime, [], "",
 165              cache_keys={"model.context_length": 200_000, "compression.threshold": 0.5},
 166          )
 167          sig_b = GatewayRunner._agent_config_signature(
 168              "m", runtime, [], "",
 169              cache_keys={"compression.threshold": 0.5, "model.context_length": 200_000},
 170          )
 171          assert sig_a == sig_b
 172  
 173      def test_tool_registry_generation_change_busts_cache(self):
 174          """MCP reloads mutate the tool registry, so cached agents must rebuild."""
 175          from gateway.run import GatewayRunner
 176  
 177          runtime = {"api_key": "k", "base_url": "u", "provider": "p"}
 178          sig_before = GatewayRunner._agent_config_signature(
 179              "m", runtime, ["telegram"], "",
 180              cache_keys={"tools.registry_generation": 10},
 181          )
 182          sig_after = GatewayRunner._agent_config_signature(
 183              "m", runtime, ["telegram"], "",
 184              cache_keys={"tools.registry_generation": 11},
 185          )
 186  
 187          assert sig_before != sig_after
 188  
 189  
 190  class TestExtractCacheBustingConfig:
 191      """Verify _extract_cache_busting_config pulls the documented subset of
 192      config values that must invalidate the cached agent on change."""
 193  
 194      def test_reads_model_context_length(self):
 195          from gateway.run import GatewayRunner
 196  
 197          out = GatewayRunner._extract_cache_busting_config(
 198              {"model": {"context_length": 272_000, "provider": "openrouter"}}
 199          )
 200          assert out["model.context_length"] == 272_000
 201  
 202      def test_reads_compression_subkeys(self):
 203          from gateway.run import GatewayRunner
 204  
 205          out = GatewayRunner._extract_cache_busting_config(
 206              {
 207                  "compression": {
 208                      "enabled": False,
 209                      "threshold": 0.6,
 210                      "target_ratio": 0.3,
 211                      "protect_last_n": 25,
 212                      "some_other_key": "ignored",
 213                  }
 214              }
 215          )
 216          assert out["compression.enabled"] is False
 217          assert out["compression.threshold"] == 0.6
 218          assert out["compression.target_ratio"] == 0.3
 219          assert out["compression.protect_last_n"] == 25
 220  
 221      def test_missing_keys_yield_none(self):
 222          """Absent config keys must produce None values (still contribute to signature)."""
 223          from gateway.run import GatewayRunner
 224  
 225          out = GatewayRunner._extract_cache_busting_config({})
 226          # Every documented cache-busting key must be present, even if None
 227          for section, key in GatewayRunner._CACHE_BUSTING_CONFIG_KEYS:
 228              assert f"{section}.{key}" in out
 229              assert out[f"{section}.{key}"] is None
 230  
 231      def test_non_dict_section_treated_as_missing(self):
 232          from gateway.run import GatewayRunner
 233  
 234          # compression is a string — should not crash, all compression.* keys None
 235          out = GatewayRunner._extract_cache_busting_config(
 236              {"compression": "broken", "model": {"context_length": 100_000}}
 237          )
 238          assert out["compression.enabled"] is None
 239          assert out["compression.threshold"] is None
 240          assert out["model.context_length"] == 100_000
 241  
 242      def test_none_config_is_safe(self):
 243          from gateway.run import GatewayRunner
 244  
 245          out = GatewayRunner._extract_cache_busting_config(None)
 246          for section, key in GatewayRunner._CACHE_BUSTING_CONFIG_KEYS:
 247              assert out[f"{section}.{key}"] is None
 248          assert "tools.registry_generation" in out
 249  
 250      def test_extract_includes_live_tool_registry_generation(self, monkeypatch):
 251          from gateway.run import GatewayRunner
 252          from tools.registry import registry
 253  
 254          monkeypatch.setattr(registry, "_generation", 12345)
 255  
 256          out = GatewayRunner._extract_cache_busting_config({})
 257  
 258          assert out["tools.registry_generation"] == 12345
 259  
 260      def test_full_round_trip_busts_cache_on_real_edit(self):
 261          """End-to-end: simulate a config edit on main and verify the
 262          extracted cache_keys change produces a new signature."""
 263          from gateway.run import GatewayRunner
 264  
 265          runtime = {"api_key": "k", "base_url": "u", "provider": "p"}
 266          cfg_before = {
 267              "model": {"context_length": 200_000},
 268              "compression": {"threshold": 0.50, "enabled": True},
 269          }
 270          cfg_after = {
 271              "model": {"context_length": 200_000},
 272              "compression": {"threshold": 0.75, "enabled": True},  # user raised threshold
 273          }
 274  
 275          sig_before = GatewayRunner._agent_config_signature(
 276              "m", runtime, [], "",
 277              cache_keys=GatewayRunner._extract_cache_busting_config(cfg_before),
 278          )
 279          sig_after = GatewayRunner._agent_config_signature(
 280              "m", runtime, [], "",
 281              cache_keys=GatewayRunner._extract_cache_busting_config(cfg_after),
 282          )
 283          assert sig_before != sig_after, (
 284              "Editing compression.threshold in config.yaml must bust the "
 285              "gateway's cached agent so the new threshold takes effect."
 286          )
 287  
 288  
 289  class TestAgentCacheLifecycle:
 290      """End-to-end cache behavior with real AIAgent construction."""
 291  
 292      def test_cache_hit_returns_same_agent(self):
 293          """Second message with same config reuses the cached agent instance."""
 294          from run_agent import AIAgent
 295  
 296          runner = _make_runner()
 297          session_key = "telegram:12345"
 298          runtime = {"api_key": "test", "base_url": "https://openrouter.ai/api/v1",
 299                      "provider": "openrouter", "api_mode": "chat_completions"}
 300          sig = runner._agent_config_signature("anthropic/claude-sonnet-4", runtime, ["hermes-telegram"], "")
 301  
 302          # First message — create and cache
 303          agent1 = AIAgent(
 304              model="anthropic/claude-sonnet-4", api_key="test",
 305              base_url="https://openrouter.ai/api/v1", provider="openrouter",
 306              max_iterations=5, quiet_mode=True, skip_context_files=True,
 307              skip_memory=True, platform="telegram",
 308          )
 309          with runner._agent_cache_lock:
 310              runner._agent_cache[session_key] = (agent1, sig)
 311  
 312          # Second message — cache hit
 313          with runner._agent_cache_lock:
 314              cached = runner._agent_cache.get(session_key)
 315          assert cached is not None
 316          assert cached[1] == sig
 317          assert cached[0] is agent1  # same instance
 318  
 319      def test_cache_miss_on_model_change(self):
 320          """Model change produces different signature → cache miss."""
 321          from run_agent import AIAgent
 322  
 323          runner = _make_runner()
 324          session_key = "telegram:12345"
 325          runtime = {"api_key": "test", "base_url": "https://openrouter.ai/api/v1",
 326                      "provider": "openrouter", "api_mode": "chat_completions"}
 327  
 328          old_sig = runner._agent_config_signature("anthropic/claude-sonnet-4", runtime, ["hermes-telegram"], "")
 329          agent1 = AIAgent(
 330              model="anthropic/claude-sonnet-4", api_key="test",
 331              base_url="https://openrouter.ai/api/v1", provider="openrouter",
 332              max_iterations=5, quiet_mode=True, skip_context_files=True,
 333              skip_memory=True, platform="telegram",
 334          )
 335          with runner._agent_cache_lock:
 336              runner._agent_cache[session_key] = (agent1, old_sig)
 337  
 338          # New model → different signature
 339          new_sig = runner._agent_config_signature("anthropic/claude-opus-4.6", runtime, ["hermes-telegram"], "")
 340          assert new_sig != old_sig
 341  
 342          with runner._agent_cache_lock:
 343              cached = runner._agent_cache.get(session_key)
 344          assert cached[1] != new_sig  # signature mismatch → would create new agent
 345  
 346      def test_evict_on_session_reset(self):
 347          """_evict_cached_agent removes the entry."""
 348          from run_agent import AIAgent
 349  
 350          runner = _make_runner()
 351          session_key = "telegram:12345"
 352  
 353          agent = AIAgent(
 354              model="anthropic/claude-sonnet-4", api_key="test",
 355              base_url="https://openrouter.ai/api/v1", provider="openrouter",
 356              max_iterations=5, quiet_mode=True, skip_context_files=True,
 357              skip_memory=True,
 358          )
 359          with runner._agent_cache_lock:
 360              runner._agent_cache[session_key] = (agent, "sig123")
 361  
 362          runner._evict_cached_agent(session_key)
 363  
 364          with runner._agent_cache_lock:
 365              assert session_key not in runner._agent_cache
 366  
 367      def test_evict_does_not_affect_other_sessions(self):
 368          """Evicting one session leaves other sessions cached."""
 369          runner = _make_runner()
 370          with runner._agent_cache_lock:
 371              runner._agent_cache["session-A"] = ("agent-A", "sig-A")
 372              runner._agent_cache["session-B"] = ("agent-B", "sig-B")
 373  
 374          runner._evict_cached_agent("session-A")
 375  
 376          with runner._agent_cache_lock:
 377              assert "session-A" not in runner._agent_cache
 378              assert "session-B" in runner._agent_cache
 379  
 380      def test_reasoning_config_updates_in_place(self):
 381          """Reasoning config can be set on a cached agent without eviction."""
 382          from run_agent import AIAgent
 383  
 384          agent = AIAgent(
 385              model="anthropic/claude-sonnet-4", api_key="test",
 386              base_url="https://openrouter.ai/api/v1", provider="openrouter",
 387              max_iterations=5, quiet_mode=True, skip_context_files=True,
 388              skip_memory=True,
 389              reasoning_config={"enabled": True, "effort": "medium"},
 390          )
 391  
 392          # Simulate per-message reasoning update
 393          agent.reasoning_config = {"enabled": True, "effort": "high"}
 394          assert agent.reasoning_config["effort"] == "high"
 395  
 396          # System prompt should not be affected by reasoning change
 397          prompt1 = agent._build_system_prompt()
 398          agent._cached_system_prompt = prompt1  # simulate run_conversation caching
 399          agent.reasoning_config = {"enabled": True, "effort": "low"}
 400          prompt2 = agent._cached_system_prompt
 401          assert prompt1 is prompt2  # same object — not invalidated by reasoning change
 402  
 403      def test_system_prompt_frozen_across_cache_reuse(self):
 404          """The cached agent's system prompt stays identical across turns."""
 405          from run_agent import AIAgent
 406  
 407          agent = AIAgent(
 408              model="anthropic/claude-sonnet-4", api_key="test",
 409              base_url="https://openrouter.ai/api/v1", provider="openrouter",
 410              max_iterations=5, quiet_mode=True, skip_context_files=True,
 411              skip_memory=True, platform="telegram",
 412          )
 413  
 414          # Build system prompt (simulates first run_conversation)
 415          prompt1 = agent._build_system_prompt()
 416          agent._cached_system_prompt = prompt1
 417  
 418          # Simulate second turn — prompt should be frozen
 419          prompt2 = agent._cached_system_prompt
 420          assert prompt1 is prompt2  # same object, not rebuilt
 421  
 422      def test_callbacks_update_without_cache_eviction(self):
 423          """Per-message callbacks can be set on cached agent."""
 424          from run_agent import AIAgent
 425  
 426          agent = AIAgent(
 427              model="anthropic/claude-sonnet-4", api_key="test",
 428              base_url="https://openrouter.ai/api/v1", provider="openrouter",
 429              max_iterations=5, quiet_mode=True, skip_context_files=True,
 430              skip_memory=True,
 431          )
 432  
 433          # Set callbacks like the gateway does per-message
 434          cb1 = lambda *a: None
 435          cb2 = lambda *a: None
 436          agent.tool_progress_callback = cb1
 437          agent.step_callback = cb2
 438          agent.stream_delta_callback = None
 439          agent.status_callback = None
 440  
 441          assert agent.tool_progress_callback is cb1
 442          assert agent.step_callback is cb2
 443  
 444          # Update for next message
 445          cb3 = lambda *a: None
 446          agent.tool_progress_callback = cb3
 447          assert agent.tool_progress_callback is cb3
 448  
 449  
 450  class TestAgentCacheBoundedGrowth:
 451      """LRU cap and idle-TTL eviction prevent unbounded cache growth."""
 452  
 453      def _bounded_runner(self):
 454          """Runner with an OrderedDict cache (matches real gateway init)."""
 455          from collections import OrderedDict
 456          from gateway.run import GatewayRunner
 457  
 458          runner = GatewayRunner.__new__(GatewayRunner)
 459          runner._agent_cache = OrderedDict()
 460          runner._agent_cache_lock = threading.Lock()
 461          return runner
 462  
 463      def _fake_agent(self, last_activity: float | None = None):
 464          """Lightweight stand-in; real AIAgent is heavy to construct."""
 465          m = MagicMock()
 466          if last_activity is not None:
 467              m._last_activity_ts = last_activity
 468          else:
 469              import time as _t
 470              m._last_activity_ts = _t.time()
 471          return m
 472  
 473      def test_cap_evicts_lru_when_exceeded(self, monkeypatch):
 474          """Inserting past _AGENT_CACHE_MAX_SIZE pops the oldest entry."""
 475          from gateway import run as gw_run
 476  
 477          monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 3)
 478          runner = self._bounded_runner()
 479          runner._cleanup_agent_resources = MagicMock()
 480  
 481          for i in range(3):
 482              runner._agent_cache[f"s{i}"] = (self._fake_agent(), f"sig{i}")
 483  
 484          # Insert a 4th — oldest (s0) must be evicted.
 485          with runner._agent_cache_lock:
 486              runner._agent_cache["s3"] = (self._fake_agent(), "sig3")
 487              runner._enforce_agent_cache_cap()
 488  
 489          assert "s0" not in runner._agent_cache
 490          assert "s3" in runner._agent_cache
 491          assert len(runner._agent_cache) == 3
 492  
 493      def test_cap_respects_move_to_end(self, monkeypatch):
 494          """Entries refreshed via move_to_end are NOT evicted as 'oldest'."""
 495          from gateway import run as gw_run
 496  
 497          monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 3)
 498          runner = self._bounded_runner()
 499          runner._cleanup_agent_resources = MagicMock()
 500  
 501          for i in range(3):
 502              runner._agent_cache[f"s{i}"] = (self._fake_agent(), f"sig{i}")
 503  
 504          # Touch s0 — it is now MRU, so s1 becomes LRU.
 505          runner._agent_cache.move_to_end("s0")
 506  
 507          with runner._agent_cache_lock:
 508              runner._agent_cache["s3"] = (self._fake_agent(), "sig3")
 509              runner._enforce_agent_cache_cap()
 510  
 511          assert "s0" in runner._agent_cache  # rescued by move_to_end
 512          assert "s1" not in runner._agent_cache  # now oldest → evicted
 513          assert "s3" in runner._agent_cache
 514  
 515      def test_cap_triggers_cleanup_thread(self, monkeypatch):
 516          """Evicted agent has release_clients() called for it (soft cleanup).
 517  
 518          Uses the soft path (_release_evicted_agent_soft), NOT the hard
 519          _cleanup_agent_resources — cache eviction must not tear down
 520          per-task state (terminal/browser/bg procs).
 521          """
 522          from gateway import run as gw_run
 523  
 524          monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1)
 525          runner = self._bounded_runner()
 526  
 527          release_calls: list = []
 528          cleanup_calls: list = []
 529          # Intercept both paths; only release_clients path should fire.
 530          def _soft(agent):
 531              release_calls.append(agent)
 532          runner._release_evicted_agent_soft = _soft
 533          runner._cleanup_agent_resources = lambda a: cleanup_calls.append(a)
 534  
 535          old_agent = self._fake_agent()
 536          new_agent = self._fake_agent()
 537          with runner._agent_cache_lock:
 538              runner._agent_cache["old"] = (old_agent, "sig_old")
 539              runner._agent_cache["new"] = (new_agent, "sig_new")
 540              runner._enforce_agent_cache_cap()
 541  
 542          # Cleanup is dispatched to a daemon thread; join briefly to observe.
 543          import time as _t
 544          deadline = _t.time() + 2.0
 545          while _t.time() < deadline and not release_calls:
 546              _t.sleep(0.02)
 547          assert old_agent in release_calls
 548          assert new_agent not in release_calls
 549          # Hard-cleanup path must NOT have fired — that's for session expiry only.
 550          assert cleanup_calls == []
 551  
 552      def test_idle_ttl_sweep_evicts_stale_agents(self, monkeypatch):
 553          """_sweep_idle_cached_agents removes agents idle past the TTL."""
 554          from gateway import run as gw_run
 555  
 556          monkeypatch.setattr(gw_run, "_AGENT_CACHE_IDLE_TTL_SECS", 0.05)
 557          runner = self._bounded_runner()
 558          runner._cleanup_agent_resources = MagicMock()
 559  
 560          import time as _t
 561          fresh = self._fake_agent(last_activity=_t.time())
 562          stale = self._fake_agent(last_activity=_t.time() - 10.0)
 563          runner._agent_cache["fresh"] = (fresh, "s1")
 564          runner._agent_cache["stale"] = (stale, "s2")
 565  
 566          evicted = runner._sweep_idle_cached_agents()
 567          assert evicted == 1
 568          assert "stale" not in runner._agent_cache
 569          assert "fresh" in runner._agent_cache
 570  
 571      def test_idle_sweep_skips_agents_without_activity_ts(self, monkeypatch):
 572          """Agents missing _last_activity_ts are left alone (defensive)."""
 573          from gateway import run as gw_run
 574  
 575          monkeypatch.setattr(gw_run, "_AGENT_CACHE_IDLE_TTL_SECS", 0.01)
 576          runner = self._bounded_runner()
 577          runner._cleanup_agent_resources = MagicMock()
 578  
 579          no_ts = MagicMock(spec=[])  # no _last_activity_ts attribute
 580          runner._agent_cache["s"] = (no_ts, "sig")
 581  
 582          assert runner._sweep_idle_cached_agents() == 0
 583          assert "s" in runner._agent_cache
 584  
 585      def test_plain_dict_cache_is_tolerated(self):
 586          """Test fixtures using plain {} don't crash _enforce_agent_cache_cap."""
 587          from gateway.run import GatewayRunner
 588  
 589          runner = GatewayRunner.__new__(GatewayRunner)
 590          runner._agent_cache = {}  # plain dict, not OrderedDict
 591          runner._agent_cache_lock = threading.Lock()
 592          runner._cleanup_agent_resources = MagicMock()
 593  
 594          # Should be a no-op rather than raising.
 595          with runner._agent_cache_lock:
 596              for i in range(200):
 597                  runner._agent_cache[f"s{i}"] = (MagicMock(), f"sig{i}")
 598              runner._enforce_agent_cache_cap()  # no crash, no eviction
 599  
 600          assert len(runner._agent_cache) == 200
 601  
 602      def test_main_lookup_updates_lru_order(self, monkeypatch):
 603          """Cache hit via the main-lookup path refreshes LRU position."""
 604          runner = self._bounded_runner()
 605  
 606          a0 = self._fake_agent()
 607          a1 = self._fake_agent()
 608          a2 = self._fake_agent()
 609          runner._agent_cache["s0"] = (a0, "sig0")
 610          runner._agent_cache["s1"] = (a1, "sig1")
 611          runner._agent_cache["s2"] = (a2, "sig2")
 612  
 613          # Simulate what _process_message_background does on a cache hit
 614          # (minus the agent-state reset which isn't relevant here).
 615          with runner._agent_cache_lock:
 616              cached = runner._agent_cache.get("s0")
 617              if cached and hasattr(runner._agent_cache, "move_to_end"):
 618                  runner._agent_cache.move_to_end("s0")
 619  
 620          # After the hit, insertion order should be s1, s2, s0.
 621          assert list(runner._agent_cache.keys()) == ["s1", "s2", "s0"]
 622  
 623  
 624  class TestAgentCacheActiveSafety:
 625      """Safety: eviction must not tear down agents currently mid-turn.
 626  
 627      AIAgent.close() kills process_registry entries for the task, cleans
 628      the terminal sandbox, closes the OpenAI client, and cascades
 629      .close() into active child subagents.  Calling it while the agent
 630      is still processing would crash the in-flight request.  These tests
 631      pin that eviction skips any agent present in _running_agents.
 632      """
 633  
 634      def _runner(self):
 635          from collections import OrderedDict
 636          from gateway.run import GatewayRunner
 637  
 638          runner = GatewayRunner.__new__(GatewayRunner)
 639          runner._agent_cache = OrderedDict()
 640          runner._agent_cache_lock = threading.Lock()
 641          runner._running_agents = {}
 642          return runner
 643  
 644      def _fake_agent(self, idle_seconds: float = 0.0):
 645          import time as _t
 646          m = MagicMock()
 647          m._last_activity_ts = _t.time() - idle_seconds
 648          return m
 649  
 650      def test_cap_skips_active_lru_entry(self, monkeypatch):
 651          """Active LRU entry is skipped; cache stays over cap rather than
 652          compensating by evicting a newer entry.
 653  
 654          Rationale: evicting a more-recent entry just because the oldest
 655          slot is temporarily locked would punish the most recently-
 656          inserted session (which has no cache to preserve) to protect
 657          one that happens to be mid-turn.  Better to let the cache stay
 658          transiently over cap and re-check on the next insert.
 659          """
 660          from gateway import run as gw_run
 661  
 662          monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 2)
 663          runner = self._runner()
 664          runner._cleanup_agent_resources = MagicMock()
 665  
 666          active = self._fake_agent()
 667          idle_a = self._fake_agent()
 668          idle_b = self._fake_agent()
 669  
 670          # Insertion order: active (oldest), idle_a, idle_b.
 671          runner._agent_cache["session-active"] = (active, "sig")
 672          runner._agent_cache["session-idle-a"] = (idle_a, "sig")
 673          runner._agent_cache["session-idle-b"] = (idle_b, "sig")
 674  
 675          # Mark `active` as mid-turn — it's LRU, but protected.
 676          runner._running_agents["session-active"] = active
 677  
 678          with runner._agent_cache_lock:
 679              runner._enforce_agent_cache_cap()
 680  
 681          # All three remain; no eviction ran, no cleanup dispatched.
 682          assert "session-active" in runner._agent_cache
 683          assert "session-idle-a" in runner._agent_cache
 684          assert "session-idle-b" in runner._agent_cache
 685          assert runner._cleanup_agent_resources.call_count == 0
 686  
 687      def test_cap_evicts_when_multiple_excess_and_some_inactive(self, monkeypatch):
 688          """Mixed active/idle in the LRU excess window: only the idle ones go.
 689  
 690          With CAP=2 and 4 entries, excess=2 (the two oldest).  If the
 691          oldest is active and the next is idle, we evict exactly one.
 692          Cache ends at CAP+1, which is still better than unbounded.
 693          """
 694          from gateway import run as gw_run
 695  
 696          monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 2)
 697          runner = self._runner()
 698          runner._cleanup_agent_resources = MagicMock()
 699  
 700          oldest_active = self._fake_agent()
 701          idle_second = self._fake_agent()
 702          idle_third = self._fake_agent()
 703          idle_fourth = self._fake_agent()
 704  
 705          runner._agent_cache["s1"] = (oldest_active, "sig")
 706          runner._agent_cache["s2"] = (idle_second, "sig")  # in excess window, idle
 707          runner._agent_cache["s3"] = (idle_third, "sig")
 708          runner._agent_cache["s4"] = (idle_fourth, "sig")
 709  
 710          runner._running_agents["s1"] = oldest_active  # oldest is mid-turn
 711  
 712          with runner._agent_cache_lock:
 713              runner._enforce_agent_cache_cap()
 714  
 715          # s1 protected (active), s2 evicted (idle + in excess window),
 716          # s3 and s4 untouched (outside excess window).
 717          assert "s1" in runner._agent_cache
 718          assert "s2" not in runner._agent_cache
 719          assert "s3" in runner._agent_cache
 720          assert "s4" in runner._agent_cache
 721  
 722      def test_cap_leaves_cache_over_limit_if_all_active(self, monkeypatch, caplog):
 723          """If every over-cap entry is mid-turn, the cache stays over cap.
 724  
 725          Better to temporarily exceed the cap than to crash an in-flight
 726          turn by tearing down its clients.
 727          """
 728          from gateway import run as gw_run
 729          import logging as _logging
 730  
 731          monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1)
 732          runner = self._runner()
 733          runner._cleanup_agent_resources = MagicMock()
 734  
 735          a1 = self._fake_agent()
 736          a2 = self._fake_agent()
 737          a3 = self._fake_agent()
 738          runner._agent_cache["s1"] = (a1, "sig")
 739          runner._agent_cache["s2"] = (a2, "sig")
 740          runner._agent_cache["s3"] = (a3, "sig")
 741  
 742          # All three are mid-turn.
 743          runner._running_agents["s1"] = a1
 744          runner._running_agents["s2"] = a2
 745          runner._running_agents["s3"] = a3
 746  
 747          with caplog.at_level(_logging.WARNING, logger="gateway.run"):
 748              with runner._agent_cache_lock:
 749                  runner._enforce_agent_cache_cap()
 750  
 751          # Cache unchanged because eviction had to skip every candidate.
 752          assert len(runner._agent_cache) == 3
 753          # _cleanup_agent_resources must NOT have been scheduled.
 754          assert runner._cleanup_agent_resources.call_count == 0
 755          # And we logged a warning so operators can see the condition.
 756          assert any("mid-turn" in r.message for r in caplog.records)
 757  
 758      def test_cap_pending_sentinel_does_not_block_eviction(self, monkeypatch):
 759          """_AGENT_PENDING_SENTINEL in _running_agents is treated as 'not active'.
 760  
 761          The sentinel is set while an agent is being CONSTRUCTED, before the
 762          real AIAgent instance exists.  Cached agents from other sessions
 763          can still be evicted safely.
 764          """
 765          from gateway import run as gw_run
 766          from gateway.run import _AGENT_PENDING_SENTINEL
 767  
 768          monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1)
 769          runner = self._runner()
 770          runner._cleanup_agent_resources = MagicMock()
 771  
 772          a1 = self._fake_agent()
 773          a2 = self._fake_agent()
 774          runner._agent_cache["s1"] = (a1, "sig")
 775          runner._agent_cache["s2"] = (a2, "sig")
 776          # Another session is mid-creation — sentinel, no real agent yet.
 777          runner._running_agents["s3-being-created"] = _AGENT_PENDING_SENTINEL
 778  
 779          with runner._agent_cache_lock:
 780              runner._enforce_agent_cache_cap()
 781  
 782          assert "s1" not in runner._agent_cache  # evicted normally
 783          assert "s2" in runner._agent_cache
 784  
 785      def test_idle_sweep_skips_active_agent(self, monkeypatch):
 786          """Idle-TTL sweep must not tear down an active agent even if 'stale'."""
 787          from gateway import run as gw_run
 788  
 789          monkeypatch.setattr(gw_run, "_AGENT_CACHE_IDLE_TTL_SECS", 0.01)
 790          runner = self._runner()
 791          runner._cleanup_agent_resources = MagicMock()
 792  
 793          old_but_active = self._fake_agent(idle_seconds=10.0)
 794          runner._agent_cache["s1"] = (old_but_active, "sig")
 795          runner._running_agents["s1"] = old_but_active
 796  
 797          evicted = runner._sweep_idle_cached_agents()
 798  
 799          assert evicted == 0
 800          assert "s1" in runner._agent_cache
 801          assert runner._cleanup_agent_resources.call_count == 0
 802  
 803      def test_eviction_does_not_close_active_agent_client(self, monkeypatch):
 804          """Live test: evicting an active agent does NOT null its .client.
 805  
 806          This reproduces the original concern — if eviction fired while an
 807          agent was mid-turn, `agent.close()` would set `self.client = None`
 808          and the next API call inside the loop would crash.  With the
 809          active-agent skip, the client stays intact.
 810          """
 811          from gateway import run as gw_run
 812  
 813          monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", 1)
 814          runner = self._runner()
 815  
 816          # Build a proper fake agent whose close() matches AIAgent's contract.
 817          active = MagicMock()
 818          active._last_activity_ts = __import__("time").time()
 819          active.client = MagicMock()  # simulate an OpenAI client
 820          def _real_close():
 821              active.client = None  # mirrors run_agent.py:3299
 822          active.close = _real_close
 823          active.shutdown_memory_provider = MagicMock()
 824  
 825          idle = self._fake_agent()
 826  
 827          runner._agent_cache["active-session"] = (active, "sig")
 828          runner._agent_cache["idle-session"] = (idle, "sig")
 829          runner._running_agents["active-session"] = active
 830  
 831          # Real cleanup function, not mocked — we want to see whether close()
 832          # runs on the active agent.  (It shouldn't.)
 833          with runner._agent_cache_lock:
 834              runner._enforce_agent_cache_cap()
 835  
 836          # Let any eviction cleanup threads drain.
 837          import time as _t
 838          _t.sleep(0.2)
 839  
 840          # The ACTIVE agent's client must still be usable.
 841          assert active.client is not None, (
 842              "Active agent's client was closed by eviction — "
 843              "running turn would crash on its next API call."
 844          )
 845  
 846  
 847  class TestAgentCacheSpilloverLive:
 848      """Live E2E: fill cache with real AIAgent instances and stress it."""
 849  
 850      def _runner(self):
 851          from collections import OrderedDict
 852          from gateway.run import GatewayRunner
 853  
 854          runner = GatewayRunner.__new__(GatewayRunner)
 855          runner._agent_cache = OrderedDict()
 856          runner._agent_cache_lock = threading.Lock()
 857          runner._running_agents = {}
 858          return runner
 859  
 860      def _real_agent(self):
 861          """A genuine AIAgent; no API calls are made during these tests."""
 862          from run_agent import AIAgent
 863          return AIAgent(
 864              model="anthropic/claude-sonnet-4", api_key="test",
 865              base_url="https://openrouter.ai/api/v1", provider="openrouter",
 866              max_iterations=5, quiet_mode=True,
 867              skip_context_files=True, skip_memory=True,
 868              platform="telegram",
 869          )
 870  
 871      def test_fill_to_cap_then_spillover(self, monkeypatch):
 872          """Fill to cap with real agents, insert one more, oldest evicted."""
 873          from gateway import run as gw_run
 874  
 875          CAP = 8
 876          monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", CAP)
 877          runner = self._runner()
 878  
 879          agents = [self._real_agent() for _ in range(CAP)]
 880          for i, a in enumerate(agents):
 881              with runner._agent_cache_lock:
 882                  runner._agent_cache[f"s{i}"] = (a, "sig")
 883                  runner._enforce_agent_cache_cap()
 884          assert len(runner._agent_cache) == CAP
 885  
 886          # Spillover insertion.
 887          newcomer = self._real_agent()
 888          with runner._agent_cache_lock:
 889              runner._agent_cache["new"] = (newcomer, "sig")
 890              runner._enforce_agent_cache_cap()
 891  
 892          # Oldest (s0) evicted, cap still CAP.
 893          assert "s0" not in runner._agent_cache
 894          assert "new" in runner._agent_cache
 895          assert len(runner._agent_cache) == CAP
 896  
 897          # Clean up so pytest doesn't leak resources.
 898          for a in agents + [newcomer]:
 899              try:
 900                  a.close()
 901              except Exception:
 902                  pass
 903  
 904      def test_spillover_all_active_keeps_cache_over_cap(self, monkeypatch, caplog):
 905          """Every slot active: cache goes over cap, no one gets torn down."""
 906          from gateway import run as gw_run
 907          import logging as _logging
 908  
 909          CAP = 4
 910          monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", CAP)
 911          runner = self._runner()
 912  
 913          agents = [self._real_agent() for _ in range(CAP)]
 914          for i, a in enumerate(agents):
 915              runner._agent_cache[f"s{i}"] = (a, "sig")
 916              runner._running_agents[f"s{i}"] = a  # every session mid-turn
 917  
 918          newcomer = self._real_agent()
 919          with caplog.at_level(_logging.WARNING, logger="gateway.run"):
 920              with runner._agent_cache_lock:
 921                  runner._agent_cache["new"] = (newcomer, "sig")
 922                  runner._enforce_agent_cache_cap()
 923  
 924          assert len(runner._agent_cache) == CAP + 1  # temporarily over cap
 925          # All existing agents still usable.
 926          for i, a in enumerate(agents):
 927              assert a.client is not None, f"s{i} got closed while active!"
 928          # And we warned operators.
 929          assert any("mid-turn" in r.message for r in caplog.records)
 930  
 931          for a in agents + [newcomer]:
 932              try:
 933                  a.close()
 934              except Exception:
 935                  pass
 936  
 937      def test_concurrent_inserts_settle_at_cap(self, monkeypatch):
 938          """Many threads inserting in parallel end with len(cache) == CAP."""
 939          from gateway import run as gw_run
 940  
 941          CAP = 16
 942          monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", CAP)
 943          runner = self._runner()
 944  
 945          N_THREADS = 8
 946          PER_THREAD = 20  # 8 * 20 = 160 inserts into a 16-slot cache
 947  
 948          def worker(tid: int):
 949              for j in range(PER_THREAD):
 950                  a = self._real_agent()
 951                  key = f"t{tid}-s{j}"
 952                  with runner._agent_cache_lock:
 953                      runner._agent_cache[key] = (a, "sig")
 954                      runner._enforce_agent_cache_cap()
 955  
 956          threads = [
 957              threading.Thread(target=worker, args=(t,), daemon=True)
 958              for t in range(N_THREADS)
 959          ]
 960          for t in threads:
 961              t.start()
 962          for t in threads:
 963              t.join(timeout=30)
 964              assert not t.is_alive(), "Worker thread hung — possible deadlock?"
 965  
 966          # Let daemon cleanup threads settle.
 967          import time as _t
 968          _t.sleep(0.5)
 969  
 970          assert len(runner._agent_cache) == CAP, (
 971              f"Expected exactly {CAP} entries after concurrent inserts, "
 972              f"got {len(runner._agent_cache)}."
 973          )
 974  
 975      def test_evicted_session_next_turn_gets_fresh_agent(self, monkeypatch):
 976          """After eviction, the same session_key can insert a fresh agent.
 977  
 978          Simulates the real spillover flow: evicted session sends another
 979          message, which builds a new AIAgent and re-enters the cache.
 980          """
 981          from gateway import run as gw_run
 982  
 983          CAP = 2
 984          monkeypatch.setattr(gw_run, "_AGENT_CACHE_MAX_SIZE", CAP)
 985          runner = self._runner()
 986  
 987          a0 = self._real_agent()
 988          a1 = self._real_agent()
 989          runner._agent_cache["sA"] = (a0, "sig")
 990          runner._agent_cache["sB"] = (a1, "sig")
 991  
 992          # 3rd session forces sA (oldest) out.
 993          a2 = self._real_agent()
 994          with runner._agent_cache_lock:
 995              runner._agent_cache["sC"] = (a2, "sig")
 996              runner._enforce_agent_cache_cap()
 997          assert "sA" not in runner._agent_cache
 998  
 999          # Let the eviction cleanup thread run.
1000          import time as _t
1001          _t.sleep(0.3)
1002  
1003          # Now sA's user sends another message → a fresh agent goes in.
1004          a0_new = self._real_agent()
1005          with runner._agent_cache_lock:
1006              runner._agent_cache["sA"] = (a0_new, "sig")
1007              runner._enforce_agent_cache_cap()
1008  
1009          assert "sA" in runner._agent_cache
1010          assert runner._agent_cache["sA"][0] is a0_new  # the new one, not stale
1011          # Fresh agent is usable.
1012          assert a0_new.client is not None
1013  
1014          for a in (a0, a1, a2, a0_new):
1015              try:
1016                  a.close()
1017              except Exception:
1018                  pass
1019  
1020  
class TestAgentCacheIdleResume:
    """End-to-end: idle-TTL-evicted session resumes cleanly with task state.

    Real-world scenario: user leaves a Telegram session open for 2+ hours.
    Idle-TTL evicts their cached agent.  They come back and send a message.
    The new agent built for the same session_id must inherit:
      - Conversation history (from SessionStore — outside cache concern)
      - Terminal sandbox (same task_id → same _active_environments entry)
      - Browser daemon (same task_id → same browser session)
      - Background processes (same task_id → same process_registry entries)
    The ONLY thing that should reset is the LLM client pool (rebuilt fresh).
    """

    def _runner(self):
        """Minimal GatewayRunner carrying only the agent-cache plumbing."""
        from collections import OrderedDict
        from gateway.run import GatewayRunner

        runner = GatewayRunner.__new__(GatewayRunner)
        runner._agent_cache = OrderedDict()
        runner._agent_cache_lock = threading.Lock()
        runner._running_agents = {}
        return runner

    def _agent(self, session_id=None):
        """Build a real AIAgent; no API calls are made during these tests.

        Deduplicates the constructor boilerplate that was previously
        repeated verbatim in every test of this class.
        """
        from run_agent import AIAgent

        kwargs = dict(
            model="anthropic/claude-sonnet-4", api_key="test",
            base_url="https://openrouter.ai/api/v1", provider="openrouter",
            max_iterations=5, quiet_mode=True,
            skip_context_files=True, skip_memory=True,
        )
        if session_id is not None:
            kwargs["session_id"] = session_id
        return AIAgent(**kwargs)

    def test_release_clients_does_not_touch_process_registry(self, monkeypatch):
        """release_clients must not call process_registry.kill_all for task_id."""
        agent = self._agent("idle-resume-test-session")

        # Spy on process_registry.kill_all — it MUST NOT be called.
        # BUGFIX: the spy accepts ANY calling convention, so a positional
        # production call is recorded as a violation instead of raising a
        # confusing TypeError from inside release_clients().
        from tools import process_registry as _pr
        kill_all_calls: list = []
        original_kill_all = _pr.process_registry.kill_all
        _pr.process_registry.kill_all = (
            lambda *args, **kwargs: kill_all_calls.append((args, kwargs))
        )
        try:
            agent.release_clients()
        finally:
            # Restore BEFORE close(): full teardown may legitimately call
            # kill_all, and that must not be recorded as a violation.
            _pr.process_registry.kill_all = original_kill_all
            try:
                agent.close()
            except Exception:
                pass

        assert kill_all_calls == [], (
            f"release_clients() called process_registry.kill_all — would "
            f"kill user's bg processes on cache eviction. Calls: {kill_all_calls}"
        )

    def test_release_clients_does_not_touch_terminal_or_browser(self, monkeypatch):
        """release_clients must not call cleanup_vm or cleanup_browser."""
        from tools import terminal_tool as _tt
        from tools import browser_tool as _bt

        agent = self._agent("idle-resume-test-2")

        vm_calls: list = []
        browser_calls: list = []
        original_vm = _tt.cleanup_vm
        original_browser = _bt.cleanup_browser
        # Spies accept any calling convention — record, never raise.
        _tt.cleanup_vm = lambda *args, **kwargs: vm_calls.append((args, kwargs))
        _bt.cleanup_browser = lambda *args, **kwargs: browser_calls.append((args, kwargs))
        try:
            agent.release_clients()
        finally:
            _tt.cleanup_vm = original_vm
            _bt.cleanup_browser = original_browser
            try:
                agent.close()
            except Exception:
                pass

        assert vm_calls == [], (
            f"release_clients() tore down terminal sandbox — user's cwd, "
            f"env, and bg shells would be gone on resume. Calls: {vm_calls}"
        )
        assert browser_calls == [], (
            f"release_clients() tore down browser session — user's open "
            f"tabs and cookies gone on resume. Calls: {browser_calls}"
        )

    def test_release_clients_closes_llm_client(self):
        """release_clients IS expected to close the OpenAI/httpx client."""
        agent = self._agent()
        # Clients are lazy-built; force one to exist so we can verify close.
        assert agent.client is not None  # __init__ builds it

        try:
            agent.release_clients()

            # Post-release: client reference is dropped (memory freed).
            assert agent.client is None
        finally:
            # BUGFIX: full teardown afterwards so this test doesn't leak
            # task state — every sibling closes its agent; this one didn't.
            try:
                agent.close()
            except Exception:
                pass

    def test_close_vs_release_full_teardown_difference(self, monkeypatch):
        """close() tears down task state; release_clients() does not.

        This pins the semantic contract: session-expiry path uses close()
        (full teardown — session is done), cache-eviction path uses
        release_clients() (soft — session may resume).
        """
        import run_agent as _ra

        # Agent A: evicted from cache (soft) — terminal survives.
        # Agent B: session expired (hard) — terminal torn down.
        agent_a = self._agent("soft-session")
        agent_b = self._agent("hard-session")

        vm_calls: list = []
        # AIAgent.close() calls the ``cleanup_vm`` name bound into
        # ``run_agent`` at import time, not ``tools.terminal_tool.cleanup_vm``
        # directly — so patch the ``run_agent`` reference.
        original_vm = _ra.cleanup_vm
        _ra.cleanup_vm = lambda tid: vm_calls.append(tid)
        try:
            agent_a.release_clients()   # cache eviction
            agent_b.close()              # session expiry
        finally:
            # Restore first so agent_a's real close() below is not recorded.
            _ra.cleanup_vm = original_vm
            try:
                agent_a.close()
            except Exception:
                pass

        # Only agent_b's task_id should appear in cleanup calls.
        assert "hard-session" in vm_calls
        assert "soft-session" not in vm_calls

    def test_idle_evicted_session_rebuild_inherits_task_id(self, monkeypatch):
        """After idle-TTL eviction, a fresh agent with the same session_id
        gets the same task_id — so tool state (terminal/browser/bg procs)
        that persisted across eviction is reachable via the new agent.
        """
        from gateway import run as gw_run

        monkeypatch.setattr(gw_run, "_AGENT_CACHE_IDLE_TTL_SECS", 0.01)
        runner = self._runner()

        # Build an agent representing a stale (idle) session.
        SESSION_ID = "long-lived-user-session"
        old = self._agent(SESSION_ID)
        old._last_activity_ts = 0.0  # force idle
        runner._agent_cache["sKey"] = (old, "sig")

        # Simulate the idle-TTL sweep firing.
        runner._sweep_idle_cached_agents()
        assert "sKey" not in runner._agent_cache

        # Wait for the daemon thread doing release_clients() to finish.
        import time as _t
        _t.sleep(0.3)

        # Old agent's client is gone (soft cleanup fired).
        assert old.client is None

        # User comes back — new agent built for the SAME session_id.
        new_agent = self._agent(SESSION_ID)

        # Same session_id means same task_id routed to tools.  The new
        # agent inherits any per-task state (terminal sandbox etc.) that
        # was preserved across eviction.
        assert new_agent.session_id == old.session_id == SESSION_ID
        # And it has a fresh working client.
        assert new_agent.client is not None

        try:
            new_agent.close()
        except Exception:
            pass
1233  
1234  
_FAKE_NOW: float = 10_000.0  # Fixed epoch (seconds) for deterministic time assertions
1236  
1237  
1238  class TestCachedAgentInactivityReset:
1239      """Inactivity-clock reset must be gated on _interrupt_depth == 0.
1240  
1241      On interrupt-recursive turns (_interrupt_depth > 0) the clock must
1242      keep accumulating so the inactivity watchdog can fire when a turn is
1243      stuck in an interrupt loop.  Resetting unconditionally prevented the
1244      30-min timeout from triggering (#15654).  The depth-0 reset is still
1245      needed: a session idle for 29 min must not trip the watchdog before
1246      the new turn makes its first API call (#9051).
1247      """
1248  
1249      def _fake_agent(self, stale_seconds: float = 1800.0):
1250          m = MagicMock()
1251          m._last_activity_ts = _FAKE_NOW - stale_seconds
1252          m._api_call_count = 10
1253          m._last_activity_desc = "previous turn activity"
1254          return m
1255  
1256      def test_fresh_turn_resets_idle_clock(self):
1257          """interrupt_depth=0: clock resets so a post-idle turn gets a
1258          fresh 30-min inactivity window (guard for #9051)."""
1259          from gateway.run import GatewayRunner
1260  
1261          agent = self._fake_agent(stale_seconds=1800.0)
1262          old_ts = agent._last_activity_ts
1263  
1264          with patch("gateway.run.time") as mock_time:
1265              mock_time.time.return_value = _FAKE_NOW
1266              GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=0)
1267  
1268          assert agent._last_activity_ts == _FAKE_NOW, (
1269              "_last_activity_ts was not reset on a fresh turn (interrupt_depth=0)"
1270          )
1271          assert agent._last_activity_ts > old_ts, (
1272              "Stale idle time should be cleared so the new turn gets a fresh window"
1273          )
1274  
1275      def test_fresh_turn_resets_desc(self):
1276          """interrupt_depth=0: description is updated to reflect the new turn."""
1277          from gateway.run import GatewayRunner
1278  
1279          agent = self._fake_agent()
1280  
1281          with patch("gateway.run.time") as mock_time:
1282              mock_time.time.return_value = _FAKE_NOW
1283              GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=0)
1284  
1285          assert agent._last_activity_desc == "starting new turn (cached)"
1286  
1287      def test_interrupt_turn_preserves_idle_clock(self):
1288          """interrupt_depth=1: clock preserved so accumulated stuck-turn
1289          idle time is not discarded by an interrupt-recursive re-entry (#15654)."""
1290          from gateway.run import GatewayRunner
1291  
1292          agent = self._fake_agent(stale_seconds=1200.0)
1293          old_ts = agent._last_activity_ts
1294  
1295          GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=1)
1296  
1297          assert agent._last_activity_ts == old_ts, (
1298              "_last_activity_ts must not be reset on interrupt-recursive turns "
1299              "(interrupt_depth>0) — the watchdog needs the accumulated idle time"
1300          )
1301  
1302      def test_interrupt_turn_preserves_desc(self):
1303          """interrupt_depth=1: desc preserved — it is semantically paired with ts."""
1304          from gateway.run import GatewayRunner
1305  
1306          agent = self._fake_agent(stale_seconds=1200.0)
1307  
1308          GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=1)
1309  
1310          assert agent._last_activity_desc == "previous turn activity", (
1311              "_last_activity_desc must not change on interrupt-recursive turns; "
1312              "it describes the activity *at* _last_activity_ts"
1313          )
1314  
1315      def test_deep_interrupt_recursion_preserves_idle_clock(self):
1316          """interrupt_depth=MAX-1: clock still preserved at any non-zero depth."""
1317          from gateway.run import GatewayRunner
1318  
1319          agent = self._fake_agent(stale_seconds=600.0)
1320          old_ts = agent._last_activity_ts
1321  
1322          GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=4)
1323  
1324          assert agent._last_activity_ts == old_ts
1325  
1326      def test_api_call_count_reset_regardless_of_depth(self):
1327          """_api_call_count is always reset to 0 for the new turn, at any depth."""
1328          from gateway.run import GatewayRunner
1329  
1330          agent_fresh = self._fake_agent()
1331          agent_interrupted = self._fake_agent()
1332  
1333          with patch("gateway.run.time") as mock_time:
1334              mock_time.time.return_value = _FAKE_NOW
1335              GatewayRunner._init_cached_agent_for_turn(agent_fresh, interrupt_depth=0)
1336          GatewayRunner._init_cached_agent_for_turn(agent_interrupted, interrupt_depth=1)
1337  
1338          assert agent_fresh._api_call_count == 0
1339          assert agent_interrupted._api_call_count == 0
1340  
1341      def test_watchdog_accumulation_across_recursive_turns(self):
1342          """Scenario: stuck turn + user interrupt → recursive turn.
1343  
1344          The idle time seen by the watchdog must reflect the full stuck
1345          duration, not restart from zero on the recursive re-entry.
1346          """
1347          from gateway.run import GatewayRunner
1348  
1349          STUCK_FOR = 1750.0
1350          agent = self._fake_agent(stale_seconds=STUCK_FOR)
1351  
1352          # Simulate: user sees "Still working..." and sends another message.
1353          # That triggers an interrupt → _run_agent recurses at depth=1.
1354          GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=1)
1355  
1356          # Watchdog sees time.time() - _last_activity_ts ≥ STUCK_FOR.
1357          idle_secs = _FAKE_NOW - agent._last_activity_ts
1358          assert idle_secs >= STUCK_FOR - 1.0, (
1359              f"Watchdog would see {idle_secs:.0f}s idle, expected ~{STUCK_FOR}s. "
1360              "Inactivity timeout could not fire for a stuck interrupted turn."
1361          )