# tests/run_agent/test_413_compression.py
  1  """Tests for payload/context-length → compression retry logic in AIAgent.
  2  
  3  Verifies that:
  4  - HTTP 413 errors trigger history compression and retry
  5  - HTTP 400 context-length errors trigger compression (not generic 4xx abort)
  6  - Preflight compression proactively compresses oversized sessions before API calls
  7  """
  8  
  9  import pytest
 10  #pytestmark = pytest.mark.skip(reason="Hangs in non-interactive environments")
 11  
 12  
 13  
 14  import uuid
 15  from types import SimpleNamespace
 16  from unittest.mock import MagicMock, patch
 17  
 18  import pytest
 19  
 20  from agent.context_compressor import SUMMARY_PREFIX
 21  from run_agent import AIAgent
 22  import run_agent
 23  
 24  
 25  # ---------------------------------------------------------------------------
 26  # Fast backoff for compression retry tests
 27  # ---------------------------------------------------------------------------
 28  
 29  
 30  @pytest.fixture(autouse=True)
 31  def _no_compression_sleep(monkeypatch):
 32      """Short-circuit the 2s time.sleep between compression retries.
 33  
 34      Production code has ``time.sleep(2)`` in multiple places after a 413/context
 35      compression, for rate-limit smoothing. Tests assert behavior, not timing.
 36      """
 37      import time as _time
 38      monkeypatch.setattr(_time, "sleep", lambda *_a, **_k: None)
 39      monkeypatch.setattr(run_agent, "jittered_backoff", lambda *a, **k: 0.0)
 40  
 41  
 42  # ---------------------------------------------------------------------------
 43  # Helpers
 44  # ---------------------------------------------------------------------------
 45  
 46  def _make_tool_defs(*names: str) -> list:
 47      return [
 48          {
 49              "type": "function",
 50              "function": {
 51                  "name": n,
 52                  "description": f"{n} tool",
 53                  "parameters": {"type": "object", "properties": {}},
 54              },
 55          }
 56          for n in names
 57      ]
 58  
 59  
 60  def _mock_response(content="Hello", finish_reason="stop", tool_calls=None, usage=None):
 61      msg = SimpleNamespace(
 62          content=content,
 63          tool_calls=tool_calls,
 64          reasoning_content=None,
 65          reasoning=None,
 66      )
 67      choice = SimpleNamespace(message=msg, finish_reason=finish_reason)
 68      resp = SimpleNamespace(choices=[choice], model="test/model")
 69      resp.usage = SimpleNamespace(**usage) if usage else None
 70      return resp
 71  
 72  
 73  def _make_413_error(*, use_status_code=True, message="Request entity too large"):
 74      """Create an exception that mimics a 413 HTTP error."""
 75      err = Exception(message)
 76      if use_status_code:
 77          err.status_code = 413
 78      return err
 79  
 80  
 81  @pytest.fixture()
 82  def agent():
 83      with (
 84          patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
 85          patch("run_agent.check_toolset_requirements", return_value={}),
 86          patch("run_agent.OpenAI"),
 87      ):
 88          a = AIAgent(
 89              api_key="test-key-1234567890",
 90              base_url="https://openrouter.ai/api/v1",
 91              quiet_mode=True,
 92              skip_context_files=True,
 93              skip_memory=True,
 94          )
 95          a.client = MagicMock()
 96          a._cached_system_prompt = "You are helpful."
 97          a._use_prompt_caching = False
 98          a.tool_delay = 0
 99          a.compression_enabled = False
