# test_platform_base.py
"""Tests for gateway/platforms/base.py — MessageEvent, media extraction, message truncation."""

import os
from unittest.mock import patch

import pytest

from gateway.platforms.base import (
    BasePlatformAdapter,
    GATEWAY_SECRET_CAPTURE_UNSUPPORTED_MESSAGE,
    MessageEvent,
    MessageType,
    safe_url_for_log,
    utf16_len,
    _prefix_within_utf16_limit,
)


class TestSecretCaptureGuidance:
    """The "secret capture unsupported" message must direct users to local setup."""

    def test_gateway_secret_capture_message_points_to_local_setup(self):
        message = GATEWAY_SECRET_CAPTURE_UNSUPPORTED_MESSAGE
        assert "local cli" in message.lower()
        assert "~/.hermes/.env" in message


class TestSafeUrlForLog:
    """safe_url_for_log must drop credentials/query data and honour max_len."""

    def test_strips_query_fragment_and_userinfo(self):
        url = (
            "https://user:pass@example.com/private/path/image.png"
            "?X-Amz-Signature=supersecret&token=abc#frag"
        )
        result = safe_url_for_log(url)
        assert result == "https://example.com/.../image.png"
        assert "supersecret" not in result
        assert "token=abc" not in result
        assert "user:pass@" not in result

    def test_truncates_long_values(self):
        long_url = "https://example.com/" + ("a" * 300)
        result = safe_url_for_log(long_url, max_len=40)
        assert len(result) == 40
        assert result.endswith("...")

    def test_handles_small_and_non_positive_max_len(self):
        url = "https://example.com/very/long/path/file.png?token=secret"
        assert safe_url_for_log(url, max_len=3) == "..."
        assert safe_url_for_log(url, max_len=2) == ".."
        assert safe_url_for_log(url, max_len=0) == ""


# ---------------------------------------------------------------------------
# MessageEvent — command parsing
# ---------------------------------------------------------------------------


class TestMessageEventIsCommand:
    """is_command() is True exactly when the text starts with a slash."""

    def test_slash_command(self):
        event = MessageEvent(text="/new")
        assert event.is_command() is True

    def test_regular_text(self):
        event = MessageEvent(text="hello world")
        assert event.is_command() is False

    def test_empty_text(self):
        event = MessageEvent(text="")
        assert event.is_command() is False

    def test_slash_only(self):
        event = MessageEvent(text="/")
        assert event.is_command() is True


class TestMessageEventGetCommand:
    """get_command() returns the lowercased command name without "/" or "@bot"."""

    def test_simple_command(self):
        event = MessageEvent(text="/new")
        assert event.get_command() == "new"

    def test_command_with_args(self):
        event = MessageEvent(text="/reset session")
        assert event.get_command() == "reset"

    def test_not_a_command(self):
        event = MessageEvent(text="hello")
        assert event.get_command() is None

    def test_command_is_lowercased(self):
        event = MessageEvent(text="/HELP")
        assert event.get_command() == "help"

    def test_slash_only_returns_empty(self):
        event = MessageEvent(text="/")
        assert event.get_command() == ""

    def test_command_with_at_botname(self):
        event = MessageEvent(text="/new@TigerNanoBot")
        assert event.get_command() == "new"

    def test_command_with_at_botname_and_args(self):
        # The fixture now actually carries an argument, as the test name
        # promises; previously it was identical in shape to the no-args case.
        event = MessageEvent(text="/compress@TigerNanoBot now")
        assert event.get_command() == "compress"

    def test_command_mixed_case_with_at_botname(self):
        event = MessageEvent(text="/RESET@TigerNanoBot")
        assert event.get_command() == "reset"


class TestMessageEventGetCommandArgs:
    """get_command_args() returns everything after the command word."""

    def test_command_with_args(self):
        event = MessageEvent(text="/new session id 123")
        assert event.get_command_args() == "session id 123"

    def test_command_without_args(self):
        event = MessageEvent(text="/new")
        assert event.get_command_args() == ""

    def test_not_a_command_returns_full_text(self):
        event = MessageEvent(text="hello world")
        assert event.get_command_args() == "hello world"


# ---------------------------------------------------------------------------
# extract_images
# ---------------------------------------------------------------------------


