/ tests / gateway / test_platform_base.py
test_platform_base.py
"""Tests for gateway/platforms/base.py — MessageEvent, media extraction, message truncation."""
  2  
  3  import os
  4  from unittest.mock import patch
  5  
  6  import pytest
  7  
  8  from gateway.platforms.base import (
  9      BasePlatformAdapter,
 10      GATEWAY_SECRET_CAPTURE_UNSUPPORTED_MESSAGE,
 11      MessageEvent,
 12      MessageType,
 13      safe_url_for_log,
 14      utf16_len,
 15      _prefix_within_utf16_limit,
 16  )
 17  
 18  
 19  class TestSecretCaptureGuidance:
 20      def test_gateway_secret_capture_message_points_to_local_setup(self):
 21          message = GATEWAY_SECRET_CAPTURE_UNSUPPORTED_MESSAGE
 22          assert "local cli" in message.lower()
 23          assert "~/.hermes/.env" in message
 24  
 25  
 26  class TestSafeUrlForLog:
 27      def test_strips_query_fragment_and_userinfo(self):
 28          url = (
 29              "https://user:pass@example.com/private/path/image.png"
 30              "?X-Amz-Signature=supersecret&token=abc#frag"
 31          )
 32          result = safe_url_for_log(url)
 33          assert result == "https://example.com/.../image.png"
 34          assert "supersecret" not in result
 35          assert "token=abc" not in result
 36          assert "user:pass@" not in result
 37  
 38      def test_truncates_long_values(self):
 39          long_url = "https://example.com/" + ("a" * 300)
 40          result = safe_url_for_log(long_url, max_len=40)
 41          assert len(result) == 40
 42          assert result.endswith("...")
 43  
 44      def test_handles_small_and_non_positive_max_len(self):
 45          url = "https://example.com/very/long/path/file.png?token=secret"
 46          assert safe_url_for_log(url, max_len=3) == "..."
 47          assert safe_url_for_log(url, max_len=2) == ".."
 48          assert safe_url_for_log(url, max_len=0) == ""
 49  
 50  
 51  # ---------------------------------------------------------------------------
# MessageEvent — command parsing
 53  # ---------------------------------------------------------------------------
 54  
 55  
 56  class TestMessageEventIsCommand:
 57      def test_slash_command(self):
 58          event = MessageEvent(text="/new")
 59          assert event.is_command() is True
 60  
 61      def test_regular_text(self):
 62          event = MessageEvent(text="hello world")
 63          assert event.is_command() is False
 64  
 65      def test_empty_text(self):
 66          event = MessageEvent(text="")
 67          assert event.is_command() is False
 68  
 69      def test_slash_only(self):
 70          event = MessageEvent(text="/")
 71          assert event.is_command() is True
 72  
 73  
 74  class TestMessageEventGetCommand:
 75      def test_simple_command(self):
 76          event = MessageEvent(text="/new")
 77          assert event.get_command() == "new"
 78  
 79      def test_command_with_args(self):
 80          event = MessageEvent(text="/reset session")
 81          assert event.get_command() == "reset"
 82  
 83      def test_not_a_command(self):
 84          event = MessageEvent(text="hello")
 85          assert event.get_command() is None
 86  
 87      def test_command_is_lowercased(self):
 88          event = MessageEvent(text="/HELP")
 89          assert event.get_command() == "help"
 90  
 91      def test_slash_only_returns_empty(self):
 92          event = MessageEvent(text="/")
 93          assert event.get_command() == ""
 94  
 95      def test_command_with_at_botname(self):
 96          event = MessageEvent(text="/new@TigerNanoBot")
 97          assert event.get_command() == "new"
 98  
 99      def test_command_with_at_botname_and_args(self):
100          event = MessageEvent(text="/compress@TigerNanoBot")
101          assert event.get_command() == "compress"
102  
103      def test_command_mixed_case_with_at_botname(self):
104          event = MessageEvent(text="/RESET@TigerNanoBot")
105          assert event.get_command() == "reset"
106  
107  
class TestMessageEventGetCommandArgs:
    """Covers ``MessageEvent.get_command_args`` for command and non-command text."""

    def test_command_with_args(self):
        """Everything after the command token is returned as the args string."""
        parsed = MessageEvent(text="/new session id 123").get_command_args()
        assert parsed == "session id 123"

    def test_command_without_args(self):
        """A bare command has empty args."""
        parsed = MessageEvent(text="/new").get_command_args()
        assert parsed == ""

    def test_not_a_command_returns_full_text(self):
        """For non-command text the whole message comes back."""
        parsed = MessageEvent(text="hello world").get_command_args()
        assert parsed == "hello world"
