tests/agent/test_context_compressor.py
   1  """Tests for agent/context_compressor.py — compression logic, thresholds, truncation fallback."""
   2  
   3  import pytest
   4  from unittest.mock import patch, MagicMock
   5  
   6  from agent.context_compressor import ContextCompressor, SUMMARY_PREFIX
   7  
   8  
   9  @pytest.fixture()
  10  def compressor():
  11      """Create a ContextCompressor with mocked dependencies."""
  12      with patch("agent.context_compressor.get_model_context_length", return_value=100000):
  13          c = ContextCompressor(
  14              model="test/model",
  15              threshold_percent=0.85,
  16              protect_first_n=2,
  17              protect_last_n=2,
  18              quiet_mode=True,
  19          )
  20          return c
  21  
  22  
  23  class TestShouldCompress:
  24      def test_below_threshold(self, compressor):
  25          compressor.last_prompt_tokens = 50000
  26          assert compressor.should_compress() is False
  27  
  28      def test_above_threshold(self, compressor):
  29          compressor.last_prompt_tokens = 90000
  30          assert compressor.should_compress() is True
  31  
  32      def test_exact_threshold(self, compressor):
  33          compressor.last_prompt_tokens = 85000
  34          assert compressor.should_compress() is True
  35  
  36      def test_explicit_tokens(self, compressor):
  37          assert compressor.should_compress(prompt_tokens=90000) is True
  38          assert compressor.should_compress(prompt_tokens=50000) is False
  39  
  40  
  41  
  42  class TestUpdateFromResponse:
  43      def test_updates_fields(self, compressor):
  44          compressor.update_from_response({
  45              "prompt_tokens": 5000,
  46              "completion_tokens": 1000,
  47              "total_tokens": 6000,
  48          })
  49          assert compressor.last_prompt_tokens == 5000
  50          assert compressor.last_completion_tokens == 1000
  51  
  52      def test_missing_fields_default_zero(self, compressor):
  53          compressor.update_from_response({})
  54          assert compressor.last_prompt_tokens == 0
  55  
  56  
  57  
  58  class TestCompress:
  59      def _make_messages(self, n):
  60          return [{"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"} for i in range(n)]
  61  
  62      def test_too_few_messages_returns_unchanged(self, compressor):
  63          msgs = self._make_messages(4)  # protect_first=2 + protect_last=2 + 1 = 5 needed
  64          result = compressor.compress(msgs)
  65          assert result == msgs
  66  
  67      def test_truncation_fallback_no_client(self, compressor):
  68          # compressor has client=None, so should use truncation fallback
  69          msgs = [{"role": "system", "content": "System prompt"}] + self._make_messages(10)
  70          result = compressor.compress(msgs)
  71          assert len(result) < len(msgs)
  72          # Should keep system message and last N
  73          assert result[0]["role"] == "system"
  74          assert compressor.compression_count == 1
  75  
  76      def test_compression_increments_count(self, compressor):
  77          msgs = self._make_messages(10)
  78          compressor.compress(msgs)
  79          assert compressor.compression_count == 1
  80          compressor.compress(msgs)
  81          assert compressor.compression_count == 2
  82  
  83      def test_protects_first_and_last(self, compressor):
  84          msgs = self._make_messages(10)
  85          result = compressor.compress(msgs)
  86          # First 2 messages should be preserved (protect_first_n=2)
  87          # Last 2 messages should be preserved (protect_last_n=2)
  88          assert result[-1]["content"] == msgs[-1]["content"]
  89          # The second-to-last tail message may have the summary merged
  90          # into it when a double-collision prevents a standalone summary
  91          # (head=assistant, tail=user in this fixture).  Verify the
  92          # original content is present in either case.
  93          assert msgs[-2]["content"] in result[-2]["content"]
  94  
  95  
  96  class TestGenerateSummaryNoneContent:
  97      """Regression: content=None (from tool-call-only assistant messages) must not crash."""
  98  
  99      def test_none_content_does_not_crash(self):
 100          mock_response = MagicMock()
 101          mock_response.choices = [MagicMock()]
 102          mock_response.choices[0].message.content = "[CONTEXT SUMMARY]: tool calls happened"
 103  
 104          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 105              c = ContextCompressor(model="test", quiet_mode=True)
 106  
 107          messages = [
 108              {"role": "user", "content": "do something"},
 109              {"role": "assistant", "content": None, "tool_calls": [
 110                  {"function": {"name": "search"}}
 111              ]},
 112              {"role": "tool", "content": "result"},
 113              {"role": "assistant", "content": None},
 114              {"role": "user", "content": "thanks"},
 115          ]
 116  
 117          with patch("agent.context_compressor.call_llm", return_value=mock_response):
 118              summary = c._generate_summary(messages)
 119          assert isinstance(summary, str)
 120          assert summary.startswith(SUMMARY_PREFIX)
 121  
 122      def test_none_content_in_system_message_compress(self):
 123          """System message with content=None should not crash during compress."""
 124          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 125              c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)
 126  
 127          msgs = [{"role": "system", "content": None}] + [
 128              {"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"}
 129              for i in range(10)
 130          ]
 131          result = c.compress(msgs)
 132          assert len(result) < len(msgs)
 133  
 134  
 135  class TestNonStringContent:
 136      """Regression: content as dict (e.g., llama.cpp tool calls) must not crash."""
 137  
 138      def test_dict_content_coerced_to_string(self):
 139          mock_response = MagicMock()
 140          mock_response.choices = [MagicMock()]
 141          mock_response.choices[0].message.content = {"text": "some summary"}
 142  
 143          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 144              c = ContextCompressor(model="test", quiet_mode=True)
 145  
 146          messages = [
 147              {"role": "user", "content": "do something"},
 148              {"role": "assistant", "content": "ok"},
 149          ]
 150  
 151          with patch("agent.context_compressor.call_llm", return_value=mock_response):
 152              summary = c._generate_summary(messages)
 153          assert isinstance(summary, str)
 154          assert summary.startswith(SUMMARY_PREFIX)
 155  
 156      def test_none_content_coerced_to_empty(self):
 157          mock_response = MagicMock()
 158          mock_response.choices = [MagicMock()]
 159          mock_response.choices[0].message.content = None
 160  
 161          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 162              c = ContextCompressor(model="test", quiet_mode=True)
 163  
 164          messages = [
 165              {"role": "user", "content": "do something"},
 166              {"role": "assistant", "content": "ok"},
 167          ]
 168  
 169          with patch("agent.context_compressor.call_llm", return_value=mock_response):
 170              summary = c._generate_summary(messages)
 171          # None content → empty string → standardized compaction handoff prefix added
 172          assert summary is not None
 173          assert summary == SUMMARY_PREFIX
 174  
 175      def test_summary_call_does_not_force_temperature(self):
 176          mock_response = MagicMock()
 177          mock_response.choices = [MagicMock()]
 178          mock_response.choices[0].message.content = "ok"
 179  
 180          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 181              c = ContextCompressor(model="test", quiet_mode=True)
 182  
 183          messages = [
 184              {"role": "user", "content": "do something"},
 185              {"role": "assistant", "content": "ok"},
 186          ]
 187  
 188          with patch("agent.context_compressor.call_llm", return_value=mock_response) as mock_call:
 189              c._generate_summary(messages)
 190  
 191          kwargs = mock_call.call_args.kwargs
 192          assert "temperature" not in kwargs
 193  
 194      def test_summary_call_passes_live_main_runtime(self):
 195          mock_response = MagicMock()
 196          mock_response.choices = [MagicMock()]
 197          mock_response.choices[0].message.content = "ok"
 198  
 199          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 200              c = ContextCompressor(
 201                  model="gpt-5.4",
 202                  provider="openai-codex",
 203                  base_url="https://chatgpt.com/backend-api/codex",
 204                  api_key="codex-token",
 205                  api_mode="codex_responses",
 206                  quiet_mode=True,
 207              )
 208  
 209          messages = [
 210              {"role": "user", "content": "do something"},
 211              {"role": "assistant", "content": "ok"},
 212          ]
 213  
 214          with patch("agent.context_compressor.call_llm", return_value=mock_response) as mock_call:
 215              c._generate_summary(messages)
 216  
 217          assert mock_call.call_args.kwargs["main_runtime"] == {
 218              "model": "gpt-5.4",
 219              "provider": "openai-codex",
 220              "base_url": "https://chatgpt.com/backend-api/codex",
 221              "api_key": "codex-token",
 222              "api_mode": "codex_responses",
 223          }
 224  
 225  
 226  class TestSummaryFailureCooldown:
 227      def test_summary_failure_enters_cooldown_and_skips_retry(self):
 228          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 229              c = ContextCompressor(model="test", quiet_mode=True)
 230  
 231          messages = [
 232              {"role": "user", "content": "do something"},
 233              {"role": "assistant", "content": "ok"},
 234          ]
 235  
 236          with patch("agent.context_compressor.call_llm", side_effect=Exception("boom")) as mock_call:
 237              first = c._generate_summary(messages)
 238              second = c._generate_summary(messages)
 239  
 240          assert first is None
 241          assert second is None
 242          assert mock_call.call_count == 1
 243  
 244  
 245  class TestSummaryFallbackToMainModel:
 246      """When ``summary_model`` differs from the main model and the summary LLM
 247      call fails, the compressor should retry once on the main model before
 248      giving up — losing N turns of context is almost always worse than one
 249      extra summary attempt.  Covers both the fast-path (explicit
 250      model-not-found errors) and the unknown-error best-effort retry."""
 251  
 252      def _msgs(self):
 253          return [
 254              {"role": "user", "content": "do something"},
 255              {"role": "assistant", "content": "ok"},
 256          ]
 257  
 258      def test_model_not_found_404_falls_back_to_main_and_succeeds(self):
 259          """Classic misconfiguration: ``auxiliary.compression.model`` points at
 260          a model the main provider doesn't serve → 404 → retry on main."""
 261          mock_ok = MagicMock()
 262          mock_ok.choices = [MagicMock()]
 263          mock_ok.choices[0].message.content = "summary via main model"
 264  
 265          err_404 = Exception("404 model_not_found: no such model")
 266          err_404.status_code = 404
 267  
 268          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 269              c = ContextCompressor(
 270                  model="main-model",
 271                  summary_model_override="broken-aux-model",
 272                  quiet_mode=True,
 273              )
 274  
 275          with patch(
 276              "agent.context_compressor.call_llm",
 277              side_effect=[err_404, mock_ok],
 278          ) as mock_call:
 279              result = c._generate_summary(self._msgs())
 280  
 281          assert mock_call.call_count == 2
 282          # First call used the misconfigured aux model
 283          assert mock_call.call_args_list[0].kwargs.get("model") == "broken-aux-model"
 284          # Second call used the main model (no model kwarg → call_llm uses main)
 285          assert "model" not in mock_call.call_args_list[1].kwargs
 286          assert result is not None
 287          assert "summary via main model" in result
 288          # Aux-model failure is recorded even though retry succeeded — this is
 289          # how callers (gateway /compress, CLI warning) know to tell the user
 290          # their auxiliary.compression.model setting is broken.
 291          assert c._last_aux_model_failure_model == "broken-aux-model"
 292          assert c._last_aux_model_failure_error is not None
 293          assert "404" in c._last_aux_model_failure_error
 294  
 295      def test_unknown_error_falls_back_to_main_and_succeeds(self):
 296          """Errors that don't match the 404/503/model_not_found fast-path
 297          (400s, provider-specific 'no route', aggregator rejections) should
 298          ALSO trigger a best-effort retry on main before entering cooldown."""
 299          mock_ok = MagicMock()
 300          mock_ok.choices = [MagicMock()]
 301          mock_ok.choices[0].message.content = "summary via main model"
 302  
 303          # A 400 from OpenRouter / Nous portal with an opaque message — does
 304          # NOT match _is_model_not_found, but still an unrecoverable misconfig.
 305          err_400 = Exception("400 Bad Request: provider rejected model")
 306          err_400.status_code = 400
 307  
 308          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 309              c = ContextCompressor(
 310                  model="main-model",
 311                  summary_model_override="broken-aux-model",
 312                  quiet_mode=True,
 313              )
 314  
 315          with patch(
 316              "agent.context_compressor.call_llm",
 317              side_effect=[err_400, mock_ok],
 318          ) as mock_call:
 319              result = c._generate_summary(self._msgs())
 320  
 321          assert mock_call.call_count == 2
 322          assert mock_call.call_args_list[0].kwargs.get("model") == "broken-aux-model"
 323          assert "model" not in mock_call.call_args_list[1].kwargs
 324          assert result is not None
 325          assert "summary via main model" in result
 326          # Aux-model failure recorded despite successful recovery
 327          assert c._last_aux_model_failure_model == "broken-aux-model"
 328          assert c._last_aux_model_failure_error is not None
 329          assert "400" in c._last_aux_model_failure_error
 330  
 331      def test_no_fallback_when_summary_model_equals_main_model(self):
 332          """If the aux model IS the main model, there's nowhere to fall back
 333          to — go straight to cooldown, don't loop retrying the same call."""
 334          err = Exception("500 internal error")
 335  
 336          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 337              c = ContextCompressor(
 338                  model="main-model",
 339                  summary_model_override="main-model",  # same as main
 340                  quiet_mode=True,
 341              )
 342  
 343          with patch(
 344              "agent.context_compressor.call_llm",
 345              side_effect=err,
 346          ) as mock_call:
 347              result = c._generate_summary(self._msgs())
 348  
 349          # Only one attempt — retry gate blocks fallback when models match
 350          assert mock_call.call_count == 1
 351          assert result is None
 352          # Not flagged as fallen back — the retry condition was never met
 353          assert getattr(c, "_summary_model_fallen_back", False) is False
 354  
 355      def test_fallback_only_happens_once_per_compressor(self):
 356          """If the retry-on-main ALSO fails, don't loop forever — enter
 357          cooldown like the normal failure path."""
 358          err1 = Exception("400 aux model rejected")
 359          err2 = Exception("500 main model also exploded")
 360  
 361          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 362              c = ContextCompressor(
 363                  model="main-model",
 364                  summary_model_override="broken-aux-model",
 365                  quiet_mode=True,
 366              )
 367  
 368          with patch(
 369              "agent.context_compressor.call_llm",
 370              side_effect=[err1, err2],
 371          ) as mock_call:
 372              result = c._generate_summary(self._msgs())
 373  
 374          # Exactly 2 calls: initial + one retry on main.  No further retries.
 375          assert mock_call.call_count == 2
 376          assert result is None
 377          assert c._summary_model_fallen_back is True
 378  
 379  
 380  class TestAuxModelFallbackSurfacedToCallers:
 381      """When summary_model fails but retry-on-main succeeds, compress() must
 382      expose the aux-model failure via _last_aux_model_failure_{model,error}
 383      so gateway /compress and CLI callers can warn the user about their
 384      broken auxiliary.compression.model config — silent recovery would hide
 385      a misconfiguration only the user can fix."""
 386  
 387      def _make_msgs(self):
 388          return [
 389              {"role": "system", "content": "sys"},
 390              {"role": "user", "content": "msg 1"},
 391              {"role": "assistant", "content": "msg 2"},
 392              {"role": "user", "content": "msg 3"},
 393              {"role": "assistant", "content": "msg 4"},
 394              {"role": "user", "content": "msg 5"},
 395              {"role": "assistant", "content": "msg 6"},
 396              {"role": "user", "content": "msg 7"},
 397          ]
 398  
 399      def test_compress_exposes_aux_failure_fields_after_successful_fallback(self):
 400          mock_ok = MagicMock()
 401          mock_ok.choices = [MagicMock()]
 402          mock_ok.choices[0].message.content = "summary via main"
 403          err_400 = Exception("400 provider rejected configured model")
 404          err_400.status_code = 400
 405  
 406          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 407              c = ContextCompressor(
 408                  model="main-model",
 409                  summary_model_override="broken-aux-model",
 410                  quiet_mode=True,
 411                  protect_first_n=2,
 412                  protect_last_n=2,
 413              )
 414  
 415          with patch(
 416              "agent.context_compressor.call_llm",
 417              side_effect=[err_400, mock_ok],
 418          ):
 419              result = c.compress(self._make_msgs())
 420  
 421          # Recovery succeeded → no fallback placeholder
 422          assert c._last_summary_fallback_used is False
 423          # But aux-model failure IS recorded for the gateway/CLI warning
 424          assert c._last_aux_model_failure_model == "broken-aux-model"
 425          assert c._last_aux_model_failure_error is not None
 426          assert "400" in c._last_aux_model_failure_error
 427          # Result is well-formed with a real summary, not a placeholder
 428          assert any(
 429              isinstance(m.get("content"), str) and "summary via main" in m["content"]
 430              for m in result
 431          )
 432  
 433      def test_compress_clears_aux_failure_fields_at_start_of_next_call(self):
 434          """A subsequent successful compression must clear the aux-failure
 435          fields so the warning doesn't persist forever."""
 436          mock_ok = MagicMock()
 437          mock_ok.choices = [MagicMock()]
 438          mock_ok.choices[0].message.content = "summary via main"
 439          err_400 = Exception("400 aux model busted")
 440          err_400.status_code = 400
 441  
 442          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 443              c = ContextCompressor(
 444                  model="main-model",
 445                  summary_model_override="broken-aux-model",
 446                  quiet_mode=True,
 447                  protect_first_n=2,
 448                  protect_last_n=2,
 449              )
 450  
 451          # Call 1: aux fails, retry-on-main succeeds
 452          with patch(
 453              "agent.context_compressor.call_llm",
 454              side_effect=[err_400, mock_ok],
 455          ):
 456              c.compress(self._make_msgs())
 457          assert c._last_aux_model_failure_model == "broken-aux-model"
 458  
 459          # Call 2: clean run on main (summary_model was cleared to "" after
 460          # first fallback).  Aux-failure fields MUST reset at compress() start
 461          # so the old warning state doesn't leak into this call.
 462          with patch(
 463              "agent.context_compressor.call_llm",
 464              return_value=mock_ok,
 465          ):
 466              c.compress(self._make_msgs())
 467          assert c._last_aux_model_failure_model is None
 468          assert c._last_aux_model_failure_error is None
 469  
 470  
 471  class TestSummaryFailureTrackingForGatewayWarning:
 472      """When summary generation fails, the compressor must record dropped count
 473      + fallback flag so gateway hygiene & /compress can surface a visible
 474      warning instead of silently dropping context."""
 475  
 476      def test_compress_records_fallback_and_dropped_count_on_summary_failure(self):
 477          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 478              c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)
 479  
 480          msgs = [
 481              {"role": "system", "content": "sys"},
 482              {"role": "user", "content": "msg 1"},
 483              {"role": "assistant", "content": "msg 2"},
 484              {"role": "user", "content": "msg 3"},
 485              {"role": "assistant", "content": "msg 4"},
 486              {"role": "user", "content": "msg 5"},
 487              {"role": "assistant", "content": "msg 6"},
 488              {"role": "user", "content": "msg 7"},
 489          ]
 490  
 491          # Simulate summary LLM call failing — covers the 404 / model-not-found
 492          # case from issue (auxiliary compression model misconfigured).
 493          with patch("agent.context_compressor.call_llm", side_effect=Exception("404 model not found")):
 494              result = c.compress(msgs)
 495  
 496          assert c._last_summary_fallback_used is True
 497          assert c._last_summary_dropped_count > 0
 498          assert c._last_summary_error is not None
 499          # Result must still be well-formed (fallback summary present).
 500          assert any(
 501              isinstance(m.get("content"), str) and "Summary generation was unavailable" in m["content"]
 502              for m in result
 503          )
 504  
 505      def test_compress_clears_fallback_flag_on_subsequent_success(self):
 506          mock_response = MagicMock()
 507          mock_response.choices = [MagicMock()]
 508          mock_response.choices[0].message.content = "summary text"
 509  
 510          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 511              c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)
 512  
 513          msgs = [
 514              {"role": "system", "content": "sys"},
 515              {"role": "user", "content": "msg 1"},
 516              {"role": "assistant", "content": "msg 2"},
 517              {"role": "user", "content": "msg 3"},
 518              {"role": "assistant", "content": "msg 4"},
 519              {"role": "user", "content": "msg 5"},
 520              {"role": "assistant", "content": "msg 6"},
 521              {"role": "user", "content": "msg 7"},
 522          ]
 523  
 524          # First call fails, second succeeds — flag must reset on second compress.
 525          with patch("agent.context_compressor.call_llm", side_effect=Exception("boom")):
 526              c.compress(msgs)
 527          assert c._last_summary_fallback_used is True
 528  
 529          # Reset cooldown to allow retry on second compress
 530          c._summary_failure_cooldown_until = 0.0
 531          with patch("agent.context_compressor.call_llm", return_value=mock_response):
 532              c.compress(msgs)
 533          assert c._last_summary_fallback_used is False
 534          assert c._last_summary_dropped_count == 0
 535  
 536  
 537  class TestSummaryPrefixNormalization:
 538      def test_legacy_prefix_is_replaced(self):
 539          summary = ContextCompressor._with_summary_prefix("[CONTEXT SUMMARY]: did work")
 540          assert summary == f"{SUMMARY_PREFIX}\ndid work"
 541  
 542      def test_existing_new_prefix_is_not_duplicated(self):
 543          summary = ContextCompressor._with_summary_prefix(f"{SUMMARY_PREFIX}\ndid work")
 544          assert summary == f"{SUMMARY_PREFIX}\ndid work"
 545  
 546  
 547  class TestCompressWithClient:
 548      def test_system_content_list_gets_compression_note_without_crashing(self):
 549          mock_response = MagicMock()
 550          mock_response.choices = [MagicMock()]
 551          mock_response.choices[0].message.content = "summary text"
 552  
 553          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 554              c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)
 555  
 556          msgs = [
 557              {"role": "system", "content": [{"type": "text", "text": "system prompt"}]},
 558              {"role": "user", "content": "msg 1"},
 559              {"role": "assistant", "content": "msg 2"},
 560              {"role": "user", "content": "msg 3"},
 561              {"role": "assistant", "content": "msg 4"},
 562              {"role": "user", "content": "msg 5"},
 563              {"role": "assistant", "content": "msg 6"},
 564              {"role": "user", "content": "msg 7"},
 565          ]
 566  
 567          with patch("agent.context_compressor.call_llm", return_value=mock_response):
 568              result = c.compress(msgs)
 569  
 570          assert isinstance(result[0]["content"], list)
 571          assert any(
 572              isinstance(block, dict)
 573              and "compacted into a handoff summary" in block.get("text", "")
 574              for block in result[0]["content"]
 575          )
 576  
 577      def test_summarization_path(self):
 578          mock_client = MagicMock()
 579          mock_response = MagicMock()
 580          mock_response.choices = [MagicMock()]
 581          mock_response.choices[0].message.content = "[CONTEXT SUMMARY]: stuff happened"
 582          mock_client.chat.completions.create.return_value = mock_response
 583  
 584          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 585              c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)
 586  
 587          msgs = [{"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"} for i in range(10)]
 588          with patch("agent.context_compressor.call_llm", return_value=mock_response):
 589              result = c.compress(msgs)
 590  
 591          # Should have summary message in the middle
 592          contents = [m.get("content", "") for m in result]
 593          assert any(c.startswith(SUMMARY_PREFIX) for c in contents)
 594          assert len(result) < len(msgs)
 595  
 596      def test_summarization_does_not_split_tool_call_pairs(self):
 597          mock_client = MagicMock()
 598          mock_response = MagicMock()
 599          mock_response.choices = [MagicMock()]
 600          mock_response.choices[0].message.content = "[CONTEXT SUMMARY]: compressed middle"
 601          mock_client.chat.completions.create.return_value = mock_response
 602  
 603          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 604              c = ContextCompressor(
 605                  model="test",
 606                  quiet_mode=True,
 607                  protect_first_n=3,
 608                  protect_last_n=4,
 609              )
 610  
 611          msgs = [
 612              {"role": "user", "content": "Could you address the reviewer comments in PR#71"},
 613              {
 614                  "role": "assistant",
 615                  "content": "",
 616                  "tool_calls": [
 617                      {"id": "call_a", "type": "function", "function": {"name": "skill_view", "arguments": "{}"}},
 618                      {"id": "call_b", "type": "function", "function": {"name": "skill_view", "arguments": "{}"}},
 619                  ],
 620              },
 621              {"role": "tool", "tool_call_id": "call_a", "content": "output a"},
 622              {"role": "tool", "tool_call_id": "call_b", "content": "output b"},
 623              {"role": "user", "content": "later 1"},
 624              {"role": "assistant", "content": "later 2"},
 625              {"role": "tool", "tool_call_id": "call_x", "content": "later output"},
 626              {"role": "assistant", "content": "later 3"},
 627              {"role": "user", "content": "later 4"},
 628          ]
 629  
 630          with patch("agent.context_compressor.call_llm", return_value=mock_response):
 631              result = c.compress(msgs)
 632  
 633          answered_ids = {
 634              msg.get("tool_call_id")
 635              for msg in result
 636              if msg.get("role") == "tool" and msg.get("tool_call_id")
 637          }
 638          for msg in result:
 639              if msg.get("role") == "assistant" and msg.get("tool_calls"):
 640                  for tc in msg["tool_calls"]:
 641                      assert tc["id"] in answered_ids
 642  
 643      def test_sanitizer_matches_responses_call_id_when_id_differs(self, compressor):
 644          msgs = [
 645              {
 646                  "role": "assistant",
 647                  "content": "",
 648                  "tool_calls": [
 649                      {
 650                          "id": "fc_123",
 651                          "call_id": "call_123",
 652                          "response_item_id": "fc_123",
 653                          "type": "function",
 654                          "function": {"name": "search_files", "arguments": "{}"},
 655                      }
 656                  ],
 657              },
 658              {"role": "tool", "tool_call_id": "call_123", "content": "result"},
 659          ]
 660  
 661          sanitized = compressor._sanitize_tool_pairs(msgs)
 662  
 663          assert [m.get("tool_call_id") for m in sanitized if m.get("role") == "tool"] == [
 664              "call_123"
 665          ]
 666  
 667      def test_summary_role_avoids_consecutive_user_messages(self):
 668          """Summary role should alternate with the last head message to avoid consecutive same-role messages."""
 669          mock_client = MagicMock()
 670          mock_response = MagicMock()
 671          mock_response.choices = [MagicMock()]
 672          mock_response.choices[0].message.content = "[CONTEXT SUMMARY]: stuff happened"
 673          mock_client.chat.completions.create.return_value = mock_response
 674  
 675          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 676              c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)
 677  
 678          # Last head message (index 1) is "assistant" → summary should be "user".
 679          # With min_tail=3, tail = last 3 messages (indices 5-7).
 680          # head_last=assistant, tail_first=assistant → summary_role="user", no collision.
 681          # Need 8 messages: min_for_compress = 2+3+1 = 6, must have > 6.
 682          msgs = [
 683              {"role": "user", "content": "msg 0"},
 684              {"role": "assistant", "content": "msg 1"},
 685              {"role": "user", "content": "msg 2"},
 686              {"role": "assistant", "content": "msg 3"},
 687              {"role": "user", "content": "msg 4"},
 688              {"role": "assistant", "content": "msg 5"},
 689              {"role": "user", "content": "msg 6"},
 690              {"role": "assistant", "content": "msg 7"},
 691          ]
 692          with patch("agent.context_compressor.call_llm", return_value=mock_response):
 693              result = c.compress(msgs)
 694          summary_msg = [
 695              m for m in result if (m.get("content") or "").startswith(SUMMARY_PREFIX)
 696          ]
 697          assert len(summary_msg) == 1
 698          assert summary_msg[0]["role"] == "user"
 699  
 700      def test_summary_role_avoids_consecutive_user_when_head_ends_with_user(self):
 701          """When last head message is 'user', summary must be 'assistant' to avoid two consecutive user messages."""
 702          mock_client = MagicMock()
 703          mock_response = MagicMock()
 704          mock_response.choices = [MagicMock()]
 705          mock_response.choices[0].message.content = "[CONTEXT SUMMARY]: stuff happened"
 706          mock_client.chat.completions.create.return_value = mock_response
 707  
 708          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 709              c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=3, protect_last_n=2)
 710  
 711          # Last head message (index 2) is "user" → summary should be "assistant"
 712          msgs = [
 713              {"role": "system", "content": "system prompt"},
 714              {"role": "user", "content": "msg 1"},
 715              {"role": "user", "content": "msg 2"},  # last head — user
 716              {"role": "assistant", "content": "msg 3"},
 717              {"role": "user", "content": "msg 4"},
 718              {"role": "assistant", "content": "msg 5"},
 719              {"role": "user", "content": "msg 6"},
 720              {"role": "assistant", "content": "msg 7"},
 721          ]
 722          with patch("agent.context_compressor.call_llm", return_value=mock_response):
 723              result = c.compress(msgs)
 724          summary_msg = [
 725              m for m in result if (m.get("content") or "").startswith(SUMMARY_PREFIX)
 726          ]
 727          assert len(summary_msg) == 1
 728          assert summary_msg[0]["role"] == "assistant"
 729  
 730      def test_summary_role_flips_to_avoid_tail_collision(self):
 731          """When summary role collides with the first tail message but flipping
 732          doesn't collide with head, the role should be flipped."""
 733          mock_response = MagicMock()
 734          mock_response.choices = [MagicMock()]
 735          mock_response.choices[0].message.content = "summary text"
 736  
 737          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 738              c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)
 739  
 740          # Head ends with tool (index 1), tail starts with user (index 6).
 741          # Default: tool → summary_role="user" → collides with tail.
 742          # Flip to "assistant" → tool→assistant is fine.
 743          msgs = [
 744              {"role": "user", "content": "msg 0"},
 745              {"role": "assistant", "content": "", "tool_calls": [
 746                  {"id": "call_1", "type": "function", "function": {"name": "t", "arguments": "{}"}},
 747              ]},
 748              {"role": "tool", "tool_call_id": "call_1", "content": "result 1"},
 749              {"role": "assistant", "content": "msg 3"},
 750              {"role": "user", "content": "msg 4"},
 751              {"role": "assistant", "content": "msg 5"},
 752              {"role": "user", "content": "msg 6"},
 753              {"role": "assistant", "content": "msg 7"},
 754          ]
 755          with patch("agent.context_compressor.call_llm", return_value=mock_response):
 756              result = c.compress(msgs)
 757          # Verify no consecutive user or assistant messages
 758          for i in range(1, len(result)):
 759              r1 = result[i - 1].get("role")
 760              r2 = result[i].get("role")
 761              if r1 in ("user", "assistant") and r2 in ("user", "assistant"):
 762                  assert r1 != r2, f"consecutive {r1} at indices {i-1},{i}"
 763  
 764      def test_double_collision_merges_summary_into_tail(self):
 765          """When neither role avoids collision with both neighbors, the summary
 766          should be merged into the first tail message rather than creating a
 767          standalone message that breaks role alternation.
 768  
 769          Common scenario: head ends with 'assistant', tail starts with 'user'.
 770          summary='user' collides with tail, summary='assistant' collides with head.
 771          """
 772          mock_response = MagicMock()
 773          mock_response.choices = [MagicMock()]
 774          mock_response.choices[0].message.content = "summary text"
 775  
 776          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 777              c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=3, protect_last_n=3)
 778  
 779          # Head: [system, user, assistant]  →  last head = assistant
 780          # Tail: [user, assistant, user]    →  first tail = user
 781          # summary_role="user" collides with tail, "assistant" collides with head → merge
 782          msgs = [
 783              {"role": "system", "content": "system prompt"},
 784              {"role": "user", "content": "msg 1"},
 785              {"role": "assistant", "content": "msg 2"},
 786              {"role": "user", "content": "msg 3"},      # compressed
 787              {"role": "assistant", "content": "msg 4"},  # compressed
 788              {"role": "user", "content": "msg 5"},       # compressed
 789              {"role": "user", "content": "msg 6"},       # tail start
 790              {"role": "assistant", "content": "msg 7"},
 791              {"role": "user", "content": "msg 8"},
 792          ]
 793          with patch("agent.context_compressor.call_llm", return_value=mock_response):
 794              result = c.compress(msgs)
 795  
 796          # Verify no consecutive user or assistant messages
 797          for i in range(1, len(result)):
 798              r1 = result[i - 1].get("role")
 799              r2 = result[i].get("role")
 800              if r1 in ("user", "assistant") and r2 in ("user", "assistant"):
 801                  assert r1 != r2, f"consecutive {r1} at indices {i-1},{i}"
 802  
 803          # The summary text should be merged into the first tail message
 804          first_tail = [m for m in result if "msg 6" in (m.get("content") or "")]
 805          assert len(first_tail) == 1
 806          assert "summary text" in first_tail[0]["content"]
 807  
 808      def test_double_collision_merges_summary_into_list_tail_content(self):
 809          """Structured tail content should accept a merged summary without TypeError."""
 810          mock_response = MagicMock()
 811          mock_response.choices = [MagicMock()]
 812          mock_response.choices[0].message.content = "summary text"
 813  
 814          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 815              c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=3, protect_last_n=3)
 816  
 817          msgs = [
 818              {"role": "system", "content": "system prompt"},
 819              {"role": "user", "content": "msg 1"},
 820              {"role": "assistant", "content": "msg 2"},
 821              {"role": "user", "content": "msg 3"},
 822              {"role": "assistant", "content": "msg 4"},
 823              {"role": "user", "content": "msg 5"},
 824              {"role": "user", "content": [{"type": "text", "text": "msg 6"}]},
 825              {"role": "assistant", "content": "msg 7"},
 826              {"role": "user", "content": "msg 8"},
 827          ]
 828  
 829          with patch("agent.context_compressor.call_llm", return_value=mock_response):
 830              result = c.compress(msgs)
 831  
 832          merged_tail = next(
 833              m for m in result
 834              if m.get("role") == "user" and isinstance(m.get("content"), list)
 835          )
 836          assert isinstance(merged_tail["content"], list)
 837          assert "summary text" in merged_tail["content"][0]["text"]
 838          assert any(
 839              isinstance(block, dict) and block.get("text") == "msg 6"
 840              for block in merged_tail["content"]
 841          )
 842  
 843      def test_double_collision_user_head_assistant_tail(self):
 844          """Reverse double collision: head ends with 'user', tail starts with 'assistant'.
 845          summary='assistant' collides with tail, 'user' collides with head → merge."""
 846          mock_response = MagicMock()
 847          mock_response.choices = [MagicMock()]
 848          mock_response.choices[0].message.content = "summary text"
 849  
 850          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 851              c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)
 852  
 853          # Head: [system, user]        → last head = user
 854          # Tail: [assistant, user, assistant] → first tail = assistant
 855          # summary_role="assistant" collides with tail, "user" collides with head → merge
 856          # With min_tail=3, tail = last 3 messages (indices 5-7).
 857          # Need 8 messages: min_for_compress = 2+3+1 = 6, must have > 6.
 858          msgs = [
 859              {"role": "system", "content": "system prompt"},
 860              {"role": "user", "content": "msg 1"},
 861              {"role": "assistant", "content": "msg 2"},   # compressed
 862              {"role": "user", "content": "msg 3"},        # compressed
 863              {"role": "assistant", "content": "msg 4"},   # compressed
 864              {"role": "assistant", "content": "msg 5"},   # tail start
 865              {"role": "user", "content": "msg 6"},
 866              {"role": "assistant", "content": "msg 7"},
 867          ]
 868          with patch("agent.context_compressor.call_llm", return_value=mock_response):
 869              result = c.compress(msgs)
 870  
 871          # Verify no consecutive user or assistant messages
 872          for i in range(1, len(result)):
 873              r1 = result[i - 1].get("role")
 874              r2 = result[i].get("role")
 875              if r1 in ("user", "assistant") and r2 in ("user", "assistant"):
 876                  assert r1 != r2, f"consecutive {r1} at indices {i-1},{i}"
 877  
 878          # The summary should be merged into the first tail message (assistant at index 5)
 879          first_tail = [m for m in result if "msg 5" in (m.get("content") or "")]
 880          assert len(first_tail) == 1
 881          assert "summary text" in first_tail[0]["content"]
 882  
 883      def test_no_collision_scenarios_still_work(self):
 884          """Verify that the common no-collision cases (head=assistant/tail=assistant,
 885          head=user/tail=user) still produce a standalone summary message."""
 886          mock_response = MagicMock()
 887          mock_response.choices = [MagicMock()]
 888          mock_response.choices[0].message.content = "summary text"
 889  
 890          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 891              c = ContextCompressor(model="test", quiet_mode=True, protect_first_n=2, protect_last_n=2)
 892  
 893          # Head=assistant, Tail=assistant → summary_role="user", no collision.
 894          # With min_tail=3, tail = last 3 messages (indices 5-7).
 895          # Need 8 messages: min_for_compress = 2+3+1 = 6, must have > 6.
 896          msgs = [
 897              {"role": "user", "content": "msg 0"},
 898              {"role": "assistant", "content": "msg 1"},
 899              {"role": "user", "content": "msg 2"},
 900              {"role": "assistant", "content": "msg 3"},
 901              {"role": "user", "content": "msg 4"},
 902              {"role": "assistant", "content": "msg 5"},
 903              {"role": "user", "content": "msg 6"},
 904              {"role": "assistant", "content": "msg 7"},
 905          ]
 906          with patch("agent.context_compressor.call_llm", return_value=mock_response):
 907              result = c.compress(msgs)
 908          summary_msgs = [m for m in result if (m.get("content") or "").startswith(SUMMARY_PREFIX)]
 909          assert len(summary_msgs) == 1, "should have a standalone summary message"
 910          assert summary_msgs[0]["role"] == "user"
 911  
 912      def test_summarization_does_not_start_tail_with_tool_outputs(self):
 913          mock_response = MagicMock()
 914          mock_response.choices = [MagicMock()]
 915          mock_response.choices[0].message.content = "[CONTEXT SUMMARY]: compressed middle"
 916  
 917          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
 918              c = ContextCompressor(
 919                  model="test",
 920                  quiet_mode=True,
 921                  protect_first_n=2,
 922                  protect_last_n=3,
 923              )
 924  
 925          msgs = [
 926              {"role": "user", "content": "earlier 1"},
 927              {"role": "assistant", "content": "earlier 2"},
 928              {"role": "user", "content": "earlier 3"},
 929              {
 930                  "role": "assistant",
 931                  "content": "",
 932                  "tool_calls": [
 933                      {"id": "call_c", "type": "function", "function": {"name": "search_files", "arguments": "{}"}},
 934                  ],
 935              },
 936              {"role": "tool", "tool_call_id": "call_c", "content": "output c"},
 937              {"role": "user", "content": "latest user"},
 938          ]
 939  
 940          with patch("agent.context_compressor.call_llm", return_value=mock_response):
 941              result = c.compress(msgs)
 942  
 943          called_ids = {
 944              tc["id"]
 945              for msg in result
 946              if msg.get("role") == "assistant" and msg.get("tool_calls")
 947              for tc in msg["tool_calls"]
 948          }
 949          for msg in result:
 950              if msg.get("role") == "tool" and msg.get("tool_call_id"):
 951                  assert msg["tool_call_id"] in called_ids
 952  
 953  
 954  class TestSummaryTargetRatio:
 955      """Verify that summary_target_ratio properly scales budgets with context window."""
 956  
 957      def test_tail_budget_scales_with_context(self):
 958          """Tail token budget should be threshold_tokens * summary_target_ratio."""
 959          with patch("agent.context_compressor.get_model_context_length", return_value=200_000):
 960              c = ContextCompressor(model="test", quiet_mode=True, summary_target_ratio=0.40)
 961          # 200K * 0.50 threshold * 0.40 ratio = 40K
 962          assert c.tail_token_budget == 40_000
 963  
 964          with patch("agent.context_compressor.get_model_context_length", return_value=1_000_000):
 965              c = ContextCompressor(model="test", quiet_mode=True, summary_target_ratio=0.40)
 966          # 1M * 0.50 threshold * 0.40 ratio = 200K
 967          assert c.tail_token_budget == 200_000
 968  
 969      def test_summary_cap_scales_with_context(self):
 970          """Max summary tokens should be 5% of context, capped at 12K."""
 971          with patch("agent.context_compressor.get_model_context_length", return_value=200_000):
 972              c = ContextCompressor(model="test", quiet_mode=True)
 973          assert c.max_summary_tokens == 10_000  # 200K * 0.05
 974  
 975          with patch("agent.context_compressor.get_model_context_length", return_value=1_000_000):
 976              c = ContextCompressor(model="test", quiet_mode=True)
 977          assert c.max_summary_tokens == 12_000  # capped at 12K ceiling
 978  
 979      def test_ratio_clamped(self):
 980          """Ratio should be clamped to [0.10, 0.80]."""
 981          with patch("agent.context_compressor.get_model_context_length", return_value=100_000):
 982              c = ContextCompressor(model="test", quiet_mode=True, summary_target_ratio=0.05)
 983          assert c.summary_target_ratio == 0.10
 984  
 985          with patch("agent.context_compressor.get_model_context_length", return_value=100_000):
 986              c = ContextCompressor(model="test", quiet_mode=True, summary_target_ratio=0.95)
 987          assert c.summary_target_ratio == 0.80
 988  
 989      def test_default_threshold_is_50_percent(self):
 990          """Default compression threshold should be 50%, with a 64K floor."""
 991          with patch("agent.context_compressor.get_model_context_length", return_value=100_000):
 992              c = ContextCompressor(model="test", quiet_mode=True)
 993          assert c.threshold_percent == 0.50
 994          # 50% of 100K = 50K, but the floor is 64K
 995          assert c.threshold_tokens == 64_000
 996  
 997      def test_threshold_floor_does_not_apply_above_128k(self):
 998          """On large-context models the 50% percentage is used directly."""
 999          with patch("agent.context_compressor.get_model_context_length", return_value=200_000):