class TestExtractImages:
    """extract_images() returns ([(url, alt), ...], cleaned_text).

    Each Markdown fixture spells out the image whose URL and alt text the
    assertions check.  Where a test does not pin the URL, a representative
    one is used and marked with NOTE(review).
    """

    def test_no_images(self):
        images, cleaned = BasePlatformAdapter.extract_images("Just regular text.")
        assert images == []
        assert cleaned == "Just regular text."

    def test_markdown_image_with_image_ext(self):
        content = "Here is a photo: ![cat](https://example.com/cat.png)"
        images, cleaned = BasePlatformAdapter.extract_images(content)
        assert len(images) == 1
        assert images[0][0] == "https://example.com/cat.png"
        assert images[0][1] == "cat"
        assert "![cat]" not in cleaned

    def test_markdown_image_jpg(self):
        content = "![photo](https://example.com/photo.jpg)"
        images, _ = BasePlatformAdapter.extract_images(content)
        assert len(images) == 1
        assert images[0][0] == "https://example.com/photo.jpg"
        assert images[0][1] == "photo"

    def test_markdown_image_jpeg(self):
        content = "![](https://example.com/photo.jpeg)"
        images, _ = BasePlatformAdapter.extract_images(content)
        assert len(images) == 1
        assert images[0][0] == "https://example.com/photo.jpeg"
        assert images[0][1] == ""

    def test_markdown_image_gif(self):
        content = "![anim](https://example.com/anim.gif)"
        images, _ = BasePlatformAdapter.extract_images(content)
        assert len(images) == 1
        assert images[0][0] == "https://example.com/anim.gif"
        assert images[0][1] == "anim"

    def test_markdown_image_webp(self):
        content = "![](https://example.com/img.webp)"
        images, _ = BasePlatformAdapter.extract_images(content)
        assert len(images) == 1
        assert images[0][0] == "https://example.com/img.webp"
        assert images[0][1] == ""

    def test_fal_media_cdn(self):
        content = "![gen](https://fal.media/files/abc123/output.png)"
        images, _ = BasePlatformAdapter.extract_images(content)
        assert len(images) == 1
        assert images[0][0] == "https://fal.media/files/abc123/output.png"
        assert images[0][1] == "gen"

    def test_fal_cdn_url(self):
        # No file extension: the asserted result means the URL is still
        # extracted as an image.
        content = "![](https://fal-cdn.example.com/result)"
        images, _ = BasePlatformAdapter.extract_images(content)
        assert len(images) == 1
        assert images[0][0] == "https://fal-cdn.example.com/result"
        assert images[0][1] == ""

    def test_replicate_delivery(self):
        content = "![](https://replicate.delivery/pbxt/abc/output)"
        images, _ = BasePlatformAdapter.extract_images(content)
        assert len(images) == 1
        assert images[0][0] == "https://replicate.delivery/pbxt/abc/output"
        assert images[0][1] == ""

    def test_non_image_ext_not_extracted(self):
        """Markdown image with non-image extension should not be extracted."""
        # NOTE(review): representative URL; only the "doc" alt text is pinned
        # by the assertions.
        content = "![doc](https://example.com/report.pdf)"
        images, cleaned = BasePlatformAdapter.extract_images(content)
        assert images == []
        assert "![doc]" in cleaned  # Should be preserved

    def test_html_img_tag(self):
        content = 'Check this: <img src="https://example.com/photo.png">'
        images, cleaned = BasePlatformAdapter.extract_images(content)
        assert len(images) == 1
        assert images[0][0] == "https://example.com/photo.png"
        assert images[0][1] == ""  # HTML images have no alt text
        assert "<img" not in cleaned

    def test_html_img_self_closing(self):
        content = '<img src="https://example.com/photo.png"/>'
        images, _ = BasePlatformAdapter.extract_images(content)
        assert len(images) == 1
        assert images[0][0] == "https://example.com/photo.png"
        assert images[0][1] == ""

    def test_html_img_with_closing_tag(self):
        content = '<img src="https://example.com/photo.png"></img>'
        images, _ = BasePlatformAdapter.extract_images(content)
        assert len(images) == 1
        assert images[0][0] == "https://example.com/photo.png"
        assert images[0][1] == ""

    def test_multiple_images(self):
        # NOTE(review): representative URLs; only the "a"/"b" alt texts are
        # pinned by the assertions.
        content = "![a](https://example.com/a.png)\n![b](https://example.com/b.jpg)"
        images, cleaned = BasePlatformAdapter.extract_images(content)
        assert len(images) == 2
        assert "![a]" not in cleaned
        assert "![b]" not in cleaned

    def test_mixed_markdown_and_html(self):
        # NOTE(review): the Markdown half is a representative image; the HTML
        # half is as in the original fixture.
        content = (
            "![cat](https://example.com/cat.png)"
            '\n<img src="https://example.com/dog.jpg">'
        )
        images, _ = BasePlatformAdapter.extract_images(content)
        assert len(images) == 2

    def test_cleaned_content_trims_excess_newlines(self):
        content = "Before\n\n\n\n\n\nAfter"
        _, cleaned = BasePlatformAdapter.extract_images(content)
        assert "\n\n\n" not in cleaned

    def test_non_http_url_not_matched(self):
        # NOTE(review): representative non-http(s) target; confirm it matches
        # the scheme the original fixture used.
        content = "![file](file:///local/path.png)"
        images, _ = BasePlatformAdapter.extract_images(content)
        assert images == []

    def test_non_image_link_preserved_when_mixed_with_images(self):
        """Regression: non-image markdown links must not be silently removed
        when the response also contains real images."""
        # NOTE(review): the PDF link and the image alt text are representative;
        # only the image URL is pinned by the assertions.
        pdf_link = "![report](https://example.com/report.pdf)"
        content = (
            "Here is the image: ![photo](https://fal.media/cat.png)\n"
            "And a doc: " + pdf_link
        )
        images, cleaned = BasePlatformAdapter.extract_images(content)
        assert len(images) == 1
        assert images[0][0] == "https://fal.media/cat.png"
        # The PDF link must survive in cleaned content
        assert pdf_link in cleaned


