/ tests / cli / test_manual_compress.py
test_manual_compress.py
  1  """Tests for CLI manual compression messaging."""
  2  
  3  from unittest.mock import MagicMock, patch
  4  
  5  from tests.cli.test_cli_init import _make_cli
  6  
  7  
  8  def _make_history() -> list[dict[str, str]]:
  9      return [
 10          {"role": "user", "content": "one"},
 11          {"role": "assistant", "content": "two"},
 12          {"role": "user", "content": "three"},
 13          {"role": "assistant", "content": "four"},
 14      ]
 15  
 16  
 17  def test_manual_compress_reports_noop_without_success_banner(capsys):
 18      shell = _make_cli()
 19      history = _make_history()
 20      shell.conversation_history = history
 21      shell.agent = MagicMock()
 22      shell.agent.compression_enabled = True
 23      shell.agent._cached_system_prompt = ""
 24      shell.agent.tools = None
 25      shell.agent.session_id = shell.session_id  # no-op compression: no split
 26      shell.agent._compress_context.return_value = (list(history), "")
 27  
 28      def _estimate(messages, **_kwargs):
 29          assert messages == history
 30          return 100
 31  
 32      with patch("agent.model_metadata.estimate_request_tokens_rough", side_effect=_estimate):
 33          shell._manual_compress()
 34  
 35      output = capsys.readouterr().out
 36      assert "No changes from compression" in output
 37      assert "✅ Compressed" not in output
 38      assert "Approx request size: ~100 tokens (unchanged)" in output
 39  
 40  
 41  def test_manual_compress_explains_when_token_estimate_rises(capsys):
 42      shell = _make_cli()
 43      history = _make_history()
 44      compressed = [
 45          history[0],
 46          {"role": "assistant", "content": "Dense summary that still counts as more tokens."},
 47          history[-1],
 48      ]
 49      shell.conversation_history = history
 50      shell.agent = MagicMock()
 51      shell.agent.compression_enabled = True
 52      shell.agent._cached_system_prompt = ""
 53      shell.agent.tools = None
 54      shell.agent.session_id = shell.session_id  # no-op: no split
 55      shell.agent._compress_context.return_value = (compressed, "")
 56  
 57      def _estimate(messages, **_kwargs):
 58          if messages == history:
 59              return 100
 60          if messages == compressed:
 61              return 120
 62          raise AssertionError(f"unexpected transcript: {messages!r}")
 63  
 64      with patch("agent.model_metadata.estimate_request_tokens_rough", side_effect=_estimate):
 65          shell._manual_compress()
 66  
 67      output = capsys.readouterr().out
 68      assert "✅ Compressed: 4 → 3 messages" in output
 69      assert "Approx request size: ~100 → ~120 tokens" in output
 70      assert "denser summaries" in output
 71  
 72  
 73  def test_manual_compress_syncs_session_id_after_split():
 74      """Regression for cli.session_id desync after /compress.
 75  
 76      _compress_context ends the parent session and creates a new child session,
 77      mutating agent.session_id. Without syncing, cli.session_id still points
 78      at the ended parent — causing /status, /resume, exit summary, and the
 79      next end_session() call (e.g. from /resume <id>) to target the wrong row.
 80      """
 81      shell = _make_cli()
 82      history = _make_history()
 83      old_id = shell.session_id
 84      new_child_id = "20260101_000000_child1"
 85  
 86      compressed = [
 87          {"role": "user", "content": "[summary]"},
 88          history[-1],
 89      ]
 90      shell.conversation_history = history
 91      shell.agent = MagicMock()
 92      shell.agent.compression_enabled = True
 93      shell.agent._cached_system_prompt = ""
 94      shell.agent.tools = None
 95      # Simulate _compress_context mutating agent.session_id as a side effect.
 96      def _fake_compress(*args, **kwargs):
 97          shell.agent.session_id = new_child_id
 98          return (compressed, "")
 99      shell.agent._compress_context.side_effect = _fake_compress
100      shell.agent.session_id = old_id  # starts in sync
101      shell._pending_title = "stale title"
102  
103      with patch("agent.model_metadata.estimate_request_tokens_rough", return_value=100):
104          shell._manual_compress()
105  
106      # CLI session_id must now point at the continuation child, not the parent.
107      assert shell.session_id == new_child_id
108      assert shell.session_id != old_id
109      # Pending title must be cleared — titles belong to the parent lineage and
110      # get regenerated for the continuation.
111      assert shell._pending_title is None
112  
113  
def test_manual_compress_no_sync_when_session_id_unchanged():
    """With agent.session_id unchanged by compression (a no-op), the CLI must
    leave _pending_title alone and not otherwise disturb session state.
    """
    cli = _make_cli()
    transcript = _make_history()
    cli.conversation_history = transcript
    cli._pending_title = "keep me"

    agent = cli.agent = MagicMock(compression_enabled=True, tools=None)
    agent._cached_system_prompt = ""
    agent.session_id = cli.session_id
    # The stubbed compressor hands back an equal copy of the input.
    agent._compress_context.return_value = (list(transcript), "")

    with patch(
        "agent.model_metadata.estimate_request_tokens_rough",
        return_value=100,
    ):
        cli._manual_compress()

    # No split occurred, so the pending title is left as it was.
    assert cli._pending_title == "keep me"