"""Tests for payload/context-length → compression retry logic in AIAgent.

Verifies that:
- HTTP 413 errors trigger history compression and retry
- HTTP 400 context-length errors trigger compression (not generic 4xx abort)
- Preflight compression proactively compresses oversized sessions before API calls
"""

import uuid  # NOTE(review): appears unused here — confirm before removing
from types import SimpleNamespace
from unittest.mock import MagicMock, patch

import pytest

from agent.context_compressor import SUMMARY_PREFIX
from run_agent import AIAgent
import run_agent

# NOTE(review): this module was previously skipped wholesale with
# ``pytestmark = pytest.mark.skip(reason="Hangs in non-interactive environments")``.
# The autouse ``_no_compression_sleep`` fixture below neutralizes the sleeps
# that caused the hang, so the skip is no longer needed.


# ---------------------------------------------------------------------------
# Fast backoff for compression retry tests
# ---------------------------------------------------------------------------


@pytest.fixture(autouse=True)
def _no_compression_sleep(monkeypatch):
    """Short-circuit the 2s time.sleep between compression retries.

    Production code has ``time.sleep(2)`` in multiple places after a 413/context
    compression, for rate-limit smoothing. Tests assert behavior, not timing.
    """
    import time as _time

    monkeypatch.setattr(_time, "sleep", lambda *_a, **_k: None)
    monkeypatch.setattr(run_agent, "jittered_backoff", lambda *a, **k: 0.0)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _make_tool_defs(*names: str) -> list:
    """Build minimal OpenAI-style tool definitions for the given tool names."""
    return [
        {
            "type": "function",
            "function": {
                "name": n,
                "description": f"{n} tool",
                "parameters": {"type": "object", "properties": {}},
            },
        }
        for n in names
    ]


def _mock_response(content="Hello", finish_reason="stop", tool_calls=None, usage=None):
    """Build a SimpleNamespace mimicking an OpenAI chat-completions response.

    ``usage`` (dict or None) becomes a SimpleNamespace so attribute access
    (``resp.usage.prompt_tokens``) matches the real client object.
    """
    msg = SimpleNamespace(
        content=content,
        tool_calls=tool_calls,
        reasoning_content=None,
        reasoning=None,
    )
    choice = SimpleNamespace(message=msg, finish_reason=finish_reason)
    resp = SimpleNamespace(choices=[choice], model="test/model")
    resp.usage = SimpleNamespace(**usage) if usage else None
    return resp


def _make_413_error(*, use_status_code=True, message="Request entity too large"):
    """Create an exception that mimics a 413 HTTP error.

    With ``use_status_code=False`` the status must be inferred from the
    message string, exercising the string-detection fallback path.
    """
    err = Exception(message)
    if use_status_code:
        err.status_code = 413
    return err


@pytest.fixture()
def agent():
    """AIAgent wired with mocks: no network, no tools, no disk persistence."""
    with (
        patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
        patch("run_agent.check_toolset_requirements", return_value={}),
        patch("run_agent.OpenAI"),
    ):
        a = AIAgent(
            api_key="test-key-1234567890",
            base_url="https://openrouter.ai/api/v1",
            quiet_mode=True,
            skip_context_files=True,
            skip_memory=True,
        )
        a.client = MagicMock()
        a._cached_system_prompt = "You are helpful."
        a._use_prompt_caching = False
        a.tool_delay = 0
        a.compression_enabled = False
        a.save_trajectories = False
        return a


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------