120  
121  
122  # ---------------------------------------------------------------------------
123  # extract_images
124  # ---------------------------------------------------------------------------
125  
126  
class TestExtractImages:
    """Covers ``BasePlatformAdapter.extract_images`` for markdown and HTML image syntax.

    Each call returns ``(images, cleaned)``; the tests read ``images[i][0]`` as
    the URL and ``images[i][1]`` as the alt text.
    """

    def test_no_images(self):
        """Plain text yields no images and is returned unchanged."""
        found, remaining = BasePlatformAdapter.extract_images("Just regular text.")
        assert found == []
        assert remaining == "Just regular text."

    def test_markdown_image_with_image_ext(self):
        """A markdown ``.png`` image is extracted and removed from the text."""
        text = "Here is a photo: ![cat](https://example.com/cat.png)"
        found, remaining = BasePlatformAdapter.extract_images(text)
        assert len(found) == 1
        first = found[0]
        assert first[0] == "https://example.com/cat.png"
        assert first[1] == "cat"
        assert "![cat]" not in remaining

    def test_markdown_image_jpg(self):
        """``.jpg`` URLs are recognised."""
        found, _ = BasePlatformAdapter.extract_images(
            "![photo](https://example.com/photo.jpg)"
        )
        assert len(found) == 1
        first = found[0]
        assert first[0] == "https://example.com/photo.jpg"
        assert first[1] == "photo"

    def test_markdown_image_jpeg(self):
        """``.jpeg`` URLs are recognised; empty alt text stays empty."""
        found, _ = BasePlatformAdapter.extract_images(
            "![](https://example.com/photo.jpeg)"
        )
        assert len(found) == 1
        first = found[0]
        assert first[0] == "https://example.com/photo.jpeg"
        assert first[1] == ""

    def test_markdown_image_gif(self):
        """``.gif`` URLs are recognised."""
        found, _ = BasePlatformAdapter.extract_images(
            "![anim](https://example.com/anim.gif)"
        )
        assert len(found) == 1
        first = found[0]
        assert first[0] == "https://example.com/anim.gif"
        assert first[1] == "anim"

    def test_markdown_image_webp(self):
        """``.webp`` URLs are recognised."""
        found, _ = BasePlatformAdapter.extract_images(
            "![](https://example.com/img.webp)"
        )
        assert len(found) == 1
        first = found[0]
        assert first[0] == "https://example.com/img.webp"
        assert first[1] == ""

    def test_fal_media_cdn(self):
        """An image hosted on ``fal.media`` is extracted."""
        found, _ = BasePlatformAdapter.extract_images(
            "![gen](https://fal.media/files/abc123/output.png)"
        )
        assert len(found) == 1
        first = found[0]
        assert first[0] == "https://fal.media/files/abc123/output.png"
        assert first[1] == "gen"

    def test_fal_cdn_url(self):
        """A ``fal-cdn`` URL without a file extension is still extracted."""
        found, _ = BasePlatformAdapter.extract_images(
            "![](https://fal-cdn.example.com/result)"
        )
        assert len(found) == 1
        first = found[0]
        assert first[0] == "https://fal-cdn.example.com/result"
        assert first[1] == ""

    def test_replicate_delivery(self):
        """A ``replicate.delivery`` URL without a file extension is still extracted."""
        found, _ = BasePlatformAdapter.extract_images(
            "![](https://replicate.delivery/pbxt/abc/output)"
        )
        assert len(found) == 1
        first = found[0]
        assert first[0] == "https://replicate.delivery/pbxt/abc/output"
        assert first[1] == ""

    def test_non_image_ext_not_extracted(self):
        """Image-style markdown pointing at a ``.pdf`` is left in the text."""
        text = "![doc](https://example.com/report.pdf)"
        found, remaining = BasePlatformAdapter.extract_images(text)
        assert found == []
        # The markdown must still be present in the cleaned output.
        assert "![doc]" in remaining

    def test_html_img_tag(self):
        """An HTML ``<img>`` tag is extracted with empty alt text and removed."""
        text = 'Check this: <img src="https://example.com/photo.png">'
        found, remaining = BasePlatformAdapter.extract_images(text)
        assert len(found) == 1
        first = found[0]
        assert first[0] == "https://example.com/photo.png"
        # HTML images carry no alt text in the result.
        assert first[1] == ""
        assert "<img" not in remaining

    def test_html_img_self_closing(self):
        """The self-closing ``<img .../>`` form is recognised."""
        found, _ = BasePlatformAdapter.extract_images(
            '<img src="https://example.com/photo.png"/>'
        )
        assert len(found) == 1
        first = found[0]
        assert first[0] == "https://example.com/photo.png"
        assert first[1] == ""

    def test_html_img_with_closing_tag(self):
        """The ``<img ...></img>`` form is recognised."""
        found, _ = BasePlatformAdapter.extract_images(
            '<img src="https://example.com/photo.png"></img>'
        )
        assert len(found) == 1
        first = found[0]
        assert first[0] == "https://example.com/photo.png"
        assert first[1] == ""

    def test_multiple_images(self):
        """Two markdown images on separate lines are both extracted and removed."""
        text = "![a](https://example.com/a.png)\n![b](https://example.com/b.jpg)"
        found, remaining = BasePlatformAdapter.extract_images(text)
        assert len(found) == 2
        for marker in ("![a]", "![b]"):
            assert marker not in remaining

    def test_mixed_markdown_and_html(self):
        """Markdown and HTML images in the same text are both counted."""
        text = '![cat](https://example.com/cat.png)\n<img src="https://example.com/dog.jpg">'
        found, _ = BasePlatformAdapter.extract_images(text)
        assert len(found) == 2

    def test_cleaned_content_trims_excess_newlines(self):
        """The cleaned text never contains three consecutive newlines."""
        text = "Before\n\n![img](https://example.com/img.png)\n\n\n\nAfter"
        _, remaining = BasePlatformAdapter.extract_images(text)
        assert "\n\n\n" not in remaining

    def test_non_http_url_not_matched(self):
        """A ``file://`` URL is not treated as an image."""
        found, _ = BasePlatformAdapter.extract_images(
            "![file](file:///local/path.png)"
        )
        assert found == []

    def test_non_image_link_preserved_when_mixed_with_images(self):
        """Regression: a non-image link survives when a real image is also present."""
        text = (
            "Here is the image: ![photo](https://fal.media/cat.png)\n"
            "And a doc: ![report](https://example.com/report.pdf)"
        )
        found, remaining = BasePlatformAdapter.extract_images(text)
        assert len(found) == 1
        assert found[0][0] == "https://fal.media/cat.png"
        # The PDF markdown has to remain in the cleaned content verbatim.
        assert "![report](https://example.com/report.pdf)" in remaining
