test_memory_sync_interrupted.py
1 """Regression guard for #15218 — external memory sync must skip interrupted turns. 2 3 Before this fix, ``run_conversation`` called 4 ``memory_manager.sync_all(original_user_message, final_response)`` at the 5 end of every turn where both args were present. That gate didn't check 6 the ``interrupted`` flag, so an external memory backend received partial 7 assistant output, aborted tool chains, or mid-stream resets as durable 8 conversational truth. Downstream recall then treated that not-yet-real 9 state as if the user had seen it complete. 10 11 The fix is ``AIAgent._sync_external_memory_for_turn`` — a small helper 12 that replaces the inline block and returns early when ``interrupted`` 13 is True (regardless of whether ``final_response`` and 14 ``original_user_message`` happen to be populated). 15 16 These tests exercise the helper directly on a bare ``AIAgent`` built 17 via ``__new__`` so the full ``run_conversation`` machinery isn't needed 18 — the method is pure logic and three state arguments. 19 """ 20 from unittest.mock import MagicMock 21 22 import pytest 23 24 25 def _bare_agent(): 26 """Build an ``AIAgent`` with only the attributes 27 ``_sync_external_memory_for_turn`` touches — matches the bare-agent 28 pattern used across ``tests/run_agent/test_interrupt_propagation.py``. 29 """ 30 from run_agent import AIAgent 31 32 agent = AIAgent.__new__(AIAgent) 33 agent._memory_manager = MagicMock() 34 # session_id is now propagated into sync_all / queue_prefetch_all so 35 # providers that cache per-session state can update it mid-process 36 # (see #6672). 37 agent.session_id = "test_session_001" 38 return agent 39 40 41 class TestSyncExternalMemoryForTurn: 42 # --- Interrupt guard (the #15218 fix) ------------------------------- 43 44 def test_interrupted_turn_does_not_sync(self): 45 """The whole point of #15218: even with a final_response and a 46 user message, an interrupted turn must NOT reach the memory 47 backend.""" 48 agent = _bare_agent() 49 agent._sync_external_memory_for_turn( 50 original_user_message="What time is it?", 51 final_response="It is 3pm.", # looks complete — but partial 52 interrupted=True, 53 ) 54 agent._memory_manager.sync_all.assert_not_called() 55 agent._memory_manager.queue_prefetch_all.assert_not_called() 56 57 def test_interrupted_turn_skips_even_when_response_is_full(self): 58 """A long, seemingly-complete assistant response is still 59 partial if ``interrupted`` is True — an interrupt may have 60 landed between the streamed reply and the next tool call. The 61 memory backend has no way to distinguish on its own, so we must 62 gate at the source.""" 63 agent = _bare_agent() 64 agent._sync_external_memory_for_turn( 65 original_user_message="Plan a trip to Lisbon", 66 final_response="Here's a detailed 7-day itinerary: [...]", 67 interrupted=True, 68 ) 69 agent._memory_manager.sync_all.assert_not_called() 70 71 # --- Normal completed turn still syncs ------------------------------ 72 73 def test_completed_turn_syncs_and_queues_prefetch(self): 74 """Regression guard for the positive path: a normal completed 75 turn must still trigger both ``sync_all`` AND 76 ``queue_prefetch_all`` — otherwise the external memory backend 77 never learns about anything and every user complains. 78 """ 79 agent = _bare_agent() 80 agent._sync_external_memory_for_turn( 81 original_user_message="What's the weather in Paris?", 82 final_response="It's sunny and 22°C.", 83 interrupted=False, 84 ) 85 agent._memory_manager.sync_all.assert_called_once_with( 86 "What's the weather in Paris?", "It's sunny and 22°C.", 87 session_id="test_session_001", 88 ) 89 agent._memory_manager.queue_prefetch_all.assert_called_once_with( 90 "What's the weather in Paris?", 91 session_id="test_session_001", 92 ) 93 94 # --- Edge cases (pre-existing behaviour preserved) ------------------ 95 96 def test_no_final_response_skips(self): 97 """If the model produced no final_response (e.g. tool-only turn 98 that never resolved), we must not fabricate an empty sync.""" 99 agent = _bare_agent() 100 agent._sync_external_memory_for_turn( 101 original_user_message="Hello", 102 final_response=None, 103 interrupted=False, 104 ) 105 agent._memory_manager.sync_all.assert_not_called() 106 107 def test_no_original_user_message_skips(self): 108 """No user-origin message means this wasn't a user turn (e.g. 109 a system-initiated refresh). Don't sync an assistant-only 110 exchange as if a user said something.""" 111 agent = _bare_agent() 112 agent._sync_external_memory_for_turn( 113 original_user_message=None, 114 final_response="Proactive notification text", 115 interrupted=False, 116 ) 117 agent._memory_manager.sync_all.assert_not_called() 118 119 def test_no_memory_manager_is_a_no_op(self): 120 """Sessions without an external memory manager must not crash 121 or try to call .sync_all on None.""" 122 from run_agent import AIAgent 123 124 agent = AIAgent.__new__(AIAgent) 125 agent._memory_manager = None 126 127 # Must not raise. 128 agent._sync_external_memory_for_turn( 129 original_user_message="hi", 130 final_response="hey", 131 interrupted=False, 132 ) 133 134 # --- Exception safety ---------------------------------------------- 135 136 def test_sync_exception_is_swallowed(self): 137 """External memory providers are best-effort; a misconfigured 138 or offline backend must not block the user from seeing their 139 response by propagating the exception up.""" 140 agent = _bare_agent() 141 agent._memory_manager.sync_all.side_effect = RuntimeError( 142 "backend unreachable" 143 ) 144 145 # Must not raise. 146 agent._sync_external_memory_for_turn( 147 original_user_message="hi", 148 final_response="hey", 149 interrupted=False, 150 ) 151 # sync_all was attempted. 152 agent._memory_manager.sync_all.assert_called_once() 153 154 def test_prefetch_exception_is_swallowed(self): 155 """Same best-effort contract applies to the prefetch step — a 156 failure in queue_prefetch_all must not bubble out.""" 157 agent = _bare_agent() 158 agent._memory_manager.queue_prefetch_all.side_effect = RuntimeError( 159 "prefetch worker dead" 160 ) 161 162 # Must not raise. 163 agent._sync_external_memory_for_turn( 164 original_user_message="hi", 165 final_response="hey", 166 interrupted=False, 167 ) 168 # sync_all still happened before the prefetch blew up. 169 agent._memory_manager.sync_all.assert_called_once() 170 171 # --- The specific matrix the reporter asked about ------------------ 172 173 @pytest.mark.parametrize("interrupted,final,user,expect_sync", [ 174 (False, "resp", "user", True), # normal completed → sync 175 (True, "resp", "user", False), # interrupted → skip (the fix) 176 (False, None, "user", False), # no response → skip 177 (False, "resp", None, False), # no user msg → skip 178 (True, None, "user", False), # interrupted + no response → skip 179 (True, "resp", None, False), # interrupted + no user → skip 180 (False, None, None, False), # nothing → skip 181 (True, None, None, False), # interrupted + nothing → skip 182 ]) 183 def test_sync_matrix(self, interrupted, final, user, expect_sync): 184 agent = _bare_agent() 185 agent._sync_external_memory_for_turn( 186 original_user_message=user, 187 final_response=final, 188 interrupted=interrupted, 189 ) 190 if expect_sync: 191 agent._memory_manager.sync_all.assert_called_once() 192 agent._memory_manager.queue_prefetch_all.assert_called_once() 193 else: 194 agent._memory_manager.sync_all.assert_not_called() 195 agent._memory_manager.queue_prefetch_all.assert_not_called()