test_weixin.py
"""Tests for the Weixin platform adapter."""

import asyncio
import base64
import json
import os
from pathlib import Path
from unittest.mock import AsyncMock, Mock, patch

import pytest

from gateway.config import PlatformConfig
from gateway.config import GatewayConfig, HomeChannel, Platform, _apply_env_overrides
from gateway.platforms.base import SendResult
from gateway.platforms import weixin
from gateway.platforms.weixin import ContextTokenStore, WeixinAdapter
from tools.send_message_tool import _parse_target_ref, _send_to_platform


def _make_adapter() -> WeixinAdapter:
    """Build a ``WeixinAdapter`` from a minimal, enabled test configuration."""
    return WeixinAdapter(
        PlatformConfig(
            enabled=True,
            token="test-token",
            extra={"account_id": "test-account"},
        )
    )


def _make_connected_adapter() -> WeixinAdapter:
    """Build an adapter with its session, token, base URL and token store stubbed.

    The session is a bare ``object()`` placeholder, so tests using this helper
    must patch whatever would otherwise perform network I/O.
    """
    adapter = _make_adapter()
    adapter._session = object()
    adapter._send_session = adapter._session
    adapter._token = "test-token"
    adapter._base_url = "https://weixin.example.com"
    adapter._token_store.get = lambda account_id, chat_id: "ctx-token"
    return adapter


def _fail_replace(_src, _dst):
    """Stand-in for ``os.replace`` that always raises ``OSError``."""
    raise OSError("disk full")


class TestWeixinFormatting:
    """``format_message`` should pass markdown through unchanged."""

    def test_format_message_preserves_markdown(self):
        adapter = _make_adapter()

        content = "# Title\n\n## Plan\n\nUse **bold** and [docs](https://example.com)."

        assert adapter.format_message(content) == content

    def test_format_message_preserves_markdown_tables(self):
        adapter = _make_adapter()

        content = (
            "| Setting | Value |\n"
            "| --- | --- |\n"
            "| Timeout | 30s |\n"
            "| Retries | 3 |\n"
        )

        # Only the surrounding whitespace is expected to be trimmed.
        assert adapter.format_message(content) == content.strip()

    def test_format_message_preserves_fenced_code_blocks(self):
        adapter = _make_adapter()

        content = "## Snippet\n\n```python\nprint('hi')\n```"

        assert adapter.format_message(content) == content

    def test_format_message_returns_empty_string_for_none(self):
        adapter = _make_adapter()

        assert adapter.format_message(None) == ""