253  
254  
255  # ---------------------------------------------------------------------------
256  # extract_media
257  # ---------------------------------------------------------------------------
258  
259  
class TestExtractMedia:
    """Covers ``BasePlatformAdapter.extract_media`` parsing of ``MEDIA:`` tags.

    Each call returns ``(media, cleaned)``; the tests read ``media[i][0]`` as
    the path and ``media[i][1]`` as the voice flag.
    """

    def test_no_media(self):
        """Text without tags yields no media and is returned unchanged."""
        items, remaining = BasePlatformAdapter.extract_media("Just text.")
        assert items == []
        assert remaining == "Just text."

    def test_single_media_tag(self):
        """A lone tag yields one entry whose voice flag is ``False``."""
        items, _ = BasePlatformAdapter.extract_media("MEDIA:/path/to/audio.ogg")
        assert len(items) == 1
        entry = items[0]
        assert entry[0] == "/path/to/audio.ogg"
        # No [[audio_as_voice]] directive was given.
        assert entry[1] is False

    def test_media_with_voice_directive(self):
        """``[[audio_as_voice]]`` sets the voice flag to ``True``."""
        items, _ = BasePlatformAdapter.extract_media(
            "[[audio_as_voice]]\nMEDIA:/path/to/voice.ogg"
        )
        assert len(items) == 1
        entry = items[0]
        assert entry[0] == "/path/to/voice.ogg"
        # The directive was present in the content.
        assert entry[1] is True

    def test_multiple_media_tags(self):
        """Each ``MEDIA:`` line produces its own entry."""
        items, _ = BasePlatformAdapter.extract_media("MEDIA:/a.ogg\nMEDIA:/b.ogg")
        assert len(items) == 2

    def test_voice_directive_removed_from_content(self):
        """Directive and tag are stripped while surrounding text is kept."""
        _, remaining = BasePlatformAdapter.extract_media(
            "[[audio_as_voice]]\nSome text\nMEDIA:/voice.ogg"
        )
        for stripped in ("[[audio_as_voice]]", "MEDIA:"):
            assert stripped not in remaining
        assert "Some text" in remaining

    def test_media_with_text_before(self):
        """Text preceding the tag is preserved in the cleaned output."""
        items, remaining = BasePlatformAdapter.extract_media(
            "Here is your audio:\nMEDIA:/output.ogg"
        )
        assert len(items) == 1
        assert "Here is your audio" in remaining

    def test_cleaned_content_trims_excess_newlines(self):
        """The cleaned text never contains three consecutive newlines."""
        _, remaining = BasePlatformAdapter.extract_media(
            "Before\n\nMEDIA:/audio.ogg\n\n\n\nAfter"
        )
        assert "\n\n\n" not in remaining

    def test_media_tag_allows_optional_whitespace_after_colon(self):
        """A space between ``MEDIA:`` and the path is accepted."""
        items, remaining = BasePlatformAdapter.extract_media("MEDIA: /path/to/audio.ogg")
        assert items == [("/path/to/audio.ogg", False)]
        assert remaining == ""

    def test_media_tag_strips_wrapping_quotes_and_backticks(self):
        """Backticks, double quotes and single quotes around the path are removed."""
        tagged = "MEDIA: `/path/to/file.png`\nMEDIA:\"/path/to/file2.png\"\nMEDIA:'/path/to/file3.png'"
        items, remaining = BasePlatformAdapter.extract_media(tagged)
        expected = [
            ("/path/to/file.png", False),
            ("/path/to/file2.png", False),
            ("/path/to/file3.png", False),
        ]
        assert items == expected
        assert remaining == ""

    def test_media_tag_supports_quoted_paths_with_spaces(self):
        """A quoted path may contain spaces; neighbouring lines are kept."""
        items, remaining = BasePlatformAdapter.extract_media(
            "Here\nMEDIA: '/tmp/my image.png'\nAfter"
        )
        assert items == [("/tmp/my image.png", False)]
        for kept in ("Here", "After"):
            assert kept in remaining

    def test_media_tag_supports_unquoted_flac_paths_with_spaces(self):
        """An unquoted ``.flac`` path containing spaces is captured whole."""
        items, remaining = BasePlatformAdapter.extract_media(
            "MEDIA:/tmp/Jane Doe/speech.flac"
        )
        assert items == [("/tmp/Jane Doe/speech.flac", False)]
        assert remaining == ""
