# test_stream_consumer_fresh_final.py
"""Regression tests for the fresh-final-for-long-lived-previews path.

Ported from openclaw/openclaw#72038. When a streamed preview has been
visible long enough that the platform's edit timestamp would be
noticeably stale by completion time, the stream consumer delivers the
final reply as a brand-new message and best-effort deletes the old
preview. This makes Telegram's visible timestamp reflect completion
time instead of first-token time.
"""

from __future__ import annotations

import inspect
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock

import pytest

from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig


def _make_adapter(*, supports_delete: bool = True) -> MagicMock:
    """Build a minimal MagicMock adapter wired for send/edit/delete.

    Args:
        supports_delete: When true, ``delete_message`` is an ``AsyncMock``
            returning ``True``.  When false, the attribute is removed from
            the mock so that accessing it raises ``AttributeError``.

    Returns:
        A ``MagicMock`` whose ``send`` and ``edit_message`` are ``AsyncMock``
        objects that both report success with message id
        ``"initial_preview"``.
    """
    adapter = MagicMock()
    adapter.REQUIRES_EDIT_FINALIZE = False
    adapter.MAX_MESSAGE_LENGTH = 4096
    adapter.send = AsyncMock(return_value=SimpleNamespace(
        success=True, message_id="initial_preview",
    ))
    adapter.edit_message = AsyncMock(return_value=SimpleNamespace(
        success=True, message_id="initial_preview",
    ))
    if supports_delete:
        adapter.delete_message = AsyncMock(return_value=True)
    else:
        # Adapter without the optional delete_message method — fresh-final
        # should still work, it just leaves the stale preview in place.
        del adapter.delete_message  # type: ignore[attr-defined]
    return adapter


class TestFreshFinalForLongLivedPreviews:
    """openclaw#72038 port — send fresh final when preview is old."""

    @pytest.mark.asyncio
    async def test_disabled_by_default_still_edits_in_place(self):
        """``fresh_final_after_seconds=0`` preserves the legacy edit path."""
        adapter = _make_adapter()
        consumer = GatewayStreamConsumer(
            adapter=adapter,
            chat_id="chat",
            config=StreamConsumerConfig(fresh_final_after_seconds=0.0),
        )
        await consumer._send_or_edit("hello")
        # Pretend the preview has been visible for a long time.
        consumer._message_created_ts = 0.0  # far in the past
        await consumer._send_or_edit("hello world", finalize=True)
        # Should edit, not send a fresh message.
        assert adapter.send.call_count == 1  # only the initial send
        adapter.edit_message.assert_called_once()

    @pytest.mark.asyncio
    async def test_short_lived_preview_edits_in_place(self):
        """Finalizing a preview younger than the threshold → normal edit."""
        adapter = _make_adapter()
        consumer = GatewayStreamConsumer(
            adapter=adapter,
            chat_id="chat",
            config=StreamConsumerConfig(fresh_final_after_seconds=60.0),
        )
        await consumer._send_or_edit("hello")
        # Preview is "new" — leave _message_created_ts at its real value.
        await consumer._send_or_edit("hello world", finalize=True)
        assert adapter.send.call_count == 1
        adapter.edit_message.assert_called_once()

    @pytest.mark.asyncio
    async def test_long_lived_preview_sends_fresh_final(self):
        """Finalizing a preview older than the threshold → fresh send."""
        adapter = _make_adapter()
        adapter.send.side_effect = [
            SimpleNamespace(success=True, message_id="initial_preview"),
            SimpleNamespace(success=True, message_id="fresh_final"),
        ]
        consumer = GatewayStreamConsumer(
            adapter=adapter,
            chat_id="chat",
            config=StreamConsumerConfig(fresh_final_after_seconds=60.0),
        )
        await consumer._send_or_edit("hello")
        # Force the preview to look stale (visible for > 60s).
        consumer._message_created_ts = 0.0  # zero = ~uptime seconds old
        await consumer._send_or_edit("hello world", finalize=True)
        # Fresh send happened; no edit of the old preview.
        assert adapter.send.call_count == 2
        adapter.edit_message.assert_not_called()
        # The old preview was deleted as cleanup.
        adapter.delete_message.assert_awaited_once_with("chat", "initial_preview")
        # State was updated to the new message id.
        assert consumer._message_id == "fresh_final"
        assert consumer._final_response_sent is True

    @pytest.mark.asyncio
    async def test_fresh_final_without_delete_support_is_best_effort(self):
        """Adapter lacking ``delete_message`` still gets the fresh send."""
        adapter = _make_adapter(supports_delete=False)
        adapter.send.side_effect = [
            SimpleNamespace(success=True, message_id="initial_preview"),
            SimpleNamespace(success=True, message_id="fresh_final"),
        ]
        consumer = GatewayStreamConsumer(
            adapter=adapter,
            chat_id="chat",
            config=StreamConsumerConfig(fresh_final_after_seconds=60.0),
        )
        await consumer._send_or_edit("hello")
        consumer._message_created_ts = 0.0
        await consumer._send_or_edit("hello world", finalize=True)
        assert adapter.send.call_count == 2
        adapter.edit_message.assert_not_called()
        # No delete attempt — just the fresh send.  Guard the fixture itself:
        # the mock must still lack delete_message, otherwise this test would
        # silently stop exercising the "no delete support" path.
        assert not hasattr(adapter, "delete_message")
        assert consumer._message_id == "fresh_final"

    @pytest.mark.asyncio
    async def test_fresh_final_fallback_to_edit_on_send_failure(self):
        """If the fresh send fails, fall back to the normal edit path."""
        adapter = _make_adapter()
        adapter.send.side_effect = [
            SimpleNamespace(success=True, message_id="initial_preview"),
            SimpleNamespace(success=False, error="network"),
        ]
        consumer = GatewayStreamConsumer(
            adapter=adapter,
            chat_id="chat",
            config=StreamConsumerConfig(fresh_final_after_seconds=60.0),
        )
        await consumer._send_or_edit("hello")
        consumer._message_created_ts = 0.0
        ok = await consumer._send_or_edit("hello world", finalize=True)
        # Fresh send was attempted and failed → edit happened instead.
        assert adapter.send.call_count == 2
        adapter.edit_message.assert_called_once()
        assert ok is True

    @pytest.mark.asyncio
    async def test_only_finalize_triggers_fresh_final(self):
        """Intermediate edits (``finalize=False``) never switch to fresh send."""
        adapter = _make_adapter()
        consumer = GatewayStreamConsumer(
            adapter=adapter,
            chat_id="chat",
            config=StreamConsumerConfig(fresh_final_after_seconds=60.0),
        )
        await consumer._send_or_edit("hello")
        consumer._message_created_ts = 0.0  # stale
        await consumer._send_or_edit("hello partial")  # no finalize
        assert adapter.send.call_count == 1
        adapter.edit_message.assert_called_once()

    @pytest.mark.asyncio
    async def test_no_edit_sentinel_is_not_affected(self):
        """Platforms with the ``__no_edit__`` sentinel never go fresh-final."""
        adapter = _make_adapter()
        adapter.send.return_value = SimpleNamespace(success=True, message_id=None)
        consumer = GatewayStreamConsumer(
            adapter=adapter,
            chat_id="chat",
            config=StreamConsumerConfig(fresh_final_after_seconds=60.0),
        )
        await consumer._send_or_edit("hello")
        assert consumer._message_id == "__no_edit__"
        assert consumer._message_created_ts is None
        # Even with finalize=True, no fresh send — the sentinel gates it.
        assert consumer._should_send_fresh_final() is False