class TestWeixinChunking:
    """``_split_text`` chunking of formatted messages."""

    def test_split_text_splits_short_chatty_replies_into_separate_bubbles(self):
        adapter = _make_adapter()

        content = adapter.format_message("第一行\n第二行\n第三行")
        chunks = adapter._split_text(content)

        assert chunks == ["第一行", "第二行", "第三行"]

    def test_split_text_keeps_structured_table_block_together(self):
        adapter = _make_adapter()

        content = adapter.format_message(
            "- Setting: Timeout\n Value: 30s\n- Setting: Retries\n Value: 3"
        )
        chunks = adapter._split_text(content)

        assert chunks == ["- Setting: Timeout\n Value: 30s\n- Setting: Retries\n Value: 3"]

    def test_split_text_keeps_four_line_structured_blocks_together(self):
        adapter = _make_adapter()

        content = adapter.format_message(
            "今天结论:\n"
            "- 留存下降 3%\n"
            "- 转化上涨 8%\n"
            "- 主要问题在首日激活"
        )
        chunks = adapter._split_text(content)

        assert chunks == ["今天结论:\n- 留存下降 3%\n- 转化上涨 8%\n- 主要问题在首日激活"]

    def test_split_text_keeps_heading_with_body_together(self):
        adapter = _make_adapter()

        content = adapter.format_message("## 结论\n这是正文")
        chunks = adapter._split_text(content)

        assert chunks == ["## 结论\n这是正文"]

    def test_split_text_keeps_short_reformatted_table_in_single_chunk(self):
        adapter = _make_adapter()

        content = adapter.format_message(
            "| Setting | Value |\n"
            "| --- | --- |\n"
            "| Timeout | 30s |\n"
            "| Retries | 3 |\n"
        )
        chunks = adapter._split_text(content)

        assert chunks == [content]

    def test_split_text_keeps_complete_code_block_together_when_possible(self):
        adapter = _make_adapter()
        adapter.MAX_MESSAGE_LENGTH = 80

        content = adapter.format_message(
            "## Intro\n\nShort paragraph.\n\n```python\nprint('hello world')\nprint('again')\n```\n\nTail paragraph."
        )
        chunks = adapter._split_text(content)

        assert len(chunks) >= 2
        assert any(
            "```python\nprint('hello world')\nprint('again')\n```" in chunk
            for chunk in chunks
        )
        # An even fence count means no chunk contains an unbalanced fence.
        assert all(chunk.count("```") % 2 == 0 for chunk in chunks)

    def test_split_text_safely_splits_long_code_blocks(self):
        adapter = _make_adapter()
        adapter.MAX_MESSAGE_LENGTH = 70

        lines = "\n".join(f"line_{idx:02d} = {idx}" for idx in range(10))
        content = adapter.format_message(f"```python\n{lines}\n```")
        chunks = adapter._split_text(content)

        assert len(chunks) > 1
        assert all(len(chunk) <= adapter.MAX_MESSAGE_LENGTH for chunk in chunks)
        # Every chunk must carry at least an opening and a closing fence.
        assert all(chunk.count("```") >= 2 for chunk in chunks)

    def test_split_text_can_restore_legacy_multiline_splitting_via_config(self):
        adapter = WeixinAdapter(
            PlatformConfig(
                enabled=True,
                extra={
                    "account_id": "acct",
                    "token": "***",
                    "split_multiline_messages": True,
                },
            )
        )

        content = adapter.format_message("第一行\n第二行\n第三行")
        chunks = adapter._split_text(content)

        assert chunks == ["第一行", "第二行", "第三行"]


class TestWeixinConfig:
    """Gateway configuration handling for the Weixin platform."""

    def test_apply_env_overrides_configures_weixin(self):
        config = GatewayConfig()

        with patch.dict(
            os.environ,
            {
                "WEIXIN_ACCOUNT_ID": "bot-account",
                "WEIXIN_TOKEN": "bot-token",
                "WEIXIN_BASE_URL": "https://ilink.example.com/",
                "WEIXIN_CDN_BASE_URL": "https://cdn.example.com/c2c/",
                "WEIXIN_DM_POLICY": "allowlist",
                "WEIXIN_SPLIT_MULTILINE_MESSAGES": "true",
                "WEIXIN_ALLOWED_USERS": "wxid_1,wxid_2",
                "WEIXIN_HOME_CHANNEL": "wxid_1",
                "WEIXIN_HOME_CHANNEL_NAME": "Primary DM",
            },
            clear=True,
        ):
            _apply_env_overrides(config)

        platform_config = config.platforms[Platform.WEIXIN]
        assert platform_config.enabled is True
        assert platform_config.token == "bot-token"
        assert platform_config.extra["account_id"] == "bot-account"
        # Trailing slashes on both URLs are expected to be stripped.
        assert platform_config.extra["base_url"] == "https://ilink.example.com"
        assert platform_config.extra["cdn_base_url"] == "https://cdn.example.com/c2c"
        assert platform_config.extra["dm_policy"] == "allowlist"
        assert platform_config.extra["split_multiline_messages"] == "true"
        assert platform_config.extra["allow_from"] == "wxid_1,wxid_2"
        assert platform_config.home_channel == HomeChannel(Platform.WEIXIN, "wxid_1", "Primary DM")

    def test_get_connected_platforms_includes_weixin_with_token(self):
        config = GatewayConfig(
            platforms={
                Platform.WEIXIN: PlatformConfig(
                    enabled=True,
                    token="bot-token",
                    extra={"account_id": "bot-account"},
                )
            }
        )

        assert config.get_connected_platforms() == [Platform.WEIXIN]

    def test_get_connected_platforms_requires_account_id(self):
        config = GatewayConfig(
            platforms={
                Platform.WEIXIN: PlatformConfig(
                    enabled=True,
                    token="bot-token",
                )
            }
        )

        assert config.get_connected_platforms() == []