331  
332  
333  # ---------------------------------------------------------------------------
334  # should_send_media_as_audio
335  # ---------------------------------------------------------------------------
336  
class TestShouldSendMediaAsAudio:
    """Pins the ``should_send_media_as_audio`` routing decisions per platform and extension."""

    def test_unknown_extension_returns_false(self):
        """Non-audio extensions are never routed as audio."""
        from gateway.platforms.base import should_send_media_as_audio

        for platform, ext in ((None, ".png"), ("telegram", ".pdf")):
            assert should_send_media_as_audio(platform, ext) is False

    def test_non_telegram_platforms_route_all_audio(self):
        """Discord and Slack route every listed audio extension as audio."""
        from gateway.platforms.base import should_send_media_as_audio

        audio_exts = (".mp3", ".m4a", ".wav", ".flac", ".ogg", ".opus")
        for ext in audio_exts:
            for platform in ("discord", "slack"):
                assert should_send_media_as_audio(platform, ext) is True

    def test_telegram_mp3_and_m4a_route_to_audio(self):
        """On Telegram, ``.mp3`` and ``.m4a`` are routed as audio."""
        from gateway.platforms.base import should_send_media_as_audio

        for ext in (".mp3", ".m4a"):
            assert should_send_media_as_audio("telegram", ext) is True

    def test_telegram_wav_and_flac_fall_through_to_document(self):
        """On Telegram, ``.wav`` and ``.flac`` are not routed as audio."""
        from gateway.platforms.base import should_send_media_as_audio

        for ext in (".wav", ".flac"):
            assert should_send_media_as_audio("telegram", ext) is False

    def test_telegram_ogg_opus_only_when_voice_flagged(self):
        """On Telegram, ``.ogg``/``.opus`` are audio only when ``is_voice=True``."""
        from gateway.platforms.base import should_send_media_as_audio

        voice_exts = (".ogg", ".opus")
        for ext in voice_exts:
            assert should_send_media_as_audio("telegram", ext, is_voice=True) is True
        for ext in voice_exts:
            assert should_send_media_as_audio("telegram", ext) is False

    def test_accepts_platform_enum(self):
        """``Platform`` enum members are accepted in place of platform strings."""
        from gateway.config import Platform
        from gateway.platforms.base import should_send_media_as_audio

        cases = (
            (Platform.TELEGRAM, ".mp3", True),
            (Platform.TELEGRAM, ".flac", False),
            (Platform.DISCORD, ".flac", True),
        )
        for platform, ext, expected in cases:
            assert should_send_media_as_audio(platform, ext) is expected