class TestHTTP413Compression:
    """413 errors should trigger compression, not abort as generic 4xx."""

    def test_413_triggers_compression(self, agent):
        """A 413 error should call _compress_context and retry, not abort."""
        # First call raises 413; second call succeeds after compression.
        err_413 = _make_413_error()
        ok_resp = _mock_response(content="Success after compression", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [err_413, ok_resp]

        # Prefill so there are multiple messages for compression to reduce
        prefill = [
            {"role": "user", "content": "previous question"},
            {"role": "assistant", "content": "previous answer"},
        ]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            # Compression reduces 3 messages down to 1
            mock_compress.return_value = (
                [{"role": "user", "content": "hello"}],
                "compressed prompt",
            )
            result = agent.run_conversation("hello", conversation_history=prefill)

        mock_compress.assert_called_once()
        assert result["completed"] is True
        assert result["final_response"] == "Success after compression"

    def test_413_not_treated_as_generic_4xx(self, agent):
        """413 must NOT hit the generic 4xx abort path; it should attempt compression."""
        err_413 = _make_413_error()
        ok_resp = _mock_response(content="Recovered", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [err_413, ok_resp]

        prefill = [
            {"role": "user", "content": "previous question"},
            {"role": "assistant", "content": "previous answer"},
        ]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "hello"}],
                "compressed",
            )
            result = agent.run_conversation("hello", conversation_history=prefill)

        # If 413 were treated as generic 4xx, result would have "failed": True
        assert result.get("failed") is not True
        assert result["completed"] is True

    def test_413_error_message_detection(self, agent):
        """413 detected via error message string (no status_code attr)."""
        err = _make_413_error(use_status_code=False, message="error code: 413")
        ok_resp = _mock_response(content="OK", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [err, ok_resp]

        prefill = [
            {"role": "user", "content": "previous question"},
            {"role": "assistant", "content": "previous answer"},
        ]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "hello"}],
                "compressed",
            )
            result = agent.run_conversation("hello", conversation_history=prefill)

        mock_compress.assert_called_once()
        assert result["completed"] is True

    def test_413_clears_conversation_history_on_persist(self, agent):
        """After 413-triggered compression, _persist_session must receive None history.

        Bug: _compress_context() creates a new session and resets _last_flushed_db_idx=0,
        but if conversation_history still holds the original (pre-compression) list,
        _flush_messages_to_session_db computes flush_from = max(len(history), 0) which
        exceeds len(compressed_messages), so messages[flush_from:] is empty and nothing
        is written to the new session → "Session found but has no messages" on resume.
        """
        err_413 = _make_413_error()
        ok_resp = _mock_response(content="OK", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [err_413, ok_resp]

        big_history = [
            {"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"}
            for i in range(200)
        ]

        persist_calls = []

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(
                agent, "_persist_session",
                side_effect=lambda msgs, hist: persist_calls.append(hist),
            ),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "summary"}],
                "compressed prompt",
            )
            agent.run_conversation("hello", conversation_history=big_history)

        assert len(persist_calls) >= 1, "Expected at least one _persist_session call"
        for hist in persist_calls:
            assert hist is None, (
                f"conversation_history should be None after mid-loop compression, "
                f"got list with {len(hist)} items"
            )

    def test_context_overflow_clears_conversation_history_on_persist(self, agent):
        """After context-overflow compression, _persist_session must receive None history."""
        err_400 = Exception(
            "Error code: 400 - This endpoint's maximum context length is 128000 tokens. "
            "However, you requested about 270460 tokens."
        )
        err_400.status_code = 400
        ok_resp = _mock_response(content="OK", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [err_400, ok_resp]

        big_history = [
            {"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"}
            for i in range(200)
        ]

        persist_calls = []

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(
                agent, "_persist_session",
                side_effect=lambda msgs, hist: persist_calls.append(hist),
            ),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "summary"}],
                "compressed prompt",
            )
            agent.run_conversation("hello", conversation_history=big_history)

        assert len(persist_calls) >= 1
        for hist in persist_calls:
            assert hist is None, (
                f"conversation_history should be None after context-overflow compression, "
                f"got list with {len(hist)} items"
            )

    def test_400_context_length_triggers_compression(self, agent):
        """A 400 with 'maximum context length' should trigger compression, not abort as generic 4xx.

        OpenRouter returns HTTP 400 (not 413) for context-length errors. Before
        the fix, this was caught by the generic 4xx handler which aborted
        immediately — now it correctly triggers compression+retry.
        """
        err_400 = Exception(
            "Error code: 400 - {'error': {'message': "
            "\"This endpoint's maximum context length is 204800 tokens. "
            "However, you requested about 270460 tokens.\", 'code': 400}}"
        )
        err_400.status_code = 400
        ok_resp = _mock_response(content="Recovered after compression", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [err_400, ok_resp]

        prefill = [
            {"role": "user", "content": "previous question"},
            {"role": "assistant", "content": "previous answer"},
        ]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "hello"}],
                "compressed prompt",
            )
            result = agent.run_conversation("hello", conversation_history=prefill)

        mock_compress.assert_called_once()
        # Must NOT have "failed": True (which would mean the generic 4xx handler caught it)
        assert result.get("failed") is not True
        assert result["completed"] is True
        assert result["final_response"] == "Recovered after compression"

    def test_400_reduce_length_triggers_compression(self, agent):
        """A 400 with 'reduce the length' should trigger compression."""
        err_400 = Exception(
            "Error code: 400 - Please reduce the length of the messages"
        )
        err_400.status_code = 400
        ok_resp = _mock_response(content="OK", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [err_400, ok_resp]

        prefill = [
            {"role": "user", "content": "previous question"},
            {"role": "assistant", "content": "previous answer"},
        ]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "hello"}],
                "compressed",
            )
            result = agent.run_conversation("hello", conversation_history=prefill)

        mock_compress.assert_called_once()
        assert result["completed"] is True

    def test_context_length_retry_rebuilds_request_after_compression(self, agent):
        """Retry must send the compressed transcript, not the stale oversized payload."""
        err_400 = Exception(
            "Error code: 400 - {'error': {'message': "
            "\"This endpoint's maximum context length is 128000 tokens. "
            "Please reduce the length of the messages.\"}}"
        )
        err_400.status_code = 400
        ok_resp = _mock_response(content="Recovered after real compression", finish_reason="stop")

        request_payloads = []

        def _side_effect(**kwargs):
            # Record every outbound request so we can inspect the retry payload.
            request_payloads.append(kwargs)
            if len(request_payloads) == 1:
                raise err_400
            return ok_resp

        agent.client.chat.completions.create.side_effect = _side_effect

        prefill = [
            {"role": "user", "content": "previous question"},
            {"role": "assistant", "content": "previous answer"},
        ]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "compressed summary"}],
                "compressed prompt",
            )
            result = agent.run_conversation("hello", conversation_history=prefill)

        assert result["completed"] is True
        assert len(request_payloads) == 2
        assert len(request_payloads[1]["messages"]) < len(request_payloads[0]["messages"])
        assert request_payloads[1]["messages"][0] == {
            "role": "system",
            "content": "compressed prompt",
        }
        assert request_payloads[1]["messages"][1] == {
            "role": "user",
            "content": "compressed summary",
        }

    def test_413_cannot_compress_further(self, agent):
        """When compression can't reduce messages, return partial result."""
        err_413 = _make_413_error()
        agent.client.chat.completions.create.side_effect = [err_413]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            # Compression returns same number of messages → can't compress further
            mock_compress.return_value = (
                [{"role": "user", "content": "hello"}],
                "same prompt",
            )
            result = agent.run_conversation("hello")

        assert result["completed"] is False
        assert result.get("partial") is True
        assert "413" in result["error"]