class TestWeixinStatePersistence:
    """On-disk state must survive a failing ``os.replace`` during a save."""

    def test_save_weixin_account_preserves_existing_file_on_replace_failure(self, tmp_path, monkeypatch):
        account_path = tmp_path / "weixin" / "accounts" / "acct.json"
        account_path.parent.mkdir(parents=True, exist_ok=True)
        original = {"token": "old-token", "base_url": "https://old.example.com"}
        account_path.write_text(json.dumps(original), encoding="utf-8")

        monkeypatch.setattr("utils.os.replace", _fail_replace)

        # save_weixin_account is expected to propagate the replace failure.
        with pytest.raises(OSError):
            weixin.save_weixin_account(
                str(tmp_path),
                account_id="acct",
                token="new-token",
                base_url="https://new.example.com",
                user_id="wxid_new",
            )

        assert json.loads(account_path.read_text(encoding="utf-8")) == original

    def test_context_token_persist_preserves_existing_file_on_replace_failure(self, tmp_path, monkeypatch):
        token_path = tmp_path / "weixin" / "accounts" / "acct.context-tokens.json"
        token_path.parent.mkdir(parents=True, exist_ok=True)
        token_path.write_text(json.dumps({"user-a": "old-token"}), encoding="utf-8")

        monkeypatch.setattr("utils.os.replace", _fail_replace)

        store = ContextTokenStore(str(tmp_path))
        # Unlike the other savers, the token store is expected to log a
        # warning rather than raise.
        with patch.object(weixin.logger, "warning") as warning_mock:
            store.set("acct", "user-b", "new-token")

        assert json.loads(token_path.read_text(encoding="utf-8")) == {"user-a": "old-token"}
        warning_mock.assert_called_once()

    def test_save_sync_buf_preserves_existing_file_on_replace_failure(self, tmp_path, monkeypatch):
        sync_path = tmp_path / "weixin" / "accounts" / "acct.sync.json"
        sync_path.parent.mkdir(parents=True, exist_ok=True)
        sync_path.write_text(json.dumps({"get_updates_buf": "old-sync"}), encoding="utf-8")

        monkeypatch.setattr("utils.os.replace", _fail_replace)

        # _save_sync_buf is expected to propagate the replace failure.
        with pytest.raises(OSError):
            weixin._save_sync_buf(str(tmp_path), "acct", "new-sync")

        assert json.loads(sync_path.read_text(encoding="utf-8")) == {"get_updates_buf": "old-sync"}


class TestWeixinSendMessageIntegration:
    """Routing of Weixin targets through the send-message tool."""

    def test_parse_target_ref_accepts_weixin_ids(self):
        assert _parse_target_ref("weixin", "wxid_test123") == ("wxid_test123", None, True)
        assert _parse_target_ref("weixin", "filehelper") == ("filehelper", None, True)
        assert _parse_target_ref("weixin", "group@chatroom") == ("group@chatroom", None, True)

    @patch("tools.send_message_tool._send_weixin", new_callable=AsyncMock)
    def test_send_to_platform_routes_weixin_media_to_native_helper(self, send_weixin_mock):
        send_weixin_mock.return_value = {"success": True, "platform": "weixin", "chat_id": "wxid_test123"}
        config = PlatformConfig(enabled=True, token="bot-token", extra={"account_id": "bot-account"})

        result = asyncio.run(
            _send_to_platform(
                Platform.WEIXIN,
                config,
                "wxid_test123",
                "hello",
                media_files=[("/tmp/demo.png", False)],
            )
        )

        assert result["success"] is True
        send_weixin_mock.assert_awaited_once_with(
            config,
            "wxid_test123",
            "hello",
            media_files=[("/tmp/demo.png", False)],
        )