374  
375  
376  # ---------------------------------------------------------------------------
377  # truncate_message
378  # ---------------------------------------------------------------------------
379  
380  
class TestTruncateMessage:
    """Exercises ``truncate_message`` chunking through a minimal concrete adapter."""

    def _adapter(self):
        """Return a stub adapter instance on which ``truncate_message`` can be called.

        The stub supplies trivial ``connect``/``disconnect``/``send``/
        ``get_chat_info`` coroutines so the base class can be instantiated.
        """

        class StubAdapter(BasePlatformAdapter):
            async def connect(self):
                return True

            async def disconnect(self):
                pass

            async def send(self, *a, **kw):
                pass

            async def get_chat_info(self, *a):
                return {}

        from gateway.config import Platform, PlatformConfig

        config = PlatformConfig(enabled=True, token="test")
        return StubAdapter(config=config, platform=Platform.TELEGRAM)

    def test_short_message_single_chunk(self):
        """A message below the limit is returned as one unmodified chunk."""
        adapter = self._adapter()
        chunks = adapter.truncate_message("Hello world", max_length=100)
        assert chunks == ["Hello world"]

    def test_exact_length_single_chunk(self):
        """A message exactly at the limit is not split."""
        adapter = self._adapter()
        msg = "x" * 100
        chunks = adapter.truncate_message(msg, max_length=100)
        assert chunks == [msg]

    def test_long_message_splits(self):
        """An over-long message is split and every word still appears somewhere."""
        adapter = self._adapter()
        msg = "word " * 200  # ~1000 chars
        chunks = adapter.truncate_message(msg, max_length=200)
        assert len(chunks) > 1
        # Chunk indicators such as (1/N) are left in place; a substring check
        # on the joined chunks is enough to show no word was dropped.
        reassembled = "".join(chunks)
        for word in msg.split():
            assert word in reassembled, f"Word '{word}' lost during truncation"

    def test_chunks_have_indicators(self):
        """First and last chunks carry ``(1/...)`` and ``(N/N)`` indicators."""
        adapter = self._adapter()
        msg = "word " * 200
        chunks = adapter.truncate_message(msg, max_length=200)
        assert "(1/" in chunks[0]
        assert f"({len(chunks)}/{len(chunks)})" in chunks[-1]

    def test_code_block_first_chunk_closed(self):
        """When a code block is split, the first chunk gets a closing fence."""
        adapter = self._adapter()
        msg = "Before\n```python\n" + "x = 1\n" * 100 + "```\nAfter"
        chunks = adapter.truncate_message(msg, max_length=300)
        assert len(chunks) > 1
        # Opening fence from the message plus the appended closing fence.
        first_fences = chunks[0].count("```")
        assert first_fences == 2, "First chunk should have opening + closing fence"

    def test_code_block_language_tag_carried(self):
        """A continuation chunk reopens the code block with its language tag."""
        adapter = self._adapter()
        msg = "Start\n```javascript\n" + "console.log('x');\n" * 80 + "```\nEnd"
        chunks = adapter.truncate_message(msg, max_length=300)
        # The message is far longer than the limit, so a split is required.
        # Asserting it (rather than guarding with ``if``) keeps the test from
        # passing vacuously if splitting ever stops happening.
        assert len(chunks) > 1, "Expected the message to be split"
        reopened_with_lang = any("```javascript" in chunk for chunk in chunks[1:])
        assert reopened_with_lang, (
            "No continuation chunk reopened with language tag"
        )

    def test_continuation_chunks_have_balanced_fences(self):
        """Regression: continuation chunks must close reopened code blocks."""
        adapter = self._adapter()
        msg = "Before\n```python\n" + "x = 1\n" * 100 + "```\nAfter"
        chunks = adapter.truncate_message(msg, max_length=300)
        assert len(chunks) > 1
        for i, chunk in enumerate(chunks):
            fence_count = chunk.count("```")
            assert fence_count % 2 == 0, (
                f"Chunk {i} has unbalanced fences ({fence_count})"
            )

    def test_each_chunk_under_max_length(self):
        """Every chunk stays within ``max_len`` plus a 20-character allowance."""
        adapter = self._adapter()
        msg = "word " * 500
        max_len = 200
        # NOTE(review): the 20-character slack presumably covers the appended
        # "(i/N)" indicator - confirm against truncate_message.
        allowed = max_len + 20
        chunks = adapter.truncate_message(msg, max_length=max_len)
        for i, chunk in enumerate(chunks):
            # Report the bound that is actually enforced, not bare max_len.
            assert len(chunk) <= allowed, (
                f"Chunk {i} too long: {len(chunk)} > {allowed}"
            )
