/ tests / gateway / test_weixin.py
test_weixin.py
  1  """Tests for the Weixin platform adapter."""
  2  
  3  import asyncio
  4  import base64
  5  import json
  6  import os
  7  from pathlib import Path
  8  from unittest.mock import AsyncMock, Mock, patch
  9  
 10  from gateway.config import PlatformConfig
 11  from gateway.config import GatewayConfig, HomeChannel, Platform, _apply_env_overrides
 12  from gateway.platforms.base import SendResult
 13  from gateway.platforms import weixin
 14  from gateway.platforms.weixin import ContextTokenStore, WeixinAdapter
 15  from tools.send_message_tool import _parse_target_ref, _send_to_platform
 16  
 17  
 18  def _make_adapter() -> WeixinAdapter:
 19      return WeixinAdapter(
 20          PlatformConfig(
 21              enabled=True,
 22              token="test-token",
 23              extra={"account_id": "test-account"},
 24          )
 25      )
 26  
 27  
 28  class TestWeixinFormatting:
 29      def test_format_message_preserves_markdown(self):
 30          adapter = _make_adapter()
 31  
 32          content = "# Title\n\n## Plan\n\nUse **bold** and [docs](https://example.com)."
 33  
 34          assert adapter.format_message(content) == content
 35  
 36      def test_format_message_preserves_markdown_tables(self):
 37          adapter = _make_adapter()
 38  
 39          content = (
 40              "| Setting | Value |\n"
 41              "| --- | --- |\n"
 42              "| Timeout | 30s |\n"
 43              "| Retries | 3 |\n"
 44          )
 45  
 46          assert adapter.format_message(content) == content.strip()
 47  
 48      def test_format_message_preserves_fenced_code_blocks(self):
 49          adapter = _make_adapter()
 50  
 51          content = "## Snippet\n\n```python\nprint('hi')\n```"
 52  
 53          assert adapter.format_message(content) == content
 54  
 55      def test_format_message_returns_empty_string_for_none(self):
 56          adapter = _make_adapter()
 57  
 58          assert adapter.format_message(None) == ""
 59  
 60  
 61  class TestWeixinChunking:
 62      def test_split_text_splits_short_chatty_replies_into_separate_bubbles(self):
 63          adapter = _make_adapter()
 64  
 65          content = adapter.format_message("第一行\n第二行\n第三行")
 66          chunks = adapter._split_text(content)
 67  
 68          assert chunks == ["第一行", "第二行", "第三行"]
 69  
 70      def test_split_text_keeps_structured_table_block_together(self):
 71          adapter = _make_adapter()
 72  
 73          content = adapter.format_message(
 74              "- Setting: Timeout\n  Value: 30s\n- Setting: Retries\n  Value: 3"
 75          )
 76          chunks = adapter._split_text(content)
 77  
 78          assert chunks == ["- Setting: Timeout\n  Value: 30s\n- Setting: Retries\n  Value: 3"]
 79  
 80      def test_split_text_keeps_four_line_structured_blocks_together(self):
 81          adapter = _make_adapter()
 82  
 83          content = adapter.format_message(
 84              "今天结论:\n"
 85              "- 留存下降 3%\n"
 86              "- 转化上涨 8%\n"
 87              "- 主要问题在首日激活"
 88          )
 89          chunks = adapter._split_text(content)
 90  
 91          assert chunks == ["今天结论:\n- 留存下降 3%\n- 转化上涨 8%\n- 主要问题在首日激活"]
 92  
 93      def test_split_text_keeps_heading_with_body_together(self):
 94          adapter = _make_adapter()
 95  
 96          content = adapter.format_message("## 结论\n这是正文")
 97          chunks = adapter._split_text(content)
 98  
 99          assert chunks == ["## 结论\n这是正文"]