class TestWeixinChunkDelivery:
    """Delivery of multi-chunk messages through ``send``."""

    def _connected_adapter(self) -> WeixinAdapter:
        return _make_connected_adapter()

    @patch("gateway.platforms.weixin.asyncio.sleep", new_callable=AsyncMock)
    @patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock)
    def test_send_waits_between_multiple_chunks(self, send_message_mock, sleep_mock):
        adapter = self._connected_adapter()
        adapter.MAX_MESSAGE_LENGTH = 12

        # Use double newlines so _pack_markdown_blocks splits into 3 blocks
        result = asyncio.run(adapter.send("wxid_test123", "first\n\nsecond\n\nthird"))

        assert result.success is True
        assert send_message_mock.await_count == 3
        assert sleep_mock.await_count == 2

    @patch("gateway.platforms.weixin.asyncio.sleep", new_callable=AsyncMock)
    @patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock)
    def test_send_retries_failed_chunk_before_continuing(self, send_message_mock, sleep_mock):
        adapter = self._connected_adapter()
        adapter.MAX_MESSAGE_LENGTH = 12
        calls = {"count": 0}

        async def flaky_send(*args, **kwargs):
            # Fail only on the second call overall.
            calls["count"] += 1
            if calls["count"] == 2:
                raise RuntimeError("temporary iLink failure")

        send_message_mock.side_effect = flaky_send

        # Use double newlines so _pack_markdown_blocks splits into 3 blocks
        result = asyncio.run(adapter.send("wxid_test123", "first\n\nsecond\n\nthird"))

        assert result.success is True
        # 3 chunks, but chunk 2 fails once and retries → 4 _send_message calls total
        assert send_message_mock.await_count == 4
        # The retried chunk should reuse the same client_id for deduplication
        first_try = send_message_mock.await_args_list[1].kwargs
        retry = send_message_mock.await_args_list[2].kwargs
        assert first_try["text"] == retry["text"]
        assert first_try["client_id"] == retry["client_id"]