100          a.save_trajectories = False
101          return a
102  
103  
104  # ---------------------------------------------------------------------------
105  # Tests
106  # ---------------------------------------------------------------------------
107  
class TestHTTP413Compression:
    """413 errors should trigger compression, not abort as generic 4xx."""

    def test_413_triggers_compression(self, agent):
        """A 413 error should call _compress_context and retry, not abort."""
        # First call raises 413; second call succeeds after compression.
        err_413 = _make_413_error()
        ok_resp = _mock_response(content="Success after compression", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [err_413, ok_resp]

        # Prefill so there are multiple messages for compression to reduce
        prefill = [
            {"role": "user", "content": "previous question"},
            {"role": "assistant", "content": "previous answer"},
        ]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            # Compression reduces 3 messages down to 1
            mock_compress.return_value = (
                [{"role": "user", "content": "hello"}],
                "compressed prompt",
            )
            result = agent.run_conversation("hello", conversation_history=prefill)

        mock_compress.assert_called_once()
        assert result["completed"] is True
        assert result["final_response"] == "Success after compression"

    def test_413_not_treated_as_generic_4xx(self, agent):
        """413 must NOT hit the generic 4xx abort path; it should attempt compression."""
        err_413 = _make_413_error()
        ok_resp = _mock_response(content="Recovered", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [err_413, ok_resp]

        prefill = [
            {"role": "user", "content": "previous question"},
            {"role": "assistant", "content": "previous answer"},
        ]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "hello"}],
                "compressed",
            )
            result = agent.run_conversation("hello", conversation_history=prefill)

        # If 413 were treated as generic 4xx, result would have "failed": True
        assert result.get("failed") is not True
        assert result["completed"] is True

    def test_413_error_message_detection(self, agent):
        """413 detected via error message string (no status_code attr)."""
        # No status_code attribute here — detection must parse the message text.
        err = _make_413_error(use_status_code=False, message="error code: 413")
        ok_resp = _mock_response(content="OK", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [err, ok_resp]

        prefill = [
            {"role": "user", "content": "previous question"},
            {"role": "assistant", "content": "previous answer"},
        ]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "hello"}],
                "compressed",
            )
            result = agent.run_conversation("hello", conversation_history=prefill)

        mock_compress.assert_called_once()
        assert result["completed"] is True

    def test_413_clears_conversation_history_on_persist(self, agent):
        """After 413-triggered compression, _persist_session must receive None history.

        Bug: _compress_context() creates a new session and resets _last_flushed_db_idx=0,
        but if conversation_history still holds the original (pre-compression) list,
        _flush_messages_to_session_db computes flush_from = max(len(history), 0) which
        exceeds len(compressed_messages), so messages[flush_from:] is empty and nothing
        is written to the new session → "Session found but has no messages" on resume.
        """
        err_413 = _make_413_error()
        ok_resp = _mock_response(content="OK", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [err_413, ok_resp]

        # 200 alternating user/assistant messages — large enough that the
        # stale-history bug (if present) would be clearly observable.
        big_history = [
            {"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"}
            for i in range(200)
        ]

        persist_calls = []

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(
                agent, "_persist_session",
                # Record only the history argument passed on each persist.
                side_effect=lambda msgs, hist: persist_calls.append(hist),
            ),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "summary"}],
                "compressed prompt",
            )
            agent.run_conversation("hello", conversation_history=big_history)

        assert len(persist_calls) >= 1, "Expected at least one _persist_session call"
        for hist in persist_calls:
            assert hist is None, (
                f"conversation_history should be None after mid-loop compression, "
                f"got list with {len(hist)} items"
            )

    def test_context_overflow_clears_conversation_history_on_persist(self, agent):
        """After context-overflow compression, _persist_session must receive None history."""
        # Same scenario as the 413 variant above, but triggered by a 400
        # context-length error (the OpenRouter overflow signature).
        err_400 = Exception(
            "Error code: 400 - This endpoint's maximum context length is 128000 tokens. "
            "However, you requested about 270460 tokens."
        )
        err_400.status_code = 400
        ok_resp = _mock_response(content="OK", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [err_400, ok_resp]

        big_history = [
            {"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"}
            for i in range(200)
        ]

        persist_calls = []

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(
                agent, "_persist_session",
                side_effect=lambda msgs, hist: persist_calls.append(hist),
            ),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "summary"}],
                "compressed prompt",
            )
            agent.run_conversation("hello", conversation_history=big_history)

        assert len(persist_calls) >= 1
        for hist in persist_calls:
            assert hist is None, (
                f"conversation_history should be None after context-overflow compression, "
                f"got list with {len(hist)} items"
            )

    def test_400_context_length_triggers_compression(self, agent):
        """A 400 with 'maximum context length' should trigger compression, not abort as generic 4xx.

        OpenRouter returns HTTP 400 (not 413) for context-length errors. Before
        the fix, this was caught by the generic 4xx handler which aborted
        immediately — now it correctly triggers compression+retry.
        """
        err_400 = Exception(
            "Error code: 400 - {'error': {'message': "
            "\"This endpoint's maximum context length is 204800 tokens. "
            "However, you requested about 270460 tokens.\", 'code': 400}}"
        )
        err_400.status_code = 400
        ok_resp = _mock_response(content="Recovered after compression", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [err_400, ok_resp]

        prefill = [
            {"role": "user", "content": "previous question"},
            {"role": "assistant", "content": "previous answer"},
        ]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "hello"}],
                "compressed prompt",
            )
            result = agent.run_conversation("hello", conversation_history=prefill)

        mock_compress.assert_called_once()
        # Must NOT have "failed": True (which would mean the generic 4xx handler caught it)
        assert result.get("failed") is not True
        assert result["completed"] is True
        assert result["final_response"] == "Recovered after compression"

    def test_400_reduce_length_triggers_compression(self, agent):
        """A 400 with 'reduce the length' should trigger compression."""
        # Alternate provider phrasing for the same overflow condition.
        err_400 = Exception(
            "Error code: 400 - Please reduce the length of the messages"
        )
        err_400.status_code = 400
        ok_resp = _mock_response(content="OK", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [err_400, ok_resp]

        prefill = [
            {"role": "user", "content": "previous question"},
            {"role": "assistant", "content": "previous answer"},
        ]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "hello"}],
                "compressed",
            )
            result = agent.run_conversation("hello", conversation_history=prefill)

        mock_compress.assert_called_once()
        assert result["completed"] is True

    def test_context_length_retry_rebuilds_request_after_compression(self, agent):
        """Retry must send the compressed transcript, not the stale oversized payload."""
        err_400 = Exception(
            "Error code: 400 - {'error': {'message': "
            "\"This endpoint's maximum context length is 128000 tokens. "
            "Please reduce the length of the messages.\"}}"
        )
        err_400.status_code = 400
        ok_resp = _mock_response(content="Recovered after real compression", finish_reason="stop")

        # Capture every outbound request payload so first vs retry can be diffed.
        request_payloads = []

        def _side_effect(**kwargs):
            request_payloads.append(kwargs)
            if len(request_payloads) == 1:
                raise err_400
            return ok_resp

        agent.client.chat.completions.create.side_effect = _side_effect

        prefill = [
            {"role": "user", "content": "previous question"},
            {"role": "assistant", "content": "previous answer"},
        ]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "compressed summary"}],
                "compressed prompt",
            )
            result = agent.run_conversation("hello", conversation_history=prefill)

        assert result["completed"] is True
        assert len(request_payloads) == 2
        # The retry payload must be strictly smaller than the failed one,
        # and must carry the compressed system prompt + summary message.
        assert len(request_payloads[1]["messages"]) < len(request_payloads[0]["messages"])
        assert request_payloads[1]["messages"][0] == {
            "role": "system",
            "content": "compressed prompt",
        }
        assert request_payloads[1]["messages"][1] == {
            "role": "user",
            "content": "compressed summary",
        }

    def test_413_cannot_compress_further(self, agent):
        """When compression can't reduce messages, return partial result."""
        err_413 = _make_413_error()
        agent.client.chat.completions.create.side_effect = [err_413]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            # Compression returns same number of messages → can't compress further
            mock_compress.return_value = (
                [{"role": "user", "content": "hello"}],
                "same prompt",
            )
            result = agent.run_conversation("hello")

        assert result["completed"] is False
        assert result.get("partial") is True
        assert "413" in result["error"]
