/ tests / gateway / test_stream_consumer_fresh_final.py
test_stream_consumer_fresh_final.py
  1  """Regression tests for the fresh-final-for-long-lived-previews path.
  2  
  3  Ported from openclaw/openclaw#72038.  When a streamed preview has been
  4  visible long enough that the platform's edit timestamp would be
  5  noticeably stale by completion time, the stream consumer delivers the
  6  final reply as a brand-new message and best-effort deletes the old
  7  preview.  This makes Telegram's visible timestamp reflect completion
  8  time instead of first-token time.
  9  """
 10  
 11  from __future__ import annotations
 12  
 13  from types import SimpleNamespace
 14  from unittest.mock import AsyncMock, MagicMock
 15  
 16  import pytest
 17  
 18  from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig
 19  
 20  
 21  def _make_adapter(*, supports_delete: bool = True) -> MagicMock:
 22      """Build a minimal MagicMock adapter wired for send/edit/delete."""
 23      adapter = MagicMock()
 24      adapter.REQUIRES_EDIT_FINALIZE = False
 25      adapter.MAX_MESSAGE_LENGTH = 4096
 26      adapter.send = AsyncMock(return_value=SimpleNamespace(
 27          success=True, message_id="initial_preview",
 28      ))
 29      adapter.edit_message = AsyncMock(return_value=SimpleNamespace(
 30          success=True, message_id="initial_preview",
 31      ))
 32      if supports_delete:
 33          adapter.delete_message = AsyncMock(return_value=True)
 34      else:
 35          # Adapter without the optional delete_message method — fresh-final
 36          # should still work, it just leaves the stale preview in place.
 37          del adapter.delete_message  # type: ignore[attr-defined]
 38      return adapter
 39  
 40  
 41  class TestFreshFinalForLongLivedPreviews:
 42      """openclaw#72038 port — send fresh final when preview is old."""
 43  
 44      @pytest.mark.asyncio
 45      async def test_disabled_by_default_still_edits_in_place(self):
 46          """``fresh_final_after_seconds=0`` preserves the legacy edit path."""
 47          adapter = _make_adapter()
 48          consumer = GatewayStreamConsumer(
 49              adapter=adapter,
 50              chat_id="chat",
 51              config=StreamConsumerConfig(fresh_final_after_seconds=0.0),
 52          )
 53          await consumer._send_or_edit("hello")
 54          # Pretend the preview has been visible for a long time.
 55          consumer._message_created_ts = 0.0  # far in the past
 56          await consumer._send_or_edit("hello world", finalize=True)
 57          # Should edit, not send a fresh message.
 58          assert adapter.send.call_count == 1  # only the initial send
 59          adapter.edit_message.assert_called_once()
 60  
 61      @pytest.mark.asyncio
 62      async def test_short_lived_preview_edits_in_place(self):
 63          """Finalizing a preview younger than the threshold → normal edit."""
 64          adapter = _make_adapter()
 65          consumer = GatewayStreamConsumer(
 66              adapter=adapter,
 67              chat_id="chat",
 68              config=StreamConsumerConfig(fresh_final_after_seconds=60.0),
 69          )
 70          await consumer._send_or_edit("hello")
 71          # Preview is "new" — leave _message_created_ts at its real value.
 72          await consumer._send_or_edit("hello world", finalize=True)
 73          assert adapter.send.call_count == 1
 74          adapter.edit_message.assert_called_once()
 75  
 76      @pytest.mark.asyncio
 77      async def test_long_lived_preview_sends_fresh_final(self):
 78          """Finalizing a preview older than the threshold → fresh send."""
 79          adapter = _make_adapter()
 80          adapter.send.side_effect = [
 81              SimpleNamespace(success=True, message_id="initial_preview"),
 82              SimpleNamespace(success=True, message_id="fresh_final"),
 83          ]
 84          consumer = GatewayStreamConsumer(
 85              adapter=adapter,
 86              chat_id="chat",
 87              config=StreamConsumerConfig(fresh_final_after_seconds=60.0),
 88          )
 89          await consumer._send_or_edit("hello")
 90          # Force the preview to look stale (visible for > 60s).
 91          consumer._message_created_ts = 0.0  # zero = ~uptime seconds old
 92          await consumer._send_or_edit("hello world", finalize=True)
 93          # Fresh send happened; no edit of the old preview.
 94          assert adapter.send.call_count == 2
 95          adapter.edit_message.assert_not_called()
 96          # The old preview was deleted as cleanup.
 97          adapter.delete_message.assert_awaited_once_with("chat", "initial_preview")
 98          # State was updated to the new message id.
 99          assert consumer._message_id == "fresh_final"