class TestWeixinOutboundMedia:
    """Outbound image/document sending and the CDN upload request."""

    def test_send_image_file_accepts_keyword_image_path(self):
        adapter = _make_adapter()
        expected = SendResult(success=True, message_id="msg-1")
        adapter.send_document = AsyncMock(return_value=expected)

        result = asyncio.run(
            adapter.send_image_file(
                chat_id="wxid_test123",
                image_path="/tmp/demo.png",
                caption="截图说明",
                reply_to="reply-1",
                metadata={"thread_id": "t-1"},
            )
        )

        assert result == expected
        adapter.send_document.assert_awaited_once_with(
            chat_id="wxid_test123",
            file_path="/tmp/demo.png",
            caption="截图说明",
            metadata={"thread_id": "t-1"},
        )

    def test_send_document_accepts_keyword_file_path(self):
        adapter = _make_adapter()
        adapter._session = object()
        adapter._send_session = adapter._session
        adapter._token = "test-token"
        adapter._send_file = AsyncMock(return_value="msg-2")

        result = asyncio.run(
            adapter.send_document(
                chat_id="wxid_test123",
                file_path="/tmp/report.pdf",
                caption="报告请看",
                file_name="renamed.pdf",
                reply_to="reply-1",
                metadata={"thread_id": "t-1"},
            )
        )

        assert result.success is True
        assert result.message_id == "msg-2"
        adapter._send_file.assert_awaited_once_with("wxid_test123", "/tmp/report.pdf", "报告请看")

    def test_send_file_uses_post_for_upload_full_url_and_hex_encoded_aes_key(self, tmp_path):
        class _UploadResponse:
            """Async-context-manager fake for the upload HTTP response."""

            def __init__(self):
                self.status = 200
                self.headers = {"x-encrypted-param": "enc-param"}

            async def __aenter__(self):
                return self

            async def __aexit__(self, exc_type, exc, tb):
                return False

            async def read(self):
                return b""

            async def text(self):
                return ""

        class _RecordingSession:
            """Fake session that records POST calls and rejects PUT."""

            def __init__(self):
                self.post_calls = []

            def post(self, url, **kwargs):
                self.post_calls.append((url, kwargs))
                return _UploadResponse()

            def put(self, *_args, **_kwargs):
                raise AssertionError("upload_full_url branch should use POST")

        image_path = tmp_path / "demo.png"
        image_path.write_bytes(b"fake-png-bytes")

        adapter = _make_adapter()
        session = _RecordingSession()
        adapter._session = session
        adapter._send_session = session
        adapter._token = "test-token"
        adapter._base_url = "https://weixin.example.com"
        adapter._cdn_base_url = "https://cdn.example.com/c2c"
        adapter._token_store.get = lambda account_id, chat_id: None

        aes_key = bytes(range(16))
        # The API field is base64 of the hex string, not of the raw key bytes.
        expected_aes_key = base64.b64encode(aes_key.hex().encode("ascii")).decode("ascii")

        with patch("gateway.platforms.weixin._get_upload_url", new=AsyncMock(return_value={"upload_full_url": "https://upload.example.com/media"})), \
             patch("gateway.platforms.weixin._api_post", new_callable=AsyncMock) as api_post_mock, \
             patch("gateway.platforms.weixin.secrets.token_hex", return_value="filekey-123"), \
             patch("gateway.platforms.weixin.secrets.token_bytes", return_value=aes_key):
            message_id = asyncio.run(adapter._send_file("wxid_test123", str(image_path), ""))

        assert message_id.startswith("hermes-weixin-")
        assert len(session.post_calls) == 1
        upload_url, upload_kwargs = session.post_calls[0]
        assert upload_url == "https://upload.example.com/media"
        assert upload_kwargs["headers"] == {"Content-Type": "application/octet-stream"}
        assert upload_kwargs["data"]
        assert upload_kwargs["timeout"].total == 120
        payload = api_post_mock.await_args.kwargs["payload"]
        media = payload["msg"]["item_list"][0]["image_item"]["media"]
        assert media["encrypt_query_param"] == "enc-param"
        assert media["aes_key"] == expected_aes_key


class TestWeixinRemoteMediaSafety:
    """Remote media downloads must honour the URL safety check."""

    def test_download_remote_media_blocks_unsafe_urls(self):
        adapter = _make_adapter()

        with patch("tools.url_safety.is_safe_url", return_value=False):
            with pytest.raises(ValueError, match="Blocked unsafe URL"):
                asyncio.run(adapter._download_remote_media("http://127.0.0.1/private.png"))


class TestWeixinMarkdownLinks:
    """Markdown links should be preserved so WeChat can render them natively."""

    def test_format_message_preserves_markdown_links(self):
        adapter = _make_adapter()

        content = "Check [the docs](https://example.com) and [GitHub](https://github.com) for details"
        assert adapter.format_message(content) == content

    def test_format_message_preserves_links_inside_code_blocks(self):
        adapter = _make_adapter()

        content = "See below:\n\n```\n[link](https://example.com)\n```\n\nDone."
        result = adapter.format_message(content)
        assert "[link](https://example.com)" in result