1000              c = ContextCompressor(model="test", quiet_mode=True)
1001          # 50% of 200K = 100K, which is above the 64K floor
1002          assert c.threshold_tokens == 100_000
1003  
1004      def test_default_protect_last_n_is_20(self):
1005          """Default protect_last_n should be 20."""
1006          with patch("agent.context_compressor.get_model_context_length", return_value=100_000):
1007              c = ContextCompressor(model="test", quiet_mode=True)
1008          assert c.protect_last_n == 20
1009  
1010  
1011  class TestTokenBudgetTailProtection:
1012      """Tests for token-budget-based tail protection (PR #6240).
1013  
1014      The core change: tail protection is now based on a token budget rather
1015      than a fixed message count.  This prevents large tool outputs from
1016      blocking compaction.
1017      """
1018  
1019      @pytest.fixture()
1020      def budget_compressor(self):
1021          """Compressor with known token budget for tail protection tests."""
1022          with patch("agent.context_compressor.get_model_context_length", return_value=200_000):
1023              c = ContextCompressor(
1024                  model="test/model",
1025                  threshold_percent=0.50,  # 100K threshold
1026                  protect_first_n=2,
1027                  protect_last_n=20,
1028                  quiet_mode=True,
1029              )
1030              return c
1031  
1032      def test_large_tool_outputs_no_longer_block_compaction(self, budget_compressor):
1033          """The motivating scenario: 20 messages with large tool outputs should
1034          NOT prevent compaction.  With message-count tail protection they would
1035          all be protected, leaving nothing to summarize."""
1036          c = budget_compressor
1037          messages = [
1038              {"role": "user", "content": "Start task"},
1039              {"role": "assistant", "content": "On it"},
1040          ]
1041          # Add 20 messages with large tool outputs (~5K chars each ≈ 1250 tokens)
1042          for i in range(10):
1043              messages.append({
1044                  "role": "assistant", "content": None,
1045                  "tool_calls": [{"function": {"name": f"tool_{i}", "arguments": "{}"}}],
1046              })
1047              messages.append({
1048                  "role": "tool", "content": "x" * 5000,
1049                  "tool_call_id": f"call_{i}",
1050              })
1051          # Add 3 recent small messages
1052          messages.append({"role": "user", "content": "What's the status?"})
1053          messages.append({"role": "assistant", "content": "Here's what I found..."})
1054          messages.append({"role": "user", "content": "Continue"})
1055  
1056          # The tail cut should NOT protect all 20 tool messages
1057          head_end = c.protect_first_n
1058          cut = c._find_tail_cut_by_tokens(messages, head_end)
1059          tail_size = len(messages) - cut
1060          # With token budget, the tail should be much smaller than 20+
1061          assert tail_size < 20, f"Tail {tail_size} messages — large tool outputs are blocking compaction"
1062          # But at least 3 (hard minimum)
1063          assert tail_size >= 3
1064  
1065      def test_min_tail_always_3_messages(self, budget_compressor):
1066          """Even with a tiny token budget, at least 3 messages are protected."""
1067          c = budget_compressor
1068          # Override to a tiny budget
1069          c.tail_token_budget = 10
1070          messages = [
1071              {"role": "user", "content": "hello"},
1072              {"role": "assistant", "content": "hi"},
1073              {"role": "user", "content": "do something"},
1074              {"role": "assistant", "content": "working on it"},
1075              {"role": "user", "content": "more work"},
1076              {"role": "assistant", "content": "done"},
1077              {"role": "user", "content": "thanks"},
1078          ]
1079          head_end = 2
1080          cut = c._find_tail_cut_by_tokens(messages, head_end)
1081          tail_size = len(messages) - cut
1082          assert tail_size >= 3, f"Tail is only {tail_size} messages, min should be 3"
1083  
1084      def test_soft_ceiling_allows_oversized_message(self, budget_compressor):
1085          """The 1.5x soft ceiling allows an oversized message to be included
1086          rather than splitting it."""
1087          c = budget_compressor
1088          # Set a small budget — 500 tokens
1089          c.tail_token_budget = 500
1090          messages = [
1091              {"role": "user", "content": "hello"},
1092              {"role": "assistant", "content": "hi"},
1093              {"role": "user", "content": "read the file"},
1094              # This message is ~600 tokens (> budget of 500, but < 1.5x = 750)
1095              {"role": "assistant", "content": "a" * 2400},
1096              {"role": "user", "content": "short"},
1097              {"role": "assistant", "content": "short reply"},
1098              {"role": "user", "content": "continue"},
1099          ]
1100          head_end = 2
1101          cut = c._find_tail_cut_by_tokens(messages, head_end)
1102          # The oversized message at index 3 should NOT be the cut point
1103          # because 1.5x ceiling = 750 tokens and accumulated would be ~610
1104          # (short msgs + oversized msg) which is < 750
1105          tail_size = len(messages) - cut
1106          assert tail_size >= 3
1107  
1108      def test_small_conversation_still_compresses(self, budget_compressor):
1109          """With the new min of 8 messages (head=2 + 3 + 1 guard + 2 middle),
1110          a small but compressible conversation should still compress."""
1111          c = budget_compressor
1112          # 9 messages: head(2) + 4 middle + 3 tail = compressible
1113          messages = []
1114          for i in range(9):
1115              role = "user" if i % 2 == 0 else "assistant"
1116              messages.append({"role": role, "content": f"Message {i}"})
1117  
1118          # Should not early-return (needs > protect_first_n + 3 + 1 = 6)
1119          # Mock the summary generation to avoid real API call
1120          with patch.object(c, "_generate_summary", return_value="Summary of conversation"):
1121              result = c.compress(messages, current_tokens=90_000)
1122          # Should have compressed (fewer messages than original)
1123          assert len(result) < len(messages)
1124  
1125      def test_prune_with_token_budget(self, budget_compressor):
1126          """_prune_old_tool_results with protect_tail_tokens respects the budget."""
1127          c = budget_compressor
1128          messages = [
1129              {"role": "user", "content": "start"},
1130              {"role": "assistant", "content": None,
1131               "tool_calls": [{"function": {"name": "read_file", "arguments": '{"path": "big.txt"}'}}]},
1132              {"role": "tool", "content": "x" * 10000, "tool_call_id": "c1"},  # ~2500 tokens
1133              {"role": "assistant", "content": None,
1134               "tool_calls": [{"function": {"name": "read_file", "arguments": '{"path": "small.txt"}'}}]},
1135              {"role": "tool", "content": "y" * 10000, "tool_call_id": "c2"},  # ~2500 tokens
1136              {"role": "user", "content": "short recent message"},
1137              {"role": "assistant", "content": "short reply"},
1138          ]
1139          # With a 1000-token budget, only the last couple messages should be protected
1140          result, pruned = c._prune_old_tool_results(
1141              messages, protect_tail_count=2, protect_tail_tokens=1000,
1142          )
1143          # At least one old tool result should have been pruned
1144          assert pruned >= 1
1145  
1146      def test_prune_short_conv_protects_entire_tail(self, budget_compressor):
1147          """Regression guard for PR #17025.
1148  
1149          When ``len(messages) <= protect_tail_count`` and a token budget is
1150          also set, every message must be protected. The previous code used
1151          ``min(protect_tail_count, len(result) - 1)`` which capped the floor
1152          one below the full length, leaving the oldest message eligible for
1153          pruning.
1154          """
1155          c = budget_compressor
1156          # 4 messages, protect_tail_count=4 -- nothing should be pruned.
1157          # Oldest message is a large tool result; on the buggy path it falls
1158          # outside the protected window and gets summarized.
1159          messages = [
1160              {"role": "tool", "content": "x" * 5000, "tool_call_id": "c0"},
1161              {"role": "assistant", "content": "ack"},
1162              {"role": "user", "content": "recent"},
1163              {"role": "assistant", "content": "reply"},
1164          ]
1165          result, pruned = c._prune_old_tool_results(
1166              messages,
1167              protect_tail_count=4,
1168              protect_tail_tokens=1_000_000,  # budget large enough to protect all
1169          )
1170          assert pruned == 0
1171          # Tool result at index 0 must be preserved verbatim
1172          assert result[0]["content"] == "x" * 5000
1173  
    def test_prune_without_token_budget_uses_message_count(self, budget_compressor):
        """Without protect_tail_tokens, falls back to message-count behavior."""
        c = budget_compressor
        messages = [
            {"role": "user", "content": "start"},
            {"role": "assistant", "content": None,
             "tool_calls": [{"function": {"name": "tool", "arguments": "{}"}}]},
            {"role": "tool", "content": "x" * 5000, "tool_call_id": "c1"},
            {"role": "user", "content": "recent"},
            {"role": "assistant", "content": "reply"},
        ]
        # protect_tail_count=3 means last 3 messages protected
        result, pruned = c._prune_old_tool_results(
            messages, protect_tail_count=3,
        )
        # The tool message at index 2 sits exactly on the protected-tail
        # boundary (last 3 = indices 2,3,4), so whether it is pruned depends
        # on the boundary convention — only assert the call runs and returns
        # an integer prune count.
        assert isinstance(pruned, int)
