/ tests / run_agent / test_memory_sync_interrupted.py
test_memory_sync_interrupted.py
  1  """Regression guard for #15218 — external memory sync must skip interrupted turns.
  2  
  3  Before this fix, ``run_conversation`` called
  4  ``memory_manager.sync_all(original_user_message, final_response)`` at the
  5  end of every turn where both args were present.  That gate didn't check
  6  the ``interrupted`` flag, so an external memory backend received partial
  7  assistant output, aborted tool chains, or mid-stream resets as durable
  8  conversational truth.  Downstream recall then treated that not-yet-real
  9  state as if the user had seen it complete.
 10  
 11  The fix is ``AIAgent._sync_external_memory_for_turn`` — a small helper
 12  that replaces the inline block and returns early when ``interrupted``
 13  is True (regardless of whether ``final_response`` and
 14  ``original_user_message`` happen to be populated).
 15  
 16  These tests exercise the helper directly on a bare ``AIAgent`` built
 17  via ``__new__`` so the full ``run_conversation`` machinery isn't needed
 18  — the method is pure logic and three state arguments.
 19  """
 20  from unittest.mock import MagicMock
 21  
 22  import pytest
 23  
 24  
 25  def _bare_agent():
 26      """Build an ``AIAgent`` with only the attributes
 27      ``_sync_external_memory_for_turn`` touches — matches the bare-agent
 28      pattern used across ``tests/run_agent/test_interrupt_propagation.py``.
 29      """
 30      from run_agent import AIAgent
 31  
 32      agent = AIAgent.__new__(AIAgent)
 33      agent._memory_manager = MagicMock()
 34      # session_id is now propagated into sync_all / queue_prefetch_all so
 35      # providers that cache per-session state can update it mid-process
 36      # (see #6672).
 37      agent.session_id = "test_session_001"
 38      return agent
 39  
 40  
 41  class TestSyncExternalMemoryForTurn:
 42      # --- Interrupt guard (the #15218 fix) -------------------------------
 43  
 44      def test_interrupted_turn_does_not_sync(self):
 45          """The whole point of #15218: even with a final_response and a
 46          user message, an interrupted turn must NOT reach the memory
 47          backend."""
 48          agent = _bare_agent()
 49          agent._sync_external_memory_for_turn(
 50              original_user_message="What time is it?",
 51              final_response="It is 3pm.",  # looks complete — but partial
 52              interrupted=True,
 53          )
 54          agent._memory_manager.sync_all.assert_not_called()
 55          agent._memory_manager.queue_prefetch_all.assert_not_called()
 56  
 57      def test_interrupted_turn_skips_even_when_response_is_full(self):
 58          """A long, seemingly-complete assistant response is still
 59          partial if ``interrupted`` is True — an interrupt may have
 60          landed between the streamed reply and the next tool call.  The
 61          memory backend has no way to distinguish on its own, so we must
 62          gate at the source."""
 63          agent = _bare_agent()
 64          agent._sync_external_memory_for_turn(
 65              original_user_message="Plan a trip to Lisbon",
 66              final_response="Here's a detailed 7-day itinerary: [...]",
 67              interrupted=True,
 68          )
 69          agent._memory_manager.sync_all.assert_not_called()
 70  
 71      # --- Normal completed turn still syncs ------------------------------
 72  
 73      def test_completed_turn_syncs_and_queues_prefetch(self):
 74          """Regression guard for the positive path: a normal completed
 75          turn must still trigger both ``sync_all`` AND
 76          ``queue_prefetch_all`` — otherwise the external memory backend
 77          never learns about anything and every user complains.
 78          """
 79          agent = _bare_agent()
 80          agent._sync_external_memory_for_turn(
 81              original_user_message="What's the weather in Paris?",
 82              final_response="It's sunny and 22°C.",
 83              interrupted=False,
 84          )
 85          agent._memory_manager.sync_all.assert_called_once_with(
 86              "What's the weather in Paris?", "It's sunny and 22°C.",
 87              session_id="test_session_001",
 88          )
 89          agent._memory_manager.queue_prefetch_all.assert_called_once_with(
 90              "What's the weather in Paris?",
 91              session_id="test_session_001",
 92          )
 93  
 94      # --- Edge cases (pre-existing behaviour preserved) ------------------
 95  
 96      def test_no_final_response_skips(self):
 97          """If the model produced no final_response (e.g. tool-only turn
 98          that never resolved), we must not fabricate an empty sync."""
 99          agent = _bare_agent()