473  
474  
475  # ---------------------------------------------------------------------------
476  # _get_human_delay
477  # ---------------------------------------------------------------------------
478  
479  
class TestGetHumanDelay:
    """Covers ``BasePlatformAdapter._get_human_delay`` under each ``HERMES_HUMAN_DELAY_MODE``."""

    def test_off_mode(self):
        """Mode ``off`` gives a zero delay."""
        overrides = {"HERMES_HUMAN_DELAY_MODE": "off"}
        with patch.dict(os.environ, overrides):
            seconds = BasePlatformAdapter._get_human_delay()
            assert seconds == 0.0

    def test_default_is_off(self):
        """With the mode variable unset, the delay is zero."""
        # patch.dict restores os.environ on exit, so the pop below is undone.
        with patch.dict(os.environ, {}, clear=False):
            os.environ.pop("HERMES_HUMAN_DELAY_MODE", None)
            seconds = BasePlatformAdapter._get_human_delay()
            assert seconds == 0.0

    def test_natural_mode_range(self):
        """Mode ``natural`` gives a delay between 0.8 and 2.5 inclusive."""
        overrides = {"HERMES_HUMAN_DELAY_MODE": "natural"}
        with patch.dict(os.environ, overrides):
            seconds = BasePlatformAdapter._get_human_delay()
            assert 0.8 <= seconds <= 2.5

    def test_custom_mode_uses_env_vars(self):
        """Mode ``custom`` with 100/200 ms bounds gives a delay of 0.1 to 0.2."""
        overrides = {
            "HERMES_HUMAN_DELAY_MODE": "custom",
            "HERMES_HUMAN_DELAY_MIN_MS": "100",
            "HERMES_HUMAN_DELAY_MAX_MS": "200",
        }
        with patch.dict(os.environ, overrides):
            seconds = BasePlatformAdapter._get_human_delay()
            assert 0.1 <= seconds <= 0.2
504  
505  
506  # ---------------------------------------------------------------------------
507  # utf16_len / _prefix_within_utf16_limit / truncate_message with len_fn
508  # ---------------------------------------------------------------------------
# Ported from nearai/ironclaw#2304 — Telegram counts message length in UTF-16
510  # code units, not Unicode code-points.  Astral-plane characters (emoji, CJK
511  # Extension B) are surrogate pairs: 1 Python char but 2 UTF-16 units.
512  
513  
class TestUtf16Len:
    """Verify the UTF-16 length helper.

    Non-ASCII test data is written with explicit escapes so the intended code
    points survive any mis-decoding of the source file; garbled literals would
    make the asserted lengths wrong.
    """

    def test_ascii(self):
        """ASCII characters count one unit each."""
        assert utf16_len("hello") == 5

    def test_bmp_cjk(self):
        """CJK ideographs in the BMP are one code unit each."""
        # U+4F60 U+597D ("ni hao")
        assert utf16_len("\u4f60\u597d") == 2

    def test_emoji_surrogate_pair(self):
        """U+1F600 (grinning face) is outside the BMP -> 2 UTF-16 code units."""
        assert utf16_len("\U0001F600") == 2

    def test_mixed(self):
        """"hi" + U+1F600 = 2 + 2 = 4 UTF-16 units."""
        assert utf16_len("hi\U0001F600") == 4

    def test_musical_symbol(self):
        """U+1D11E (Musical Symbol G Clef) is a surrogate pair."""
        assert utf16_len("\U0001D11E") == 2

    def test_empty(self):
        """The empty string has length zero."""
        assert utf16_len("") == 0