1192  
1193      def test_multimodal_message_accumulates_text_chars_not_block_count(self, budget_compressor):
1194          """_find_tail_cut_by_tokens must use text char count, not list length,
1195          for multimodal content. Regression guard for #16087.
1196  
1197          Setup: 6 messages, budget=80 (soft_ceiling=120).  The multimodal message
1198          at index 1 has 500 chars of text → 135 tokens (correct) or 10 tokens (bug).
1199  
1200          Fixed path: walk stops at the multimodal (44+135=179 > 120), cut stays at 2,
1201          tail = messages[2:] = 4 messages.
1202  
1203          Bug path: walk counts only 10 tokens for the multimodal, exhausts to head_end,
1204          the head_end safeguard forces cut = n - min_tail = 3, tail = only 3 messages.
1205          """
1206          c = budget_compressor
1207          # 500 chars → 500//4 + 10 = 135 tokens; len([text, image]) // 4 + 10 = 10 (bug)
1208          big_text = "x" * 500
1209          multimodal_content = [
1210              {"type": "text", "text": big_text},
1211              {"type": "image_url", "image_url": {"url": "https://example.com/img.jpg"}},
1212          ]
1213          messages = [
1214              {"role": "user", "content": "head1"},               # 0
1215              {"role": "user", "content": multimodal_content},    # 1: BIG (index under test)
1216              {"role": "assistant", "content": "tail1"},           # 2
1217              {"role": "user", "content": "tail2"},                # 3
1218              {"role": "assistant", "content": "tail3"},           # 4
1219              {"role": "user", "content": "tail4"},                # 5
1220          ]
1221          c.tail_token_budget = 80  # soft_ceiling = 120
1222          head_end = 0
1223          cut = c._find_tail_cut_by_tokens(messages, head_end)
1224          # With the fix: cut=2, tail has 4 messages (soft_ceiling not exceeded by tail1-4).
1225          # With the bug: head_end safeguard fires → cut = n - min_tail = 3, only 3 in tail.
1226          assert len(messages) - cut >= 4, (
1227              f"Expected ≥4 messages in tail (got {len(messages) - cut}, cut={cut}). "
1228              "The multimodal message was underestimated — len(list) used instead of text chars."
1229          )
1230  
1231      def test_plain_string_content_unchanged(self, budget_compressor):
1232          """Plain string content must still be estimated correctly after the fix."""
1233          c = budget_compressor
1234          # Same layout as the multimodal test but with a plain 500-char string.
1235          # Both buggy and fixed code count plain strings the same way (len(str)).
1236          # With 135 tokens the plain string also exceeds soft_ceiling=120, so
1237          # the walk stops at index 1 and tail has 4 messages — same as the fix path.
1238          big_plain = "x" * 500
1239          messages = [
1240              {"role": "user", "content": "head1"},
1241              {"role": "user", "content": big_plain},   # 1: 135 tokens, plain string
1242              {"role": "assistant", "content": "tail1"},
1243              {"role": "user", "content": "tail2"},
1244              {"role": "assistant", "content": "tail3"},
1245              {"role": "user", "content": "tail4"},
1246          ]
1247          c.tail_token_budget = 80
1248          head_end = 0
1249          cut = c._find_tail_cut_by_tokens(messages, head_end)
1250          assert len(messages) - cut >= 4, (
1251              f"Plain string regression: expected ≥4 messages in tail, got {len(messages) - cut}"
1252          )
1253  
1254      def test_image_only_block_contributes_zero_text_chars(self, budget_compressor):
1255          """Image-only content blocks (no 'text' key) contribute 0 chars + base overhead."""
1256          c = budget_compressor
1257          c.tail_token_budget = 500
1258          image_only = [{"type": "image_url", "image_url": {"url": "https://example.com/x.jpg"}}]
1259          messages = [
1260              {"role": "user", "content": "a" * 4000},
1261              {"role": "user", "content": image_only},   # 0 text chars → 10 tokens overhead
1262              {"role": "assistant", "content": "ok"},
1263          ]
1264          head_end = 0
1265          cut = c._find_tail_cut_by_tokens(messages, head_end)
1266          assert isinstance(cut, int)
1267          assert 0 <= cut <= len(messages)
1268  
1269      def test_mixed_list_with_bare_strings_does_not_crash(self, budget_compressor):
1270          """Content list may contain bare strings (not dicts) — must not raise AttributeError."""
1271          c = budget_compressor
1272          c.tail_token_budget = 500
1273          # Bare string item alongside a dict item — normalisation elsewhere allows this.
1274          mixed_content = ["Hello, world!", {"type": "text", "text": "extra text"}]
1275          messages = [
1276              {"role": "user", "content": mixed_content},
1277              {"role": "assistant", "content": "ok"},
1278          ]
1279          head_end = 0
1280          cut = c._find_tail_cut_by_tokens(messages, head_end)
1281          assert isinstance(cut, int)
1282          assert 0 <= cut <= len(messages)
1283  
1284      def test_generous_budget_protects_everything_floor_does_not_override(
1285          self, budget_compressor
1286      ):
1287          """A budget that covers the whole transcript must prune nothing —
1288          ``protect_tail_count`` is a minimum floor, not a ceiling."""
1289          c = budget_compressor
1290  
1291          # 100 alternating assistant/tool messages.  Each tool result has
1292          # *unique* content so the dedup pass (Pass 1, which is independent
1293          # of prune_boundary) is a no-op and we isolate the boundary logic.
1294          messages = []
1295          for i in range(50):
1296              messages.append({
1297                  "role": "assistant", "content": None,
1298                  "tool_calls": [{
1299                      "id": f"c{i}",
1300                      "type": "function",
1301                      "function": {"name": "noop", "arguments": "{}"},
1302                  }],
1303              })
1304              messages.append({
1305                  "role": "tool",
1306                  "tool_call_id": f"c{i}",
1307                  "content": f"unique-tool-output-{i:03d}-" + ("x" * 250),
1308              })
1309  
1310          # Budget large enough to cover the whole transcript many times over,
1311          # so the budget walk completes without hitting its break condition
1312          # and the boundary lands at 0 ("protect everything").
1313          _, pruned = c._prune_old_tool_results(
1314              messages,
1315              protect_tail_count=20,
1316              protect_tail_tokens=10_000_000,
1317          )
1318  
1319          assert pruned == 0, (
1320              "budget said protect everything, but the floor still pruned "
1321              f"{pruned} messages — protect_tail_count is acting as a ceiling, "
1322              "not a minimum floor"
1323          )
1324  
1325  
class TestUpdateModelBudgets:
    """Regression: update_model() must recalculate token budgets.

    Switching models mid-session changes the context window; budgets computed
    for the old window would otherwise over- or under-compress.
    """

    def test_tail_budget_recalculated(self):
        """tail_token_budget must change after switching to a different context length."""
        # `patch` is already imported at module level — no local re-import needed.
        with patch("agent.context_compressor.get_model_context_length", return_value=200_000):
            comp = ContextCompressor("model-a", threshold_percent=0.50, quiet_mode=True)
        old_tail = comp.tail_token_budget
        old_max_summary = comp.max_summary_tokens

        # Shrink the context window 200k → 32k and verify both budgets track it.
        comp.update_model("model-b", context_length=32_000)
        assert comp.tail_token_budget != old_tail, "tail_token_budget should change"
        assert comp.tail_token_budget < old_tail, "smaller context → smaller budget"
        assert comp.max_summary_tokens != old_max_summary, "max_summary_tokens should change"

    def test_budgets_proportional(self):
        """Budgets should be proportional to context_length after update."""
        with patch("agent.context_compressor.get_model_context_length", return_value=100_000):
            comp = ContextCompressor("model-a", threshold_percent=0.50, quiet_mode=True)
        comp.update_model("model-b", context_length=10_000)
        # tail budget derives from threshold_tokens; summary cap is 5% of the
        # new context, clamped at 4000.
        assert comp.tail_token_budget == int(comp.threshold_tokens * comp.summary_target_ratio)
        assert comp.max_summary_tokens == min(int(10_000 * 0.05), 4000)