413  
414  
class TestPreflightCompression:
    """Preflight compression should compress history before the first API call."""

    def test_preflight_compresses_oversized_history(self, agent):
        """When loaded history exceeds the model's context threshold, compress before API call."""
        agent.compression_enabled = True
        # Set a small context so the history is "oversized", but large enough
        # that the compressed result (2 short messages) fits in a single pass.
        agent.context_compressor.context_length = 2000
        agent.context_compressor.threshold_tokens = 200

        # Build a history that will be large enough to trigger preflight
        # (each message ~50 chars ≈ 13 tokens, 40 messages ≈ 520 tokens > 200 threshold)
        big_history = []
        for i in range(20):
            big_history.append({"role": "user", "content": f"Message number {i} with some extra text padding"})
            big_history.append({"role": "assistant", "content": f"Response number {i} with extra padding here"})

        ok_resp = _mock_response(content="After preflight", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [ok_resp]
        # Collect status events to verify the preflight lifecycle message fires.
        status_messages = []
        agent.status_callback = lambda ev, msg: status_messages.append((ev, msg))

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            # Simulate compression reducing messages to a small set that fits
            mock_compress.return_value = (
                [
                    {"role": "user", "content": f"{SUMMARY_PREFIX}\nPrevious conversation"},
                    {"role": "user", "content": "hello"},
                ],
                "new system prompt",
            )
            result = agent.run_conversation("hello", conversation_history=big_history)

        # Preflight compression is a multi-pass loop (up to 3 passes for very
        # large sessions, breaking when no further reduction is possible).
        # First pass must have received the full oversized history.
        assert mock_compress.call_count >= 1, "Preflight compression never ran"
        first_call_messages = mock_compress.call_args_list[0].args[0]
        assert len(first_call_messages) >= 40, (
            f"First preflight pass should see the full history, got "
            f"{len(first_call_messages)} messages"
        )
        assert result["completed"] is True
        assert result["final_response"] == "After preflight"
        assert any(
            ev == "lifecycle" and "Preflight compression" in msg
            for ev, msg in status_messages
        )

    def test_no_preflight_when_under_threshold(self, agent):
        """When history fits within context, no preflight compression needed."""
        agent.compression_enabled = True
        # Large context — history easily fits
        agent.context_compressor.context_length = 1000000
        agent.context_compressor.threshold_tokens = 850000

        small_history = [
            {"role": "user", "content": "hi"},
            {"role": "assistant", "content": "hello"},
        ]

        ok_resp = _mock_response(content="No compression needed", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [ok_resp]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            result = agent.run_conversation("hello", conversation_history=small_history)

        mock_compress.assert_not_called()
        assert result["completed"] is True

    def test_no_preflight_when_compression_disabled(self, agent):
        """Preflight should not run when compression is disabled."""
        agent.compression_enabled = False
        agent.context_compressor.context_length = 100
        agent.context_compressor.threshold_tokens = 85

        big_history = [
            {"role": "user", "content": "x" * 1000},
            {"role": "assistant", "content": "y" * 1000},
        ] * 10

        ok_resp = _mock_response(content="OK", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [ok_resp]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            result = agent.run_conversation("hello", conversation_history=big_history)

        mock_compress.assert_not_called()
        # Fix: `result` was previously assigned but never checked — assert the
        # run still completes normally, matching the sibling test above.
        assert result["completed"] is True
519  
520  
class TestToolResultPreflightCompression:
    """Compression should trigger when tool results push context past the threshold."""

    def test_large_tool_results_trigger_compression(self, agent):
        """When tool results push estimated tokens past threshold, compress before next call."""
        agent.compression_enabled = True
        agent.context_compressor.context_length = 200_000
        agent.context_compressor.threshold_tokens = 130_000  # below the 135k reported usage
        # Seed the compressor's last-known usage so the estimate starts near the threshold.
        agent.context_compressor.last_prompt_tokens = 130_000
        agent.context_compressor.last_completion_tokens = 5_000

        # One fake tool call; its (mocked) result will be a 100k-char blob.
        tc = SimpleNamespace(
            id="tc1", type="function",
            function=SimpleNamespace(name="web_search", arguments='{"query":"test"}'),
        )
        tool_resp = _mock_response(
            content=None, finish_reason="stop", tool_calls=[tc],
            usage={"prompt_tokens": 130_000, "completion_tokens": 5_000, "total_tokens": 135_000},
        )
        ok_resp = _mock_response(
            content="Done after compression", finish_reason="stop",
            usage={"prompt_tokens": 50_000, "completion_tokens": 100, "total_tokens": 50_100},
        )
        agent.client.chat.completions.create.side_effect = [tool_resp, ok_resp]
        large_result = "x" * 100_000

        with (
            patch("run_agent.handle_function_call", return_value=large_result),
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "hello"}], "compressed prompt",
            )
            result = agent.run_conversation("hello")

        mock_compress.assert_called_once()
        assert result["completed"] is True

    def test_anthropic_prompt_too_long_safety_net(self, agent):
        """Anthropic 'prompt is too long' error triggers compression as safety net."""
        # Anthropic-style 400 body, distinct from the OpenRouter phrasings.
        err_400 = Exception(
            "Error code: 400 - {'type': 'error', 'error': {'type': 'invalid_request_error', "
            "'message': 'prompt is too long: 233153 tokens > 200000 maximum'}}"
        )
        err_400.status_code = 400
        ok_resp = _mock_response(content="Recovered", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [err_400, ok_resp]
        prefill = [
            {"role": "user", "content": "previous"},
            {"role": "assistant", "content": "answer"},
        ]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "hello"}], "compressed",
            )
            result = agent.run_conversation("hello", conversation_history=prefill)

        mock_compress.assert_called_once()
        assert result["completed"] is True