538  
539  
class TestPrefixWithinUtf16Limit:
    """Verify UTF-16-aware prefix truncation.

    The emoji used throughout is U+1F600, written as an escape so the test
    data cannot be corrupted by a wrong-codec round trip of the source file.
    """

    def test_fits_entirely(self):
        """Text within the limit is returned unchanged."""
        assert _prefix_within_utf16_limit("hello", 10) == "hello"

    def test_ascii_truncation(self):
        """ASCII text is cut at exactly the limit."""
        result = _prefix_within_utf16_limit("hello world", 5)
        assert result == "hello"
        assert utf16_len(result) <= 5

    def test_does_not_split_surrogate_pair(self):
        """An astral character that does not fully fit is dropped, not halved."""
        # "a" + U+1F600 + "b" = 1 + 2 + 1 = 4 UTF-16 units; limit 2 gives "a".
        result = _prefix_within_utf16_limit("a\U0001F600b", 2)
        assert result == "a"
        assert utf16_len(result) <= 2

    def test_emoji_at_limit(self):
        """An astral character that exactly fills the limit is kept."""
        # U+1F600 is 2 UTF-16 units; limit 2 should include it.
        result = _prefix_within_utf16_limit("\U0001F600x", 2)
        assert result == "\U0001F600"

    def test_all_emoji(self):
        """Ten emoji (20 units) with limit 6 keep exactly three emoji."""
        msg = "\U0001F600" * 10  # 20 UTF-16 units
        result = _prefix_within_utf16_limit(msg, 6)
        assert result == "\U0001F600" * 3
        assert utf16_len(result) == 6

    def test_empty(self):
        """The empty string stays empty."""
        assert _prefix_within_utf16_limit("", 5) == ""
570  
571  
class TestTruncateMessageUtf16:
    """Verify truncate_message respects UTF-16 lengths when len_fn=utf16_len.

    Astral-plane test characters are written as escapes (U+1F600 grinning
    face, U+1F3B5 musical note, U+1D11E G clef) so the exact code-point and
    UTF-16 counts asserted below hold regardless of how the file is decoded.
    """

    def test_short_emoji_message_no_split(self):
        """A short message under the UTF-16 limit should not be split."""
        msg = "Hello \U0001F600 world"
        chunks = BasePlatformAdapter.truncate_message(msg, 4096, len_fn=utf16_len)
        assert len(chunks) == 1
        assert chunks[0] == msg

    def test_emoji_near_limit_triggers_split(self):
        """A message under 4096 code points but over 4096 UTF-16 units must split."""
        # 2049 emoji = 2049 code points but 4098 UTF-16 units -> exceeds 4096.
        msg = "\U0001F600" * 2049
        assert len(msg) == 2049  # Python len sees 2049 chars
        assert utf16_len(msg) == 4098  # but it's 4098 UTF-16 units

        # Without UTF-16 awareness, this would NOT split (2049 < 4096).
        chunks_naive = BasePlatformAdapter.truncate_message(msg, 4096)
        assert len(chunks_naive) == 1, "Without len_fn, no split expected"

        # With UTF-16 awareness, it MUST split.
        chunks = BasePlatformAdapter.truncate_message(msg, 4096, len_fn=utf16_len)
        assert len(chunks) > 1, "With utf16_len, message should be split"

        # Each chunk must fit within the UTF-16 limit.
        for i, chunk in enumerate(chunks):
            assert utf16_len(chunk) <= 4096, (
                f"Chunk {i} exceeds 4096 UTF-16 units: {utf16_len(chunk)}"
            )

    def test_each_utf16_chunk_within_limit(self):
        """All chunks produced with utf16_len fit the limit plus a 20-unit allowance."""
        # Mix of BMP and astral-plane characters.
        msg = ("Hello \U0001F600 world \U0001F3B5 test \U0001D11E " * 200).strip()
        max_len = 200
        # NOTE(review): the 20-unit slack presumably covers the appended
        # "(i/N)" indicator - confirm against truncate_message.
        allowed = max_len + 20
        chunks = BasePlatformAdapter.truncate_message(msg, max_len, len_fn=utf16_len)
        for i, chunk in enumerate(chunks):
            u16_len = utf16_len(chunk)
            # Report the bound that is actually enforced, not bare max_len.
            assert u16_len <= allowed, (
                f"Chunk {i} UTF-16 length {u16_len} exceeds {allowed}"
            )

    def test_all_content_preserved(self):
        """Splitting with utf16_len must not lose content."""
        words = [
            "emoji\U0001F600",
            "music\U0001F3B5",
            "cjk\u4f60\u597d",
            "plain",
        ] * 100
        msg = " ".join(words)
        chunks = BasePlatformAdapter.truncate_message(msg, 200, len_fn=utf16_len)
        reassembled = " ".join(chunks)
        for word in words:
            assert word in reassembled, f"Word '{word}' lost during UTF-16 split"

    def test_code_blocks_preserved_with_utf16(self):
        """Code block fence handling should work with utf16_len too."""
        msg = "Before\n```python\n" + "x = '\U0001F600'\n" * 200 + "```\nAfter"
        chunks = BasePlatformAdapter.truncate_message(msg, 300, len_fn=utf16_len)
        assert len(chunks) > 1
        # Each chunk should have balanced fences.
        for i, chunk in enumerate(chunks):
            fence_count = chunk.count("```")
            assert fence_count % 2 == 0, (
                f"Chunk {i} has unbalanced fences ({fence_count})"
            )