1350  
1351  
1352  class TestTruncateToolCallArgsJson:
1353      """Regression tests for #11762.
1354  
1355      The previous implementation produced invalid JSON by slicing
1356      ``function.arguments`` mid-string, which caused non-retryable 400s from
1357      strict providers (observed on MiniMax) and stuck long sessions in a
1358      re-send loop. The helper here must always emit parseable JSON whose
1359      shape matches the original — shrunken, not corrupted.
1360      """
1361  
1362      def _helper(self):
1363          from agent.context_compressor import _truncate_tool_call_args_json
1364          return _truncate_tool_call_args_json
1365  
1366      def test_shrunken_args_remain_valid_json(self):
1367          import json as _json
1368          shrink = self._helper()
1369          original = _json.dumps({
1370              "path": "~/.hermes/skills/shopping/browser-setup-notes.md",
1371              "content": "# Shopping Browser Setup Notes\n\n" + "abc " * 400,
1372          })
1373          assert len(original) > 500
1374          shrunk = shrink(original)
1375          parsed = _json.loads(shrunk)  # must not raise
1376          assert parsed["path"] == "~/.hermes/skills/shopping/browser-setup-notes.md"
1377          assert parsed["content"].endswith("...[truncated]")
1378          assert len(shrunk) < len(original)
1379  
1380      def test_non_json_arguments_pass_through(self):
1381          shrink = self._helper()
1382          not_json = "this is not json at all, " * 50
1383          assert shrink(not_json) == not_json
1384  
1385      def test_short_string_leaves_unchanged(self):
1386          import json as _json
1387          shrink = self._helper()
1388          payload = _json.dumps({"command": "ls -la", "cwd": "/tmp"})
1389          assert _json.loads(shrink(payload)) == {"command": "ls -la", "cwd": "/tmp"}
1390  
1391      def test_nested_structures_are_walked(self):
1392          import json as _json
1393          shrink = self._helper()
1394          payload = _json.dumps({
1395              "messages": [
1396                  {"role": "user", "content": "x" * 500},
1397                  {"role": "assistant", "content": "ok"},
1398              ],
1399              "meta": {"note": "y" * 500},
1400          })
1401          parsed = _json.loads(shrink(payload))
1402          assert parsed["messages"][0]["content"].endswith("...[truncated]")
1403          assert parsed["messages"][1]["content"] == "ok"
1404          assert parsed["meta"]["note"].endswith("...[truncated]")
1405  
1406      def test_non_string_leaves_preserved(self):
1407          import json as _json
1408          shrink = self._helper()
1409          payload = _json.dumps({
1410              "retries": 3,
1411              "enabled": True,
1412              "timeout": None,
1413              "items": [1, 2, 3],
1414              "note": "z" * 500,
1415          })
1416          parsed = _json.loads(shrink(payload))
1417          assert parsed["retries"] == 3
1418          assert parsed["enabled"] is True
1419          assert parsed["timeout"] is None
1420          assert parsed["items"] == [1, 2, 3]
1421          assert parsed["note"].endswith("...[truncated]")
1422  
1423      def test_scalar_json_string_gets_shrunk(self):
1424          import json as _json
1425          shrink = self._helper()
1426          payload = _json.dumps("q" * 500)
1427          parsed = _json.loads(shrink(payload))
1428          assert isinstance(parsed, str)
1429          assert parsed.endswith("...[truncated]")
1430  
1431      def test_unicode_preserved(self):
1432          import json as _json
1433          shrink = self._helper()
1434          payload = _json.dumps({"content": "非德满" + ("a" * 500)})
1435          out = shrink(payload)
1436          # ensure_ascii=False keeps CJK intact rather than emitting \uXXXX
1437          assert "非德满" in out
1438  
1439      def test_pass3_emits_valid_json_for_downstream_provider(self):
1440          """End-to-end: Pass 3 must never produce the exact failure payload
1441          that caused the 400 loop (unterminated string, missing brace)."""
1442          import json as _json
1443          with patch("agent.context_compressor.get_model_context_length", return_value=100000):
1444              c = ContextCompressor(
1445                  model="test/model",
1446                  threshold_percent=0.85,
1447                  protect_first_n=1,
1448                  protect_last_n=1,
1449                  quiet_mode=True,
1450              )
1451          huge_content = "# Shopping Browser Setup Notes\n\n## Overview\n" + "x " * 400
1452          args_payload = _json.dumps({
1453              "path": "~/.hermes/skills/shopping/browser-setup-notes.md",
1454              "content": huge_content,
1455          })
1456          assert len(args_payload) > 500  # triggers the Pass-3 shrink
1457          messages = [
1458              {"role": "user", "content": "please write two files"},
1459              {"role": "assistant", "content": None, "tool_calls": [
1460                  {"id": "call_1", "type": "function",
1461                   "function": {"name": "write_file", "arguments": args_payload}},
1462              ]},
1463              {"role": "tool", "tool_call_id": "call_1",
1464               "content": '{"bytes_written": 727}'},
1465              {"role": "user", "content": "ok"},
1466              {"role": "assistant", "content": "done"},
1467          ]
1468          result, _ = c._prune_old_tool_results(messages, protect_tail_count=2)
1469          shrunk = result[1]["tool_calls"][0]["function"]["arguments"]
1470          # Must parse — otherwise downstream provider returns 400
1471          parsed = _json.loads(shrunk)
1472          assert parsed["path"] == "~/.hermes/skills/shopping/browser-setup-notes.md"
1473          assert parsed["content"].endswith("...[truncated]")