# ---------------------------------------------------------------------------
# extract_media
# ---------------------------------------------------------------------------


class TestExtractMedia:
    """extract_media() returns ([(path, is_voice), ...], cleaned_text)."""

    def test_no_media(self):
        media, cleaned = BasePlatformAdapter.extract_media("Just text.")
        assert media == []
        assert cleaned == "Just text."

    def test_single_media_tag(self):
        content = "MEDIA:/path/to/audio.ogg"
        media, _ = BasePlatformAdapter.extract_media(content)
        assert len(media) == 1
        assert media[0][0] == "/path/to/audio.ogg"
        assert media[0][1] is False  # no voice tag

    def test_media_with_voice_directive(self):
        content = "[[audio_as_voice]]\nMEDIA:/path/to/voice.ogg"
        media, _ = BasePlatformAdapter.extract_media(content)
        assert len(media) == 1
        assert media[0][0] == "/path/to/voice.ogg"
        assert media[0][1] is True  # voice tag present

    def test_multiple_media_tags(self):
        content = "MEDIA:/a.ogg\nMEDIA:/b.ogg"
        media, _ = BasePlatformAdapter.extract_media(content)
        assert len(media) == 2

    def test_voice_directive_removed_from_content(self):
        content = "[[audio_as_voice]]\nSome text\nMEDIA:/voice.ogg"
        _, cleaned = BasePlatformAdapter.extract_media(content)
        assert "[[audio_as_voice]]" not in cleaned
        assert "MEDIA:" not in cleaned
        assert "Some text" in cleaned

    def test_media_with_text_before(self):
        content = "Here is your audio:\nMEDIA:/output.ogg"
        media, cleaned = BasePlatformAdapter.extract_media(content)
        assert len(media) == 1
        assert "Here is your audio" in cleaned

    def test_cleaned_content_trims_excess_newlines(self):
        content = "Before\n\nMEDIA:/audio.ogg\n\n\n\nAfter"
        _, cleaned = BasePlatformAdapter.extract_media(content)
        assert "\n\n\n" not in cleaned

    def test_media_tag_allows_optional_whitespace_after_colon(self):
        content = "MEDIA: /path/to/audio.ogg"
        media, cleaned = BasePlatformAdapter.extract_media(content)
        assert media == [("/path/to/audio.ogg", False)]
        assert cleaned == ""

    def test_media_tag_strips_wrapping_quotes_and_backticks(self):
        content = "MEDIA: `/path/to/file.png`\nMEDIA:\"/path/to/file2.png\"\nMEDIA:'/path/to/file3.png'"
        media, cleaned = BasePlatformAdapter.extract_media(content)
        assert media == [
            ("/path/to/file.png", False),
            ("/path/to/file2.png", False),
            ("/path/to/file3.png", False),
        ]
        assert cleaned == ""

    def test_media_tag_supports_quoted_paths_with_spaces(self):
        content = "Here\nMEDIA: '/tmp/my image.png'\nAfter"
        media, cleaned = BasePlatformAdapter.extract_media(content)
        assert media == [("/tmp/my image.png", False)]
        assert "Here" in cleaned
        assert "After" in cleaned

    def test_media_tag_supports_unquoted_flac_paths_with_spaces(self):
        content = "MEDIA:/tmp/Jane Doe/speech.flac"
        media, cleaned = BasePlatformAdapter.extract_media(content)
        assert media == [("/tmp/Jane Doe/speech.flac", False)]
        assert cleaned == ""