634              )
635  
636  
class TestProxyKwargsForAiohttp:
    """Checks the ``(session_kwargs, request_kwargs)`` pair from ``proxy_kwargs_for_aiohttp``."""

    def test_none_returns_empty(self):
        """No proxy URL yields two empty dicts."""
        from gateway.platforms.base import proxy_kwargs_for_aiohttp

        session_kwargs, request_kwargs = proxy_kwargs_for_aiohttp(None)
        assert session_kwargs == {}
        assert request_kwargs == {}

    def test_http_proxy_uses_connector_when_aiohttp_socks_available(self):
        """With ``aiohttp_socks`` importable, an HTTP proxy is supplied as a session connector."""
        pytest.importorskip("aiohttp_socks")
        from unittest.mock import MagicMock
        from gateway.platforms.base import proxy_kwargs_for_aiohttp

        fake_connector = MagicMock(name="ProxyConnector")
        with patch("aiohttp_socks.ProxyConnector.from_url", return_value=fake_connector):
            session_kwargs, request_kwargs = proxy_kwargs_for_aiohttp("http://proxy:8080")
        assert session_kwargs.get("connector") is fake_connector, (
            "HTTP proxy must use ProxyConnector so libraries that don't "
            "forward per-request proxy= kwargs still route through the proxy"
        )
        assert request_kwargs == {}

    def test_socks_proxy_uses_connector(self):
        """A SOCKS5 proxy is likewise supplied as a session connector."""
        pytest.importorskip("aiohttp_socks")
        from unittest.mock import MagicMock
        from gateway.platforms.base import proxy_kwargs_for_aiohttp

        fake_connector = MagicMock(name="ProxyConnector")
        with patch("aiohttp_socks.ProxyConnector.from_url", return_value=fake_connector):
            session_kwargs, request_kwargs = proxy_kwargs_for_aiohttp("socks5://proxy:1080")
        assert session_kwargs.get("connector") is fake_connector
        assert request_kwargs == {}

    def test_http_proxy_falls_back_without_aiohttp_socks(self):
        """With ``aiohttp_socks`` unimportable, the proxy is passed per request instead."""
        from gateway.platforms.base import proxy_kwargs_for_aiohttp

        # A None entry in sys.modules makes ``import aiohttp_socks`` raise ImportError.
        blocked_modules = {"aiohttp_socks": None}
        with patch.dict("sys.modules", blocked_modules):
            session_kwargs, request_kwargs = proxy_kwargs_for_aiohttp("http://proxy:8080")
            assert session_kwargs == {}
            assert request_kwargs == {"proxy": "http://proxy:8080"}
679