class TestWeixinBlankMessagePrevention:
    """Regression tests for the blank-bubble bugs.

    Three separate guards now prevent a blank WeChat message from ever being
    dispatched:

    1. ``_split_text_for_weixin_delivery("")`` returns ``[]`` — not ``[""]``.
    2. ``send()`` filters out empty/whitespace-only chunks before calling
       ``_send_text_chunk``.
    3. ``_send_message()`` raises ``ValueError`` for empty text as a last-resort
       safety net.
    """

    def test_split_text_returns_empty_list_for_empty_string(self):
        adapter = _make_adapter()
        assert adapter._split_text("") == []

    def test_split_text_returns_empty_list_for_empty_string_split_per_line(self):
        adapter = WeixinAdapter(
            PlatformConfig(
                enabled=True,
                extra={
                    "account_id": "acct",
                    "token": "test-tok",
                    "split_multiline_messages": True,
                },
            )
        )
        assert adapter._split_text("") == []

    @patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock)
    def test_send_empty_content_does_not_call_send_message(self, send_message_mock):
        adapter = _make_connected_adapter()

        result = asyncio.run(adapter.send("wxid_test123", ""))
        # Empty content → no chunks → no _send_message calls
        assert result.success is True
        send_message_mock.assert_not_awaited()

    def test_send_message_rejects_empty_text(self):
        """_send_message raises ValueError for empty/whitespace text."""
        with pytest.raises(ValueError, match="text must not be empty"):
            asyncio.run(
                weixin._send_message(
                    AsyncMock(),
                    base_url="https://example.com",
                    token="tok",
                    to="wxid_test",
                    text="",
                    context_token=None,
                    client_id="cid",
                )
            )


class TestWeixinStreamingCursorSuppression:
    """WeChat doesn't support message editing — cursor must be suppressed."""

    def test_supports_message_editing_is_false(self):
        adapter = _make_adapter()
        assert adapter.SUPPORTS_MESSAGE_EDITING is False


class TestWeixinMediaBuilder:
    """Media builder uses base64(hex_key), not base64(raw_bytes) for aes_key."""

    def test_image_builder_aes_key_is_base64_of_hex(self):
        adapter = _make_adapter()
        media_type, builder = adapter._outbound_media_builder("photo.jpg")
        assert media_type == weixin.MEDIA_IMAGE

        fake_hex_key = "0123456789abcdef0123456789abcdef"
        expected_aes = base64.b64encode(fake_hex_key.encode("ascii")).decode("ascii")
        item = builder(
            encrypt_query_param="eq",
            aes_key_for_api=expected_aes,
            ciphertext_size=1024,
            plaintext_size=1000,
            filename="photo.jpg",
            rawfilemd5="abc123",
        )
        assert item["image_item"]["media"]["aes_key"] == expected_aes

    def test_video_builder_includes_md5(self):
        adapter = _make_adapter()
        media_type, builder = adapter._outbound_media_builder("clip.mp4")
        assert media_type == weixin.MEDIA_VIDEO

        item = builder(
            encrypt_query_param="eq",
            aes_key_for_api="fakekey",
            ciphertext_size=2048,
            plaintext_size=2000,
            filename="clip.mp4",
            rawfilemd5="deadbeef",
        )
        assert item["video_item"]["video_md5"] == "deadbeef"

    def test_voice_builder_for_audio_files_uses_file_attachment_type(self):
        adapter = _make_adapter()
        media_type, builder = adapter._outbound_media_builder("note.mp3")
        assert media_type == weixin.MEDIA_FILE

        item = builder(
            encrypt_query_param="eq",
            aes_key_for_api="fakekey",
            ciphertext_size=512,
            plaintext_size=500,
            filename="note.mp3",
            rawfilemd5="abc",
        )
        assert item["type"] == weixin.ITEM_FILE
        assert item["file_item"]["file_name"] == "note.mp3"

    def test_voice_builder_for_silk_files(self):
        adapter = _make_adapter()
        media_type, builder = adapter._outbound_media_builder("recording.silk")
        assert media_type == weixin.MEDIA_VOICE