# ---------------------------------------------------------------------------
# should_send_media_as_audio
# ---------------------------------------------------------------------------

class TestShouldSendMediaAsAudio:
    """Audio-routing policy shared by gateway + scheduler + send_message."""

    def test_unknown_extension_returns_false(self):
        from gateway.platforms.base import should_send_media_as_audio
        assert should_send_media_as_audio(None, ".png") is False
        assert should_send_media_as_audio("telegram", ".pdf") is False

    def test_non_telegram_platforms_route_all_audio(self):
        from gateway.platforms.base import should_send_media_as_audio
        for ext in (".mp3", ".m4a", ".wav", ".flac", ".ogg", ".opus"):
            assert should_send_media_as_audio("discord", ext) is True
            assert should_send_media_as_audio("slack", ext) is True

    def test_telegram_mp3_and_m4a_route_to_audio(self):
        from gateway.platforms.base import should_send_media_as_audio
        assert should_send_media_as_audio("telegram", ".mp3") is True
        assert should_send_media_as_audio("telegram", ".m4a") is True

    def test_telegram_wav_and_flac_fall_through_to_document(self):
        from gateway.platforms.base import should_send_media_as_audio
        assert should_send_media_as_audio("telegram", ".wav") is False
        assert should_send_media_as_audio("telegram", ".flac") is False

    def test_telegram_ogg_opus_only_when_voice_flagged(self):
        from gateway.platforms.base import should_send_media_as_audio
        assert should_send_media_as_audio("telegram", ".ogg", is_voice=True) is True
        assert should_send_media_as_audio("telegram", ".opus", is_voice=True) is True
        assert should_send_media_as_audio("telegram", ".ogg") is False
        assert should_send_media_as_audio("telegram", ".opus") is False

    def test_accepts_platform_enum(self):
        from gateway.config import Platform
        from gateway.platforms.base import should_send_media_as_audio
        assert should_send_media_as_audio(Platform.TELEGRAM, ".mp3") is True
        assert should_send_media_as_audio(Platform.TELEGRAM, ".flac") is False
        assert should_send_media_as_audio(Platform.DISCORD, ".flac") is True


# ---------------------------------------------------------------------------
# truncate_message
# ---------------------------------------------------------------------------