class TestStreamConsumerConfigFreshFinalField:
    """The dataclass field must exist and default to 0 (disabled)."""

    def test_default_is_disabled(self):
        cfg = StreamConsumerConfig()
        assert cfg.fresh_final_after_seconds == 0.0

    def test_field_is_configurable(self):
        cfg = StreamConsumerConfig(fresh_final_after_seconds=120.0)
        assert cfg.fresh_final_after_seconds == 120.0


class TestStreamingConfigFreshFinalField:
    """The gateway-level StreamingConfig carries the setting."""

    def test_default_enables_with_60s(self):
        from gateway.config import StreamingConfig

        cfg = StreamingConfig()
        assert cfg.fresh_final_after_seconds == 60.0

    def test_from_dict_uses_default_when_missing(self):
        from gateway.config import StreamingConfig

        cfg = StreamingConfig.from_dict({"enabled": True})
        assert cfg.fresh_final_after_seconds == 60.0

    def test_from_dict_respects_explicit_zero(self):
        from gateway.config import StreamingConfig

        cfg = StreamingConfig.from_dict({
            "enabled": True,
            "fresh_final_after_seconds": 0,
        })
        assert cfg.fresh_final_after_seconds == 0.0

    def test_to_dict_round_trip(self):
        from gateway.config import StreamingConfig

        original = StreamingConfig(fresh_final_after_seconds=90.0)
        restored = StreamingConfig.from_dict(original.to_dict())
        assert restored.fresh_final_after_seconds == 90.0


class TestTelegramAdapterDeleteMessage:
    """Contract: Telegram adapter implements ``delete_message``."""

    def test_delete_message_method_exists(self):
        """TelegramAdapter exposes ``delete_message(self, chat_id, message_id, ...)``."""
        # Skip (rather than fail) when the Telegram platform module cannot
        # be imported in this environment.
        telegram = pytest.importorskip("gateway.platforms.telegram")
        cls = telegram.TelegramAdapter
        assert hasattr(cls, "delete_message"), (
            "TelegramAdapter.delete_message is required for the fresh-final "
            "cleanup path (openclaw/openclaw#72038 port)."
        )
        sig = inspect.signature(cls.delete_message)
        params = list(sig.parameters)
        assert params[:3] == ["self", "chat_id", "message_id"]

    @pytest.mark.asyncio
    async def test_base_adapter_default_returns_false(self):
        """BasePlatformAdapter.delete_message default = no-op returning False."""
        from gateway.platforms.base import BasePlatformAdapter

        sig = inspect.signature(BasePlatformAdapter.delete_message)
        assert list(sig.parameters)[:3] == ["self", "chat_id", "message_id"]
        # Exercise the default itself so the test verifies what its name
        # promises.  Called unbound with a stand-in ``self`` so the (possibly
        # abstract) base class never has to be instantiated.
        # NOTE(review): assumes any parameters after message_id are optional.
        result = BasePlatformAdapter.delete_message(MagicMock(), "chat", "msg")
        if inspect.isawaitable(result):
            # The mocked adapters treat delete_message as async; await when
            # the base implementation is a coroutine function as well.
            result = await result
        assert result is False