class TestWeixinSendImageFileParameterName:
    """Regression test for send_image_file parameter name mismatch.

    The gateway calls send_image_file(chat_id=..., image_path=...) but the
    WeixinAdapter previously used 'path' as the parameter name, causing
    image sending to fail. This test ensures the interface stays correct.
    """

    @patch.object(WeixinAdapter, "send_document", new_callable=AsyncMock)
    def test_send_image_file_uses_image_path_parameter(self, send_document_mock):
        """Verify send_image_file accepts image_path and forwards to send_document."""
        adapter = _make_adapter()
        adapter._session = object()
        adapter._send_session = adapter._session
        adapter._token = "test-token"

        send_document_mock.return_value = weixin.SendResult(success=True, message_id="test-id")

        # This is the call pattern used by gateway/run.py extract_media
        result = asyncio.run(
            adapter.send_image_file(
                chat_id="wxid_test123",
                image_path="/tmp/test_image.png",
                caption="Test caption",
                metadata={"thread_id": "thread-123"},
            )
        )

        assert result.success is True
        send_document_mock.assert_awaited_once_with(
            chat_id="wxid_test123",
            file_path="/tmp/test_image.png",
            caption="Test caption",
            metadata={"thread_id": "thread-123"},
        )

    @patch.object(WeixinAdapter, "send_document", new_callable=AsyncMock)
    def test_send_image_file_works_without_optional_params(self, send_document_mock):
        """Verify send_image_file works with minimal required params."""
        adapter = _make_adapter()
        adapter._session = object()
        adapter._send_session = adapter._session
        adapter._token = "test-token"

        send_document_mock.return_value = weixin.SendResult(success=True, message_id="test-id")

        result = asyncio.run(
            adapter.send_image_file(
                chat_id="wxid_test123",
                image_path="/tmp/test_image.jpg",
            )
        )

        assert result.success is True
        send_document_mock.assert_awaited_once_with(
            chat_id="wxid_test123",
            file_path="/tmp/test_image.jpg",
            caption=None,
            metadata=None,
        )


class TestWeixinVoiceSending:
    """Voice sending and the silk voice payload."""

    def _connected_adapter(self) -> WeixinAdapter:
        return _make_connected_adapter()

    @patch.object(WeixinAdapter, "_send_file", new_callable=AsyncMock)
    def test_send_voice_downgrades_to_document_attachment(self, send_file_mock, tmp_path):
        adapter = self._connected_adapter()
        source = tmp_path / "voice.ogg"
        source.write_bytes(b"ogg")
        send_file_mock.return_value = "msg-1"

        result = asyncio.run(adapter.send_voice("wxid_test123", str(source)))

        assert result.success is True
        send_file_mock.assert_awaited_once_with(
            "wxid_test123",
            str(source),
            "[voice message as attachment]",
            force_file_attachment=True,
        )

    def test_voice_builder_for_silk_files_can_be_forced_to_file_attachment(self):
        adapter = _make_adapter()
        media_type, builder = adapter._outbound_media_builder(
            "recording.silk",
            force_file_attachment=True,
        )
        assert media_type == weixin.MEDIA_FILE

        item = builder(
            encrypt_query_param="eq",
            aes_key_for_api="fakekey",
            ciphertext_size=512,
            plaintext_size=500,
            filename="recording.silk",
            rawfilemd5="abc",
        )
        assert item["type"] == weixin.ITEM_FILE
        assert item["file_item"]["file_name"] == "recording.silk"

    @patch.object(weixin, "_api_post", new_callable=AsyncMock)
    @patch.object(weixin, "_upload_ciphertext", new_callable=AsyncMock)
    @patch.object(weixin, "_get_upload_url", new_callable=AsyncMock)
    def test_send_file_sets_voice_metadata_for_silk_payload(
        self,
        get_upload_url_mock,
        upload_ciphertext_mock,
        api_post_mock,
        tmp_path,
    ):
        adapter = self._connected_adapter()
        silk = tmp_path / "voice.silk"
        silk.write_bytes(b"\x02#!SILK_V3\x01\x00")
        get_upload_url_mock.return_value = {"upload_full_url": "https://cdn.example.com/upload"}
        upload_ciphertext_mock.return_value = "enc-q"
        api_post_mock.return_value = {"success": True}

        asyncio.run(adapter._send_file("wxid_test123", str(silk), ""))

        payload = api_post_mock.await_args.kwargs["payload"]
        voice_item = payload["msg"]["item_list"][0]["voice_item"]
        assert voice_item.get("playtime", 0) == 0
        assert voice_item["encode_type"] == 6
        assert voice_item["sample_rate"] == 24000
        assert voice_item["bits_per_sample"] == 16