class TestPreflightCompression:
    """Preflight compression should compress history before the first API call."""

    def test_preflight_compresses_oversized_history(self, agent):
        """When loaded history exceeds the model's context threshold, compress before API call."""
        agent.compression_enabled = True
        # Set a small context so the history is "oversized", but large enough
        # that the compressed result (2 short messages) fits in a single pass.
        agent.context_compressor.context_length = 2000
        agent.context_compressor.threshold_tokens = 200

        # Build a history that will be large enough to trigger preflight
        # (each message ~50 chars ≈ 13 tokens, 40 messages ≈ 520 tokens > 200 threshold)
        big_history = []
        for i in range(20):
            big_history.append({"role": "user", "content": f"Message number {i} with some extra text padding"})
            big_history.append({"role": "assistant", "content": f"Response number {i} with extra padding here"})

        ok_resp = _mock_response(content="After preflight", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [ok_resp]
        status_messages = []
        agent.status_callback = lambda ev, msg: status_messages.append((ev, msg))

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            # Simulate compression reducing messages to a small set that fits
            mock_compress.return_value = (
                [
                    {"role": "user", "content": f"{SUMMARY_PREFIX}\nPrevious conversation"},
                    {"role": "user", "content": "hello"},
                ],
                "new system prompt",
            )
            result = agent.run_conversation("hello", conversation_history=big_history)

        # Preflight compression is a multi-pass loop (up to 3 passes for very
        # large sessions, breaking when no further reduction is possible).
        # First pass must have received the full oversized history.
        assert mock_compress.call_count >= 1, "Preflight compression never ran"
        first_call_messages = mock_compress.call_args_list[0].args[0]
        assert len(first_call_messages) >= 40, (
            f"First preflight pass should see the full history, got "
            f"{len(first_call_messages)} messages"
        )
        assert result["completed"] is True
        assert result["final_response"] == "After preflight"
        assert any(
            ev == "lifecycle" and "Preflight compression" in msg
            for ev, msg in status_messages
        )

    def test_no_preflight_when_under_threshold(self, agent):
        """When history fits within context, no preflight compression needed."""
        agent.compression_enabled = True
        # Large context — history easily fits
        agent.context_compressor.context_length = 1000000
        agent.context_compressor.threshold_tokens = 850000

        small_history = [
            {"role": "user", "content": "hi"},
            {"role": "assistant", "content": "hello"},
        ]

        ok_resp = _mock_response(content="No compression needed", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [ok_resp]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            result = agent.run_conversation("hello", conversation_history=small_history)

        mock_compress.assert_not_called()
        assert result["completed"] is True

    def test_no_preflight_when_compression_disabled(self, agent):
        """Preflight should not run when compression is disabled."""
        agent.compression_enabled = False
        agent.context_compressor.context_length = 100
        agent.context_compressor.threshold_tokens = 85

        big_history = [
            {"role": "user", "content": "x" * 1000},
            {"role": "assistant", "content": "y" * 1000},
        ] * 10

        ok_resp = _mock_response(content="OK", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [ok_resp]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            result = agent.run_conversation("hello", conversation_history=big_history)

        mock_compress.assert_not_called()
        # Fix: `result` was previously assigned but never checked.
        assert result["completed"] is True


class TestToolResultPreflightCompression:
    """Compression should trigger when tool results push context past the threshold."""

    def test_large_tool_results_trigger_compression(self, agent):
        """When tool results push estimated tokens past threshold, compress before next call."""
        agent.compression_enabled = True
        agent.context_compressor.context_length = 200_000
        agent.context_compressor.threshold_tokens = 130_000  # below the 135k reported usage
        agent.context_compressor.last_prompt_tokens = 130_000
        agent.context_compressor.last_completion_tokens = 5_000

        tc = SimpleNamespace(
            id="tc1", type="function",
            function=SimpleNamespace(name="web_search", arguments='{"query":"test"}'),
        )
        tool_resp = _mock_response(
            content=None, finish_reason="stop", tool_calls=[tc],
            usage={"prompt_tokens": 130_000, "completion_tokens": 5_000, "total_tokens": 135_000},
        )
        ok_resp = _mock_response(
            content="Done after compression", finish_reason="stop",
            usage={"prompt_tokens": 50_000, "completion_tokens": 100, "total_tokens": 50_100},
        )
        agent.client.chat.completions.create.side_effect = [tool_resp, ok_resp]
        large_result = "x" * 100_000

        with (
            patch("run_agent.handle_function_call", return_value=large_result),
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "hello"}], "compressed prompt",
            )
            result = agent.run_conversation("hello")

        mock_compress.assert_called_once()
        assert result["completed"] is True

    def test_anthropic_prompt_too_long_safety_net(self, agent):
        """Anthropic 'prompt is too long' error triggers compression as safety net."""
        err_400 = Exception(
            "Error code: 400 - {'type': 'error', 'error': {'type': 'invalid_request_error', "
            "'message': 'prompt is too long: 233153 tokens > 200000 maximum'}}"
        )
        err_400.status_code = 400
        ok_resp = _mock_response(content="Recovered", finish_reason="stop")
        agent.client.chat.completions.create.side_effect = [err_400, ok_resp]
        prefill = [
            {"role": "user", "content": "previous"},
            {"role": "assistant", "content": "answer"},
        ]

        with (
            patch.object(agent, "_compress_context") as mock_compress,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "hello"}], "compressed",
            )
            result = agent.run_conversation("hello", conversation_history=prefill)

        mock_compress.assert_called_once()
        assert result["completed"] is True