100  
101      def test_split_text_keeps_short_reformatted_table_in_single_chunk(self):
102          adapter = _make_adapter()
103  
104          content = adapter.format_message(
105              "| Setting | Value |\n"
106              "| --- | --- |\n"
107              "| Timeout | 30s |\n"
108              "| Retries | 3 |\n"
109          )
110          chunks = adapter._split_text(content)
111  
112          assert chunks == [content]
113  
114      def test_split_text_keeps_complete_code_block_together_when_possible(self):
115          adapter = _make_adapter()
116          adapter.MAX_MESSAGE_LENGTH = 80
117  
118          content = adapter.format_message(
119              "## Intro\n\nShort paragraph.\n\n```python\nprint('hello world')\nprint('again')\n```\n\nTail paragraph."
120          )
121          chunks = adapter._split_text(content)
122  
123          assert len(chunks) >= 2
124          assert any(
125              "```python\nprint('hello world')\nprint('again')\n```" in chunk
126              for chunk in chunks
127          )
128          assert all(chunk.count("```") % 2 == 0 for chunk in chunks)
129  
130      def test_split_text_safely_splits_long_code_blocks(self):
131          adapter = _make_adapter()
132          adapter.MAX_MESSAGE_LENGTH = 70
133  
134          lines = "\n".join(f"line_{idx:02d} = {idx}" for idx in range(10))
135          content = adapter.format_message(f"```python\n{lines}\n```")
136          chunks = adapter._split_text(content)
137  
138          assert len(chunks) > 1
139          assert all(len(chunk) <= adapter.MAX_MESSAGE_LENGTH for chunk in chunks)
140          assert all(chunk.count("```") >= 2 for chunk in chunks)
141  
142      def test_split_text_can_restore_legacy_multiline_splitting_via_config(self):
143          adapter = WeixinAdapter(
144              PlatformConfig(
145                  enabled=True,
146                  extra={
147                      "account_id": "acct",
148                      "token": "***",
149                      "split_multiline_messages": True,
150                  },
151              )
152          )
153  
154          content = adapter.format_message("第一行\n第二行\n第三行")
155          chunks = adapter._split_text(content)
156  
157          assert chunks == ["第一行", "第二行", "第三行"]
158  
159  
160  class TestWeixinConfig:
161      def test_apply_env_overrides_configures_weixin(self):
162          config = GatewayConfig()
163  
164          with patch.dict(
165              os.environ,
166              {
167                  "WEIXIN_ACCOUNT_ID": "bot-account",
168                  "WEIXIN_TOKEN": "bot-token",
169                  "WEIXIN_BASE_URL": "https://ilink.example.com/",
170                  "WEIXIN_CDN_BASE_URL": "https://cdn.example.com/c2c/",
171                  "WEIXIN_DM_POLICY": "allowlist",
172                  "WEIXIN_SPLIT_MULTILINE_MESSAGES": "true",
173                  "WEIXIN_ALLOWED_USERS": "wxid_1,wxid_2",
174                  "WEIXIN_HOME_CHANNEL": "wxid_1",
175                  "WEIXIN_HOME_CHANNEL_NAME": "Primary DM",
176              },
177              clear=True,
178          ):
179              _apply_env_overrides(config)
180  
181          platform_config = config.platforms[Platform.WEIXIN]
182          assert platform_config.enabled is True
183          assert platform_config.token == "bot-token"
184          assert platform_config.extra["account_id"] == "bot-account"
185          assert platform_config.extra["base_url"] == "https://ilink.example.com"
186          assert platform_config.extra["cdn_base_url"] == "https://cdn.example.com/c2c"
187          assert platform_config.extra["dm_policy"] == "allowlist"
188          assert platform_config.extra["split_multiline_messages"] == "true"
189          assert platform_config.extra["allow_from"] == "wxid_1,wxid_2"
190          assert platform_config.home_channel == HomeChannel(Platform.WEIXIN, "wxid_1", "Primary DM")
191  
192      def test_get_connected_platforms_includes_weixin_with_token(self):
193          config = GatewayConfig(
194              platforms={
195                  Platform.WEIXIN: PlatformConfig(
196                      enabled=True,
197                      token="bot-token",
198                      extra={"account_id": "bot-account"},
199                  )
200              }
201          )
202  
203          assert config.get_connected_platforms() == [Platform.WEIXIN]
204  
205      def test_get_connected_platforms_requires_account_id(self):
206          config = GatewayConfig(
207              platforms={
208                  Platform.WEIXIN: PlatformConfig(
209                      enabled=True,
210                      token="bot-token",
211                  )
212              }
213          )
214  
215          assert config.get_connected_platforms() == []
216  
217  
218  class TestWeixinStatePersistence:
219      def test_save_weixin_account_preserves_existing_file_on_replace_failure(self, tmp_path, monkeypatch):
220          account_path = tmp_path / "weixin" / "accounts" / "acct.json"
221          account_path.parent.mkdir(parents=True, exist_ok=True)
222          original = {"token": "old-token", "base_url": "https://old.example.com"}
223          account_path.write_text(json.dumps(original), encoding="utf-8")
224  
225          def _boom(_src, _dst):
226              raise OSError("disk full")
227  
228          monkeypatch.setattr("utils.os.replace", _boom)
229  
230          try:
231              weixin.save_weixin_account(
232                  str(tmp_path),
233                  account_id="acct",
234                  token="new-token",
235                  base_url="https://new.example.com",
236                  user_id="wxid_new",
237              )
238          except OSError:
239              pass
240          else:
241              raise AssertionError("expected save_weixin_account to propagate replace failure")
242  
243          assert json.loads(account_path.read_text(encoding="utf-8")) == original
244  
245      def test_context_token_persist_preserves_existing_file_on_replace_failure(self, tmp_path, monkeypatch):
246          token_path = tmp_path / "weixin" / "accounts" / "acct.context-tokens.json"
247          token_path.parent.mkdir(parents=True, exist_ok=True)
248          token_path.write_text(json.dumps({"user-a": "old-token"}), encoding="utf-8")
249  
250          def _boom(_src, _dst):
251              raise OSError("disk full")
252  
253          monkeypatch.setattr("utils.os.replace", _boom)
254  
255          store = ContextTokenStore(str(tmp_path))
256          with patch.object(weixin.logger, "warning") as warning_mock:
257              store.set("acct", "user-b", "new-token")
258  
259          assert json.loads(token_path.read_text(encoding="utf-8")) == {"user-a": "old-token"}
260          warning_mock.assert_called_once()
261  
262      def test_save_sync_buf_preserves_existing_file_on_replace_failure(self, tmp_path, monkeypatch):
263          sync_path = tmp_path / "weixin" / "accounts" / "acct.sync.json"
264          sync_path.parent.mkdir(parents=True, exist_ok=True)
265          sync_path.write_text(json.dumps({"get_updates_buf": "old-sync"}), encoding="utf-8")
266  
267          def _boom(_src, _dst):
268              raise OSError("disk full")
269  
270          monkeypatch.setattr("utils.os.replace", _boom)
271  
272          try:
273              weixin._save_sync_buf(str(tmp_path), "acct", "new-sync")
274          except OSError:
275              pass
276          else:
277              raise AssertionError("expected _save_sync_buf to propagate replace failure")
278  
279          assert json.loads(sync_path.read_text(encoding="utf-8")) == {"get_updates_buf": "old-sync"}
280  
281  
282  class TestWeixinSendMessageIntegration:
283      def test_parse_target_ref_accepts_weixin_ids(self):
284          assert _parse_target_ref("weixin", "wxid_test123") == ("wxid_test123", None, True)
285          assert _parse_target_ref("weixin", "filehelper") == ("filehelper", None, True)
286          assert _parse_target_ref("weixin", "group@chatroom") == ("group@chatroom", None, True)
287  
288      @patch("tools.send_message_tool._send_weixin", new_callable=AsyncMock)
289      def test_send_to_platform_routes_weixin_media_to_native_helper(self, send_weixin_mock):
290          send_weixin_mock.return_value = {"success": True, "platform": "weixin", "chat_id": "wxid_test123"}
291          config = PlatformConfig(enabled=True, token="bot-token", extra={"account_id": "bot-account"})
292  
293          result = asyncio.run(
294              _send_to_platform(
295                  Platform.WEIXIN,
296                  config,
297                  "wxid_test123",
298                  "hello",
299                  media_files=[("/tmp/demo.png", False)],
300              )
301          )
302  
303          assert result["success"] is True
304          send_weixin_mock.assert_awaited_once_with(
305              config,
306              "wxid_test123",
307              "hello",
308              media_files=[("/tmp/demo.png", False)],
309          )
310  
311  
312  class TestWeixinChunkDelivery:
313      def _connected_adapter(self) -> WeixinAdapter:
314          adapter = _make_adapter()
315          adapter._session = object()
316          adapter._send_session = adapter._session
317          adapter._token = "test-token"
318          adapter._base_url = "https://weixin.example.com"
319          adapter._token_store.get = lambda account_id, chat_id: "ctx-token"
320          return adapter
321  
322      @patch("gateway.platforms.weixin.asyncio.sleep", new_callable=AsyncMock)
323      @patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock)
324      def test_send_waits_between_multiple_chunks(self, send_message_mock, sleep_mock):
325          adapter = self._connected_adapter()
326          adapter.MAX_MESSAGE_LENGTH = 12
327  
328          # Use double newlines so _pack_markdown_blocks splits into 3 blocks
329          result = asyncio.run(adapter.send("wxid_test123", "first\n\nsecond\n\nthird"))
330  
331          assert result.success is True
332          assert send_message_mock.await_count == 3
333          assert sleep_mock.await_count == 2
334  
335      @patch("gateway.platforms.weixin.asyncio.sleep", new_callable=AsyncMock)
336      @patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock)
337      def test_send_retries_failed_chunk_before_continuing(self, send_message_mock, sleep_mock):
338          adapter = self._connected_adapter()
339          adapter.MAX_MESSAGE_LENGTH = 12
340          calls = {"count": 0}
341  
342          async def flaky_send(*args, **kwargs):
343              calls["count"] += 1
344              if calls["count"] == 2:
345                  raise RuntimeError("temporary iLink failure")
346  
347          send_message_mock.side_effect = flaky_send
348  
349          # Use double newlines so _pack_markdown_blocks splits into 3 blocks
350          result = asyncio.run(adapter.send("wxid_test123", "first\n\nsecond\n\nthird"))
351  
352          assert result.success is True
353          # 3 chunks, but chunk 2 fails once and retries → 4 _send_message calls total
354          assert send_message_mock.await_count == 4
355          # The retried chunk should reuse the same client_id for deduplication
356          first_try = send_message_mock.await_args_list[1].kwargs
357          retry = send_message_mock.await_args_list[2].kwargs
358          assert first_try["text"] == retry["text"]
359          assert first_try["client_id"] == retry["client_id"]
360  
361  
362  class TestWeixinOutboundMedia:
363      def test_send_image_file_accepts_keyword_image_path(self):
364          adapter = _make_adapter()
365          expected = SendResult(success=True, message_id="msg-1")
366          adapter.send_document = AsyncMock(return_value=expected)
367  
368          result = asyncio.run(
369              adapter.send_image_file(
370                  chat_id="wxid_test123",
371                  image_path="/tmp/demo.png",
372                  caption="截图说明",
373                  reply_to="reply-1",
374                  metadata={"thread_id": "t-1"},
375              )
376          )
377  
378          assert result == expected
379          adapter.send_document.assert_awaited_once_with(
380              chat_id="wxid_test123",
381              file_path="/tmp/demo.png",
382              caption="截图说明",
383              metadata={"thread_id": "t-1"},
384          )
385  
386      def test_send_document_accepts_keyword_file_path(self):
387          adapter = _make_adapter()
388          adapter._session = object()
389          adapter._send_session = adapter._session
390          adapter._token = "test-token"
391          adapter._send_file = AsyncMock(return_value="msg-2")
392  
393          result = asyncio.run(
394              adapter.send_document(
395                  chat_id="wxid_test123",
396                  file_path="/tmp/report.pdf",
397                  caption="报告请看",
398                  file_name="renamed.pdf",
399                  reply_to="reply-1",
400                  metadata={"thread_id": "t-1"},
401              )
402          )
403  
404          assert result.success is True
405          assert result.message_id == "msg-2"
406          adapter._send_file.assert_awaited_once_with("wxid_test123", "/tmp/report.pdf", "报告请看")
407  
408      def test_send_file_uses_post_for_upload_full_url_and_hex_encoded_aes_key(self, tmp_path):
409          class _UploadResponse:
410              def __init__(self):
411                  self.status = 200
412                  self.headers = {"x-encrypted-param": "enc-param"}
413  
414              async def __aenter__(self):
415                  return self
416  
417              async def __aexit__(self, exc_type, exc, tb):
418                  return False
419  
420              async def read(self):
421                  return b""
422  
423              async def text(self):
424                  return ""
425  
426          class _RecordingSession:
427              def __init__(self):
428                  self.post_calls = []
429  
430              def post(self, url, **kwargs):
431                  self.post_calls.append((url, kwargs))
432                  return _UploadResponse()
433  
434              def put(self, *_args, **_kwargs):
435                  raise AssertionError("upload_full_url branch should use POST")
436  
437          image_path = tmp_path / "demo.png"
438          image_path.write_bytes(b"fake-png-bytes")
439  
440          adapter = _make_adapter()
441          session = _RecordingSession()
442          adapter._session = session
443          adapter._send_session = session
444          adapter._token = "test-token"
445          adapter._base_url = "https://weixin.example.com"
446          adapter._cdn_base_url = "https://cdn.example.com/c2c"
447          adapter._token_store.get = lambda account_id, chat_id: None
448  
449          aes_key = bytes(range(16))
450          expected_aes_key = base64.b64encode(aes_key.hex().encode("ascii")).decode("ascii")
451  
452          with patch("gateway.platforms.weixin._get_upload_url", new=AsyncMock(return_value={"upload_full_url": "https://upload.example.com/media"})), \
453               patch("gateway.platforms.weixin._api_post", new_callable=AsyncMock) as api_post_mock, \
454               patch("gateway.platforms.weixin.secrets.token_hex", return_value="filekey-123"), \
455               patch("gateway.platforms.weixin.secrets.token_bytes", return_value=aes_key):
456              message_id = asyncio.run(adapter._send_file("wxid_test123", str(image_path), ""))
457  
458          assert message_id.startswith("hermes-weixin-")
459          assert len(session.post_calls) == 1
460          upload_url, upload_kwargs = session.post_calls[0]
461          assert upload_url == "https://upload.example.com/media"
462          assert upload_kwargs["headers"] == {"Content-Type": "application/octet-stream"}
463          assert upload_kwargs["data"]
464          assert upload_kwargs["timeout"].total == 120
465          payload = api_post_mock.await_args.kwargs["payload"]
466          media = payload["msg"]["item_list"][0]["image_item"]["media"]
467          assert media["encrypt_query_param"] == "enc-param"
468          assert media["aes_key"] == expected_aes_key
469  
470  
471  class TestWeixinRemoteMediaSafety:
472      def test_download_remote_media_blocks_unsafe_urls(self):
473          adapter = _make_adapter()
474  
475          with patch("tools.url_safety.is_safe_url", return_value=False):
476              try:
477                  asyncio.run(adapter._download_remote_media("http://127.0.0.1/private.png"))
478              except ValueError as exc:
479                  assert "Blocked unsafe URL" in str(exc)
480              else:
481                  raise AssertionError("expected ValueError for unsafe URL")
482  
483  
484  class TestWeixinMarkdownLinks:
485      """Markdown links should be preserved so WeChat can render them natively."""
486  
487      def test_format_message_preserves_markdown_links(self):
488          adapter = _make_adapter()
489  
490          content = "Check [the docs](https://example.com) and [GitHub](https://github.com) for details"
491          assert adapter.format_message(content) == content
492  
493      def test_format_message_preserves_links_inside_code_blocks(self):
494          adapter = _make_adapter()
495  
496          content = "See below:\n\n```\n[link](https://example.com)\n```\n\nDone."
497          result = adapter.format_message(content)
498          assert "[link](https://example.com)" in result
499  
500  
501  class TestWeixinBlankMessagePrevention:
502      """Regression tests for the blank-bubble bugs.
503  
504      Three separate guards now prevent a blank WeChat message from ever being
505      dispatched:
506  
507      1. ``_split_text_for_weixin_delivery("")`` returns ``[]`` — not ``[""]``.
508      2. ``send()`` filters out empty/whitespace-only chunks before calling
509         ``_send_text_chunk``.
510      3. ``_send_message()`` raises ``ValueError`` for empty text as a last-resort
511         safety net.
512      """
513  
514      def test_split_text_returns_empty_list_for_empty_string(self):
515          adapter = _make_adapter()
516          assert adapter._split_text("") == []
517  
518      def test_split_text_returns_empty_list_for_empty_string_split_per_line(self):
519          adapter = WeixinAdapter(
520              PlatformConfig(
521                  enabled=True,
522                  extra={
523                      "account_id": "acct",
524                      "token": "test-tok",
525                      "split_multiline_messages": True,
526                  },
527              )
528          )
529          assert adapter._split_text("") == []
530  
531      @patch("gateway.platforms.weixin._send_message", new_callable=AsyncMock)
532      def test_send_empty_content_does_not_call_send_message(self, send_message_mock):
533          adapter = _make_adapter()
534          adapter._session = object()
535          adapter._send_session = adapter._session
536          adapter._token = "test-token"
537          adapter._base_url = "https://weixin.example.com"
538          adapter._token_store.get = lambda account_id, chat_id: "ctx-token"
539  
540          result = asyncio.run(adapter.send("wxid_test123", ""))
541          # Empty content → no chunks → no _send_message calls
542          assert result.success is True
543          send_message_mock.assert_not_awaited()
544  
545      def test_send_message_rejects_empty_text(self):
546          """_send_message raises ValueError for empty/whitespace text."""
547          import pytest
548          with pytest.raises(ValueError, match="text must not be empty"):
549              asyncio.run(
550                  weixin._send_message(
551                      AsyncMock(),
552                      base_url="https://example.com",
553                      token="tok",
554                      to="wxid_test",
555                      text="",
556                      context_token=None,
557                      client_id="cid",
558                  )
559              )
560  
561  
562  class TestWeixinStreamingCursorSuppression:
563      """WeChat doesn't support message editing — cursor must be suppressed."""
564  
565      def test_supports_message_editing_is_false(self):
566          adapter = _make_adapter()
567          assert adapter.SUPPORTS_MESSAGE_EDITING is False
568  
569  
570  class TestWeixinMediaBuilder:
571      """Media builder uses base64(hex_key), not base64(raw_bytes) for aes_key."""
572  
573      def test_image_builder_aes_key_is_base64_of_hex(self):
574          import base64
575          adapter = _make_adapter()
576          media_type, builder = adapter._outbound_media_builder("photo.jpg")
577          assert media_type == weixin.MEDIA_IMAGE
578  
579          fake_hex_key = "0123456789abcdef0123456789abcdef"
580          expected_aes = base64.b64encode(fake_hex_key.encode("ascii")).decode("ascii")
581          item = builder(
582              encrypt_query_param="eq",
583              aes_key_for_api=expected_aes,
584              ciphertext_size=1024,
585              plaintext_size=1000,
586              filename="photo.jpg",
587              rawfilemd5="abc123",
588          )
589          assert item["image_item"]["media"]["aes_key"] == expected_aes
590  
591      def test_video_builder_includes_md5(self):
592          adapter = _make_adapter()
593          media_type, builder = adapter._outbound_media_builder("clip.mp4")
594          assert media_type == weixin.MEDIA_VIDEO
595  
596          item = builder(
597              encrypt_query_param="eq",
598              aes_key_for_api="fakekey",
599              ciphertext_size=2048,
600              plaintext_size=2000,
601              filename="clip.mp4",
602              rawfilemd5="deadbeef",
603          )
604          assert item["video_item"]["video_md5"] == "deadbeef"
605  
606      def test_voice_builder_for_audio_files_uses_file_attachment_type(self):
607          adapter = _make_adapter()
608          media_type, builder = adapter._outbound_media_builder("note.mp3")
609          assert media_type == weixin.MEDIA_FILE
610  
611          item = builder(
612              encrypt_query_param="eq",
613              aes_key_for_api="fakekey",
614              ciphertext_size=512,
615              plaintext_size=500,
616              filename="note.mp3",
617              rawfilemd5="abc",
618          )
619          assert item["type"] == weixin.ITEM_FILE
620          assert item["file_item"]["file_name"] == "note.mp3"
621  
622      def test_voice_builder_for_silk_files(self):
623          adapter = _make_adapter()
624          media_type, builder = adapter._outbound_media_builder("recording.silk")
625          assert media_type == weixin.MEDIA_VOICE
626  
627  
628  class TestWeixinSendImageFileParameterName:
629      """Regression test for send_image_file parameter name mismatch.
630  
631      The gateway calls send_image_file(chat_id=..., image_path=...) but the
632      WeixinAdapter previously used 'path' as the parameter name, causing
633      image sending to fail. This test ensures the interface stays correct.
634      """
635  
636      @patch.object(WeixinAdapter, "send_document", new_callable=AsyncMock)
637      def test_send_image_file_uses_image_path_parameter(self, send_document_mock):
638          """Verify send_image_file accepts image_path and forwards to send_document."""
639          adapter = _make_adapter()
640          adapter._session = object()
641          adapter._send_session = adapter._session
642          adapter._token = "test-token"
643  
644          send_document_mock.return_value = weixin.SendResult(success=True, message_id="test-id")
645  
646          # This is the call pattern used by gateway/run.py extract_media
647          result = asyncio.run(
648              adapter.send_image_file(
649                  chat_id="wxid_test123",
650                  image_path="/tmp/test_image.png",
651                  caption="Test caption",
652                  metadata={"thread_id": "thread-123"},
653              )
654          )
655  
656          assert result.success is True
657          send_document_mock.assert_awaited_once_with(
658              chat_id="wxid_test123",
659              file_path="/tmp/test_image.png",
660              caption="Test caption",
661              metadata={"thread_id": "thread-123"},
662          )
663  
664      @patch.object(WeixinAdapter, "send_document", new_callable=AsyncMock)
665      def test_send_image_file_works_without_optional_params(self, send_document_mock):
666          """Verify send_image_file works with minimal required params."""
667          adapter = _make_adapter()
668          adapter._session = object()
669          adapter._send_session = adapter._session
670          adapter._token = "test-token"
671  
672          send_document_mock.return_value = weixin.SendResult(success=True, message_id="test-id")
673  
674          result = asyncio.run(
675              adapter.send_image_file(
676                  chat_id="wxid_test123",
677                  image_path="/tmp/test_image.jpg",
678              )
679          )
680  
681          assert result.success is True
682          send_document_mock.assert_awaited_once_with(
683              chat_id="wxid_test123",
684              file_path="/tmp/test_image.jpg",
685              caption=None,
686              metadata=None,
687          )
688  
689  
690  class TestWeixinVoiceSending:
691      def _connected_adapter(self) -> WeixinAdapter:
692          adapter = _make_adapter()
693          adapter._session = object()
694          adapter._send_session = adapter._session
695          adapter._token = "test-token"
696          adapter._base_url = "https://weixin.example.com"
697          adapter._token_store.get = lambda account_id, chat_id: "ctx-token"
698          return adapter
699  
700      @patch.object(WeixinAdapter, "_send_file", new_callable=AsyncMock)
701      def test_send_voice_downgrades_to_document_attachment(self, send_file_mock, tmp_path):
702          adapter = self._connected_adapter()
703          source = tmp_path / "voice.ogg"
704          source.write_bytes(b"ogg")
705          send_file_mock.return_value = "msg-1"
706  
707          result = asyncio.run(adapter.send_voice("wxid_test123", str(source)))
708  
709          assert result.success is True
710          send_file_mock.assert_awaited_once_with(
711              "wxid_test123",
712              str(source),
713              "[voice message as attachment]",
714              force_file_attachment=True,
715          )
716  
717      def test_voice_builder_for_silk_files_can_be_forced_to_file_attachment(self):
718          adapter = _make_adapter()
719          media_type, builder = adapter._outbound_media_builder(
720              "recording.silk",
721              force_file_attachment=True,
722          )
723          assert media_type == weixin.MEDIA_FILE
724  
725          item = builder(
726              encrypt_query_param="eq",
727              aes_key_for_api="fakekey",
728              ciphertext_size=512,
729              plaintext_size=500,
730              filename="recording.silk",
731              rawfilemd5="abc",
732          )
733          assert item["type"] == weixin.ITEM_FILE
734          assert item["file_item"]["file_name"] == "recording.silk"
735  
736      @patch.object(weixin, "_api_post", new_callable=AsyncMock)
737      @patch.object(weixin, "_upload_ciphertext", new_callable=AsyncMock)
738      @patch.object(weixin, "_get_upload_url", new_callable=AsyncMock)
739      def test_send_file_sets_voice_metadata_for_silk_payload(
740          self,
741          get_upload_url_mock,
742          upload_ciphertext_mock,
743          api_post_mock,
744          tmp_path,
745      ):
746          adapter = self._connected_adapter()
747          silk = tmp_path / "voice.silk"
748          silk.write_bytes(b"\x02#!SILK_V3\x01\x00")
749          get_upload_url_mock.return_value = {"upload_full_url": "https://cdn.example.com/upload"}
750          upload_ciphertext_mock.return_value = "enc-q"
751          api_post_mock.return_value = {"success": True}
752  
753          asyncio.run(adapter._send_file("wxid_test123", str(silk), ""))
754  
755          payload = api_post_mock.await_args.kwargs["payload"]
756          voice_item = payload["msg"]["item_list"][0]["voice_item"]
757          assert voice_item.get("playtime", 0) == 0
758          assert voice_item["encode_type"] == 6
759          assert voice_item["sample_rate"] == 24000
760          assert voice_item["bits_per_sample"] == 16
761  
762  
763  class TestIsStaleSessionRet:
764      """Regression test for #17228: distinguish stale-session ret=-2 from rate-limit ret=-2."""
765  
766      def test_ret_minus_2_with_unknown_error_is_stale(self):
767          assert weixin._is_stale_session_ret(-2, None, "unknown error") is True
768  
769      def test_errcode_minus_2_with_unknown_error_is_stale(self):
770          assert weixin._is_stale_session_ret(None, -2, "unknown error") is True
771  
772      def test_unknown_error_case_insensitive(self):
773          assert weixin._is_stale_session_ret(-2, None, "Unknown Error") is True
774  
775      def test_ret_minus_2_with_freq_limit_is_not_stale(self):
776          # Genuine rate limit — must NOT be treated as stale session.
777          assert weixin._is_stale_session_ret(-2, None, "freq limit") is False
778  
779      def test_ret_minus_2_with_no_errmsg_is_not_stale(self):
780          assert weixin._is_stale_session_ret(-2, None, None) is False
781          assert weixin._is_stale_session_ret(-2, None, "") is False
782  
783      def test_errcode_minus_14_is_not_matched_here(self):
784          # -14 is handled by the separate SESSION_EXPIRED_ERRCODE path; the
785          # helper only disambiguates -2 from a genuine rate limit.
786          assert weixin._is_stale_session_ret(-14, None, "session expired") is False
787  
788      def test_success_codes_are_not_stale(self):
789          assert weixin._is_stale_session_ret(0, 0, "") is False
790          assert weixin._is_stale_session_ret(None, None, "unknown error") is False
791  
792  
793  class TestWeixinContentDedup:
794      """Regression tests for Issue #16182 — upstream API sends duplicate content
795      with different message_ids, bypassing message_id deduplication.
796      """
797  
798      def test_duplicate_content_with_different_message_ids_is_dropped(self):
799          adapter = _make_adapter()
800          adapter._poll_session = object()
801          adapter.handle_message = AsyncMock()
802  
803          base_msg = {
804              "from_user_id": "wxid_user1",
805              "item_list": [{"type": 1, "text_item": {"text": "hello world"}}],
806          }
807  
808          asyncio.run(adapter._process_message({**base_msg, "message_id": "msg-1"}))
809          asyncio.run(adapter._process_message({**base_msg, "message_id": "msg-2"}))
810  
811          assert adapter.handle_message.await_count == 1
812          event = adapter.handle_message.await_args[0][0]
813          assert event.text == "hello world"
814  
815      def test_content_dedup_not_called_for_messages_without_text(self):
816          adapter = _make_adapter()
817          adapter._poll_session = object()
818          adapter.handle_message = AsyncMock()
819          adapter._dedup.is_duplicate = Mock(return_value=False)
820  
821          empty_msg = {
822              "from_user_id": "wxid_user1",
823              "message_id": "msg-1",
824              "item_list": [],
825          }
826          asyncio.run(adapter._process_message(empty_msg))
827  
828          assert adapter.handle_message.await_count == 0
829          # is_duplicate should only be called for message_id, never for content
830          assert all("content:" not in str(call) for call in adapter._dedup.is_duplicate.call_args_list)