class TestIsStaleSessionRet:
    """Regression test for #17228: distinguish stale-session ret=-2 from rate-limit ret=-2."""

    def test_ret_minus_2_with_unknown_error_is_stale(self):
        assert weixin._is_stale_session_ret(-2, None, "unknown error") is True

    def test_errcode_minus_2_with_unknown_error_is_stale(self):
        assert weixin._is_stale_session_ret(None, -2, "unknown error") is True

    def test_unknown_error_case_insensitive(self):
        assert weixin._is_stale_session_ret(-2, None, "Unknown Error") is True

    def test_ret_minus_2_with_freq_limit_is_not_stale(self):
        # Genuine rate limit — must NOT be treated as stale session.
        assert weixin._is_stale_session_ret(-2, None, "freq limit") is False

    def test_ret_minus_2_with_no_errmsg_is_not_stale(self):
        assert weixin._is_stale_session_ret(-2, None, None) is False
        assert weixin._is_stale_session_ret(-2, None, "") is False

    def test_errcode_minus_14_is_not_matched_here(self):
        # -14 is handled by the separate SESSION_EXPIRED_ERRCODE path; the
        # helper only disambiguates -2 from a genuine rate limit.
        assert weixin._is_stale_session_ret(-14, None, "session expired") is False

    def test_success_codes_are_not_stale(self):
        assert weixin._is_stale_session_ret(0, 0, "") is False
        assert weixin._is_stale_session_ret(None, None, "unknown error") is False


class TestWeixinContentDedup:
    """Regression tests for Issue #16182 — upstream API sends duplicate content
    with different message_ids, bypassing message_id deduplication.
    """

    def test_duplicate_content_with_different_message_ids_is_dropped(self):
        adapter = _make_adapter()
        adapter._poll_session = object()
        adapter.handle_message = AsyncMock()

        base_msg = {
            "from_user_id": "wxid_user1",
            "item_list": [{"type": 1, "text_item": {"text": "hello world"}}],
        }

        asyncio.run(adapter._process_message({**base_msg, "message_id": "msg-1"}))
        asyncio.run(adapter._process_message({**base_msg, "message_id": "msg-2"}))

        assert adapter.handle_message.await_count == 1
        event = adapter.handle_message.await_args[0][0]
        assert event.text == "hello world"

    def test_content_dedup_not_called_for_messages_without_text(self):
        adapter = _make_adapter()
        adapter._poll_session = object()
        adapter.handle_message = AsyncMock()
        adapter._dedup.is_duplicate = Mock(return_value=False)

        empty_msg = {
            "from_user_id": "wxid_user1",
            "message_id": "msg-1",
            "item_list": [],
        }
        asyncio.run(adapter._process_message(empty_msg))

        assert adapter.handle_message.await_count == 0
        # is_duplicate should only be called for message_id, never for content
        assert all("content:" not in str(call) for call in adapter._dedup.is_duplicate.call_args_list)