class TestTruncateMessage:
    """truncate_message() chunking with the default (code-point) length."""

    def _adapter(self):
        """Create a minimal adapter instance for testing static/instance methods."""

        class StubAdapter(BasePlatformAdapter):
            async def connect(self):
                return True

            async def disconnect(self):
                pass

            async def send(self, *a, **kw):
                pass

            async def get_chat_info(self, *a):
                return {}

        from gateway.config import Platform, PlatformConfig

        config = PlatformConfig(enabled=True, token="test")
        return StubAdapter(config=config, platform=Platform.TELEGRAM)

    def test_short_message_single_chunk(self):
        adapter = self._adapter()
        chunks = adapter.truncate_message("Hello world", max_length=100)
        assert chunks == ["Hello world"]

    def test_exact_length_single_chunk(self):
        adapter = self._adapter()
        msg = "x" * 100
        chunks = adapter.truncate_message(msg, max_length=100)
        assert chunks == [msg]

    def test_long_message_splits(self):
        adapter = self._adapter()
        msg = "word " * 200  # ~1000 chars
        chunks = adapter.truncate_message(msg, max_length=200)
        assert len(chunks) > 1
        # Verify all original content is preserved across chunks
        reassembled = "".join(chunks)
        # Chunk indicators such as (1/N) are left in place; they do not
        # affect the substring check below.
        for word in msg.strip().split():
            assert word in reassembled, f"Word '{word}' lost during truncation"

    def test_chunks_have_indicators(self):
        adapter = self._adapter()
        msg = "word " * 200
        chunks = adapter.truncate_message(msg, max_length=200)
        assert "(1/" in chunks[0]
        assert f"({len(chunks)}/{len(chunks)})" in chunks[-1]

    def test_code_block_first_chunk_closed(self):
        adapter = self._adapter()
        msg = "Before\n```python\n" + "x = 1\n" * 100 + "```\nAfter"
        chunks = adapter.truncate_message(msg, max_length=300)
        assert len(chunks) > 1
        # First chunk must have a closing fence appended (code block was split)
        first_fences = chunks[0].count("```")
        assert first_fences == 2, "First chunk should have opening + closing fence"

    def test_code_block_language_tag_carried(self):
        adapter = self._adapter()
        msg = "Start\n```javascript\n" + "console.log('x');\n" * 80 + "```\nEnd"
        chunks = adapter.truncate_message(msg, max_length=300)
        # The message is far longer than max_length, so a split is required.
        # Asserting it keeps this test from passing vacuously.
        assert len(chunks) > 1
        # At least one continuation chunk should reopen with ```javascript
        reopened_with_lang = any("```javascript" in chunk for chunk in chunks[1:])
        assert reopened_with_lang, (
            "No continuation chunk reopened with language tag"
        )

    def test_continuation_chunks_have_balanced_fences(self):
        """Regression: continuation chunks must close reopened code blocks."""
        adapter = self._adapter()
        msg = "Before\n```python\n" + "x = 1\n" * 100 + "```\nAfter"
        chunks = adapter.truncate_message(msg, max_length=300)
        assert len(chunks) > 1
        for i, chunk in enumerate(chunks):
            fence_count = chunk.count("```")
            assert fence_count % 2 == 0, (
                f"Chunk {i} has unbalanced fences ({fence_count})"
            )

    def test_each_chunk_under_max_length(self):
        adapter = self._adapter()
        msg = "word " * 500
        max_len = 200
        chunks = adapter.truncate_message(msg, max_length=max_len)
        for i, chunk in enumerate(chunks):
            # 20 characters of slack are tolerated — presumably for the
            # appended (i/N) indicator; confirm against truncate_message.
            assert len(chunk) <= max_len + 20, (
                f"Chunk {i} too long: {len(chunk)} > {max_len}"
            )


# ---------------------------------------------------------------------------
# _get_human_delay
# ---------------------------------------------------------------------------


class TestGetHumanDelay:
    """_get_human_delay() is driven by HERMES_HUMAN_DELAY_* environment variables."""

    def test_off_mode(self):
        with patch.dict(os.environ, {"HERMES_HUMAN_DELAY_MODE": "off"}):
            assert BasePlatformAdapter._get_human_delay() == 0.0

    def test_default_is_off(self):
        # patch.dict restores os.environ on exit, so the pop below cannot
        # leak into other tests.
        with patch.dict(os.environ, {}, clear=False):
            os.environ.pop("HERMES_HUMAN_DELAY_MODE", None)
            assert BasePlatformAdapter._get_human_delay() == 0.0

    def test_natural_mode_range(self):
        with patch.dict(os.environ, {"HERMES_HUMAN_DELAY_MODE": "natural"}):
            delay = BasePlatformAdapter._get_human_delay()
            assert 0.8 <= delay <= 2.5

    def test_custom_mode_uses_env_vars(self):
        env = {
            "HERMES_HUMAN_DELAY_MODE": "custom",
            "HERMES_HUMAN_DELAY_MIN_MS": "100",
            "HERMES_HUMAN_DELAY_MAX_MS": "200",
        }
        with patch.dict(os.environ, env):
            delay = BasePlatformAdapter._get_human_delay()
            assert 0.1 <= delay <= 0.2


# ---------------------------------------------------------------------------
# utf16_len / _prefix_within_utf16_limit / truncate_message with len_fn
# ---------------------------------------------------------------------------
# Ported from nearai/ironclaw#2304 — Telegram counts message length in UTF-16
# code units, not Unicode
# code-points. Astral-plane characters (emoji, CJK
# Extension B) are surrogate pairs: 1 Python char but 2 UTF-16 units.