100          agent._sync_external_memory_for_turn(
101              original_user_message="Hello",
102              final_response=None,
103              interrupted=False,
104          )
105          agent._memory_manager.sync_all.assert_not_called()
106  
107      def test_no_original_user_message_skips(self):
108          """No user-origin message means this wasn't a user turn (e.g.
109          a system-initiated refresh).  Don't sync an assistant-only
110          exchange as if a user said something."""
111          agent = _bare_agent()
112          agent._sync_external_memory_for_turn(
113              original_user_message=None,
114              final_response="Proactive notification text",
115              interrupted=False,
116          )
117          agent._memory_manager.sync_all.assert_not_called()
118  
119      def test_no_memory_manager_is_a_no_op(self):
120          """Sessions without an external memory manager must not crash
121          or try to call .sync_all on None."""
122          from run_agent import AIAgent
123  
124          agent = AIAgent.__new__(AIAgent)
125          agent._memory_manager = None
126  
127          # Must not raise.
128          agent._sync_external_memory_for_turn(
129              original_user_message="hi",
130              final_response="hey",
131              interrupted=False,
132          )
133  
134      # --- Exception safety ----------------------------------------------
135  
136      def test_sync_exception_is_swallowed(self):
137          """External memory providers are best-effort; a misconfigured
138          or offline backend must not block the user from seeing their
139          response by propagating the exception up."""
140          agent = _bare_agent()
141          agent._memory_manager.sync_all.side_effect = RuntimeError(
142              "backend unreachable"
143          )
144  
145          # Must not raise.
146          agent._sync_external_memory_for_turn(
147              original_user_message="hi",
148              final_response="hey",
149              interrupted=False,
150          )
151          # sync_all was attempted.
152          agent._memory_manager.sync_all.assert_called_once()
153  
154      def test_prefetch_exception_is_swallowed(self):
155          """Same best-effort contract applies to the prefetch step — a
156          failure in queue_prefetch_all must not bubble out."""
157          agent = _bare_agent()
158          agent._memory_manager.queue_prefetch_all.side_effect = RuntimeError(
159              "prefetch worker dead"
160          )
161  
162          # Must not raise.
163          agent._sync_external_memory_for_turn(
164              original_user_message="hi",
165              final_response="hey",
166              interrupted=False,
167          )
168          # sync_all still happened before the prefetch blew up.
169          agent._memory_manager.sync_all.assert_called_once()
170  
171      # --- The specific matrix the reporter asked about ------------------
172  
173      @pytest.mark.parametrize("interrupted,final,user,expect_sync", [
174          (False, "resp", "user",  True),   # normal completed → sync
175          (True,  "resp", "user",  False),  # interrupted → skip (the fix)
176          (False, None,   "user",  False),  # no response → skip
177          (False, "resp", None,    False),  # no user msg → skip
178          (True,  None,   "user",  False),  # interrupted + no response → skip
179          (True,  "resp", None,    False),  # interrupted + no user → skip
180          (False, None,   None,    False),  # nothing → skip
181          (True,  None,   None,    False),  # interrupted + nothing → skip
182      ])
183      def test_sync_matrix(self, interrupted, final, user, expect_sync):
184          agent = _bare_agent()
185          agent._sync_external_memory_for_turn(
186              original_user_message=user,
187              final_response=final,
188              interrupted=interrupted,
189          )
190          if expect_sync:
191              agent._memory_manager.sync_all.assert_called_once()
192              agent._memory_manager.queue_prefetch_all.assert_called_once()
193          else:
194              agent._memory_manager.sync_all.assert_not_called()
195              agent._memory_manager.queue_prefetch_all.assert_not_called()