100          assert consumer._final_response_sent is True
101  
102      @pytest.mark.asyncio
103      async def test_fresh_final_without_delete_support_is_best_effort(self):
104          """Adapter lacking ``delete_message`` still gets the fresh send."""
105          adapter = _make_adapter(supports_delete=False)
106          adapter.send.side_effect = [
107              SimpleNamespace(success=True, message_id="initial_preview"),
108              SimpleNamespace(success=True, message_id="fresh_final"),
109          ]
110          consumer = GatewayStreamConsumer(
111              adapter=adapter,
112              chat_id="chat",
113              config=StreamConsumerConfig(fresh_final_after_seconds=60.0),
114          )
115          await consumer._send_or_edit("hello")
116          consumer._message_created_ts = 0.0
117          await consumer._send_or_edit("hello world", finalize=True)
118          assert adapter.send.call_count == 2
119          adapter.edit_message.assert_not_called()
120          # No delete attempt — just the fresh send.
121          assert consumer._message_id == "fresh_final"
122  
123      @pytest.mark.asyncio
124      async def test_fresh_final_fallback_to_edit_on_send_failure(self):
125          """If the fresh send fails, fall back to the normal edit path."""
126          adapter = _make_adapter()
127          adapter.send.side_effect = [
128              SimpleNamespace(success=True, message_id="initial_preview"),
129              SimpleNamespace(success=False, error="network"),
130          ]
131          consumer = GatewayStreamConsumer(
132              adapter=adapter,
133              chat_id="chat",
134              config=StreamConsumerConfig(fresh_final_after_seconds=60.0),
135          )
136          await consumer._send_or_edit("hello")
137          consumer._message_created_ts = 0.0
138          ok = await consumer._send_or_edit("hello world", finalize=True)
139          # Fresh send was attempted and failed → edit happened instead.
140          assert adapter.send.call_count == 2
141          adapter.edit_message.assert_called_once()
142          assert ok is True
143  
144      @pytest.mark.asyncio
145      async def test_only_finalize_triggers_fresh_final(self):
146          """Intermediate edits (``finalize=False``) never switch to fresh send."""
147          adapter = _make_adapter()
148          consumer = GatewayStreamConsumer(
149              adapter=adapter,
150              chat_id="chat",
151              config=StreamConsumerConfig(fresh_final_after_seconds=60.0),
152          )
153          await consumer._send_or_edit("hello")
154          consumer._message_created_ts = 0.0  # stale
155          await consumer._send_or_edit("hello partial")  # no finalize
156          assert adapter.send.call_count == 1
157          adapter.edit_message.assert_called_once()
158  
159      @pytest.mark.asyncio
160      async def test_no_edit_sentinel_is_not_affected(self):
161          """Platforms with the ``__no_edit__`` sentinel never go fresh-final."""
162          adapter = _make_adapter()
163          adapter.send.return_value = SimpleNamespace(success=True, message_id=None)
164          consumer = GatewayStreamConsumer(
165              adapter=adapter,
166              chat_id="chat",
167              config=StreamConsumerConfig(fresh_final_after_seconds=60.0),
168          )
169          await consumer._send_or_edit("hello")
170          assert consumer._message_id == "__no_edit__"
171          assert consumer._message_created_ts is None
172          # Even with finalize=True, no fresh send — the sentinel gates it.
173          assert consumer._should_send_fresh_final() is False
174  
175  
class TestStreamConsumerConfigFreshFinalField:
    """``StreamConsumerConfig.fresh_final_after_seconds``: default and override."""

    def test_default_is_disabled(self):
        """A config built with no arguments reports a threshold of ``0.0``."""
        assert StreamConsumerConfig().fresh_final_after_seconds == 0.0

    def test_field_is_configurable(self):
        """A threshold passed to the constructor is stored unchanged."""
        configured = StreamConsumerConfig(fresh_final_after_seconds=120.0)
        assert configured.fresh_final_after_seconds == 120.0
186  
187  
class TestStreamingConfigFreshFinalField:
    """``StreamingConfig.fresh_final_after_seconds`` at the gateway level."""

    def test_default_enables_with_60s(self):
        """A default-constructed config carries a 60 second threshold."""
        from gateway.config import StreamingConfig
        assert StreamingConfig().fresh_final_after_seconds == 60.0

    def test_from_dict_uses_default_when_missing(self):
        """``from_dict`` yields 60 seconds when the key is not supplied."""
        from gateway.config import StreamingConfig
        raw = {"enabled": True}
        parsed = StreamingConfig.from_dict(raw)
        assert parsed.fresh_final_after_seconds == 60.0

    def test_from_dict_respects_explicit_zero(self):
        """An explicit ``0`` in the dict is kept rather than defaulted."""
        from gateway.config import StreamingConfig
        raw = {
            "enabled": True,
            "fresh_final_after_seconds": 0,
        }
        parsed = StreamingConfig.from_dict(raw)
        assert parsed.fresh_final_after_seconds == 0.0

    def test_to_dict_round_trip(self):
        """The threshold survives ``to_dict`` followed by ``from_dict``."""
        from gateway.config import StreamingConfig
        serialized = StreamingConfig(fresh_final_after_seconds=90.0).to_dict()
        rebuilt = StreamingConfig.from_dict(serialized)
        assert rebuilt.fresh_final_after_seconds == 90.0
214  
215  
class TestTelegramAdapterDeleteMessage:
    """Signature contract for ``delete_message`` on the platform adapters."""

    def test_delete_message_method_exists(self):
        """``TelegramAdapter.delete_message`` exists with the expected leading params.

        The test is skipped when ``gateway.platforms.telegram`` cannot be
        imported.
        """
        telegram_module = pytest.importorskip("gateway.platforms.telegram")
        import inspect
        adapter_cls = telegram_module.TelegramAdapter
        assert hasattr(adapter_cls, "delete_message"), (
            "TelegramAdapter.delete_message is required for the fresh-final "
            "cleanup path (openclaw/openclaw#72038 port)."
        )
        parameter_names = list(
            inspect.signature(adapter_cls.delete_message).parameters
        )
        assert parameter_names[:3] == ["self", "chat_id", "message_id"]

    def test_base_adapter_default_returns_false(self):
        """Check the leading parameters of ``BasePlatformAdapter.delete_message``.

        NOTE(review): despite the test name, only the signature is
        inspected here; the default implementation's return value is never
        exercised.
        """
        from gateway.platforms.base import BasePlatformAdapter
        import inspect
        parameter_names = list(
            inspect.signature(BasePlatformAdapter.delete_message).parameters
        )
        assert parameter_names[:3] == ["self", "chat_id", "message_id"]