# Non-ASCII fixtures are written as escape sequences so the assertions do not
# depend on how this file's bytes are decoded.  A mis-decoded emoji collapses
# to a BMP character and makes the UTF-16 expectations below false.
_GRINNING_FACE = "\U0001F600"  # GRINNING FACE, astral plane: 2 UTF-16 units
_MUSICAL_NOTE = "\U0001F3B5"  # MUSICAL NOTE, astral plane: 2 UTF-16 units
_G_CLEF = "\U0001D11E"  # MUSICAL SYMBOL G CLEF, astral plane: 2 UTF-16 units
_NI_HAO = "\u4f60\u597d"  # two CJK ideographs in the BMP: 1 UTF-16 unit each


class TestUtf16Len:
    """Verify the UTF-16 length helper."""

    def test_ascii(self):
        assert utf16_len("hello") == 5

    def test_bmp_cjk(self):
        # CJK ideographs in the BMP are 1 code unit each
        assert utf16_len(_NI_HAO) == 2

    def test_emoji_surrogate_pair(self):
        # U+1F600 is outside the BMP -> 2 UTF-16 code units
        assert utf16_len(_GRINNING_FACE) == 2

    def test_mixed(self):
        # "hi" + U+1F600 = 2 + 2 = 4 UTF-16 units
        assert utf16_len("hi" + _GRINNING_FACE) == 4

    def test_musical_symbol(self):
        # U+1D11E, Musical Symbol G Clef, is a surrogate pair
        assert utf16_len(_G_CLEF) == 2

    def test_empty(self):
        assert utf16_len("") == 0


class TestPrefixWithinUtf16Limit:
    """Verify UTF-16-aware prefix truncation."""

    def test_fits_entirely(self):
        assert _prefix_within_utf16_limit("hello", 10) == "hello"

    def test_ascii_truncation(self):
        result = _prefix_within_utf16_limit("hello world", 5)
        assert result == "hello"
        assert utf16_len(result) <= 5

    def test_does_not_split_surrogate_pair(self):
        # "a" + emoji + "b" = 1 + 2 + 1 = 4 UTF-16 units; limit 2 should give "a"
        result = _prefix_within_utf16_limit("a" + _GRINNING_FACE + "b", 2)
        assert result == "a"
        assert utf16_len(result) <= 2

    def test_emoji_at_limit(self):
        # The emoji alone is 2 UTF-16 units; limit 2 should include it
        result = _prefix_within_utf16_limit(_GRINNING_FACE + "x", 2)
        assert result == _GRINNING_FACE

    def test_all_emoji(self):
        msg = _GRINNING_FACE * 10  # 20 UTF-16 units
        result = _prefix_within_utf16_limit(msg, 6)
        assert result == _GRINNING_FACE * 3
        assert utf16_len(result) == 6

    def test_empty(self):
        assert _prefix_within_utf16_limit("", 5) == ""


class TestTruncateMessageUtf16:
    """Verify truncate_message respects UTF-16 lengths when len_fn=utf16_len."""

    def test_short_emoji_message_no_split(self):
        """A short message under the UTF-16 limit should not be split."""
        msg = f"Hello {_GRINNING_FACE} world"
        chunks = BasePlatformAdapter.truncate_message(msg, 4096, len_fn=utf16_len)
        assert len(chunks) == 1
        assert chunks[0] == msg

    def test_emoji_near_limit_triggers_split(self):
        """A message under 4096 code points but over 4096 UTF-16 units must split."""
        # 2049 emoji = 2049 code points but 4098 UTF-16 units -> exceeds 4096
        msg = _GRINNING_FACE * 2049
        assert len(msg) == 2049  # Python len sees 2049 chars
        assert utf16_len(msg) == 4098  # but it's 4098 UTF-16 units

        # Without UTF-16 awareness, this would NOT split (2049 < 4096)
        chunks_naive = BasePlatformAdapter.truncate_message(msg, 4096)
        assert len(chunks_naive) == 1, "Without len_fn, no split expected"

        # With UTF-16 awareness, it MUST split
        chunks = BasePlatformAdapter.truncate_message(msg, 4096, len_fn=utf16_len)
        assert len(chunks) > 1, "With utf16_len, message should be split"

        # Each chunk must fit within the UTF-16 limit
        for i, chunk in enumerate(chunks):
            assert utf16_len(chunk) <= 4096, (
                f"Chunk {i} exceeds 4096 UTF-16 units: {utf16_len(chunk)}"
            )

    def test_each_utf16_chunk_within_limit(self):
        """All chunks produced with utf16_len must fit the limit."""
        # Mix of BMP and astral-plane characters.
        # NOTE(review): the third astral character is a representative choice;
        # any astral-plane character exercises the same path.
        msg = (
            f"Hello {_GRINNING_FACE} world {_MUSICAL_NOTE} test {_G_CLEF} " * 200
        ).strip()
        max_len = 200
        chunks = BasePlatformAdapter.truncate_message(msg, max_len, len_fn=utf16_len)
        for i, chunk in enumerate(chunks):
            u16_len = utf16_len(chunk)
            # 20 units of slack are tolerated — presumably for the appended
            # (i/N) indicator; confirm against truncate_message.
            assert u16_len <= max_len + 20, (
                f"Chunk {i} UTF-16 length {u16_len} exceeds {max_len}"
            )

    def test_all_content_preserved(self):
        """Splitting with utf16_len must not lose content."""
        words = [
            "emoji" + _GRINNING_FACE,
            "music" + _MUSICAL_NOTE,
            "cjk" + _NI_HAO,
            "plain",
        ] * 100
        msg = " ".join(words)
        chunks = BasePlatformAdapter.truncate_message(msg, 200, len_fn=utf16_len)
        reassembled = " ".join(chunks)
        for word in words:
            assert word in reassembled, f"Word '{word}' lost during UTF-16 split"

    def test_code_blocks_preserved_with_utf16(self):
        """Code block fence handling should work with utf16_len too."""
        msg = "Before\n```python\n" + f"x = '{_GRINNING_FACE}'\n" * 200 + "```\nAfter"
        chunks = BasePlatformAdapter.truncate_message(msg, 300, len_fn=utf16_len)
        assert len(chunks) > 1
        # Each chunk should have balanced fences
        for i, chunk in enumerate(chunks):
            fence_count = chunk.count("```")
            assert fence_count % 2 == 0, (
                f"Chunk {i} has unbalanced fences ({fence_count})"
            )


class TestProxyKwargsForAiohttp:
    """Verify proxy_kwargs_for_aiohttp routes all schemes through ProxyConnector."""

    def test_none_returns_empty(self):
        from gateway.platforms.base import proxy_kwargs_for_aiohttp

        sess_kw, req_kw = proxy_kwargs_for_aiohttp(None)
        assert sess_kw == {}
        assert req_kw == {}

    def test_http_proxy_uses_connector_when_aiohttp_socks_available(self):
        pytest.importorskip("aiohttp_socks")
        from unittest.mock import MagicMock
        from gateway.platforms.base import proxy_kwargs_for_aiohttp

        sentinel = MagicMock(name="ProxyConnector")
        with patch("aiohttp_socks.ProxyConnector.from_url", return_value=sentinel):
            sess_kw, req_kw = proxy_kwargs_for_aiohttp("http://proxy:8080")
        assert sess_kw.get("connector") is sentinel, (
            "HTTP proxy must use ProxyConnector so libraries that don't "
            "forward per-request proxy= kwargs still route through the proxy"
        )
        assert req_kw == {}

    def test_socks_proxy_uses_connector(self):
        pytest.importorskip("aiohttp_socks")
        from unittest.mock import MagicMock
        from gateway.platforms.base import proxy_kwargs_for_aiohttp

        sentinel = MagicMock(name="ProxyConnector")
        with patch("aiohttp_socks.ProxyConnector.from_url", return_value=sentinel):
            sess_kw, req_kw = proxy_kwargs_for_aiohttp("socks5://proxy:1080")
        assert sess_kw.get("connector") is sentinel
        assert req_kw == {}

    def test_http_proxy_falls_back_without_aiohttp_socks(self):
        from gateway.platforms.base import proxy_kwargs_for_aiohttp

        # A None entry in sys.modules makes "import aiohttp_socks" raise
        # ImportError, simulating the package being absent.
        with patch.dict("sys.modules", {"aiohttp_socks": None}):
            sess_kw, req_kw = proxy_kwargs_for_aiohttp("http://proxy:8080")
        assert sess_kw == {}
        assert req_kw == {"proxy": "http://proxy:8080"}