/ tests / gateway / test_wecom.py
test_wecom.py
  1  """Tests for the WeCom platform adapter."""
  2  
  3  import base64
  4  import os
  5  from pathlib import Path
  6  from types import SimpleNamespace
  7  from unittest.mock import AsyncMock, patch
  8  
  9  import pytest
 10  
 11  from gateway.config import Platform, PlatformConfig
 12  from gateway.platforms.base import SendResult
 13  
 14  
 15  class TestWeComRequirements:
 16      def test_returns_false_without_aiohttp(self, monkeypatch):
 17          monkeypatch.setattr("gateway.platforms.wecom.AIOHTTP_AVAILABLE", False)
 18          monkeypatch.setattr("gateway.platforms.wecom.HTTPX_AVAILABLE", True)
 19          from gateway.platforms.wecom import check_wecom_requirements
 20  
 21          assert check_wecom_requirements() is False
 22  
 23      def test_returns_false_without_httpx(self, monkeypatch):
 24          monkeypatch.setattr("gateway.platforms.wecom.AIOHTTP_AVAILABLE", True)
 25          monkeypatch.setattr("gateway.platforms.wecom.HTTPX_AVAILABLE", False)
 26          from gateway.platforms.wecom import check_wecom_requirements
 27  
 28          assert check_wecom_requirements() is False
 29  
 30      def test_returns_true_when_available(self, monkeypatch):
 31          monkeypatch.setattr("gateway.platforms.wecom.AIOHTTP_AVAILABLE", True)
 32          monkeypatch.setattr("gateway.platforms.wecom.HTTPX_AVAILABLE", True)
 33          from gateway.platforms.wecom import check_wecom_requirements
 34  
 35          assert check_wecom_requirements() is True
 36  
 37  
 38  class TestWeComAdapterInit:
 39      def test_declares_non_editable_message_capability(self):
 40          from gateway.platforms.wecom import WeComAdapter
 41  
 42          assert WeComAdapter.SUPPORTS_MESSAGE_EDITING is False
 43  
 44      def test_reads_config_from_extra(self):
 45          from gateway.platforms.wecom import WeComAdapter
 46  
 47          config = PlatformConfig(
 48              enabled=True,
 49              extra={
 50                  "bot_id": "cfg-bot",
 51                  "secret": "cfg-secret",
 52                  "websocket_url": "wss://custom.wecom.example/ws",
 53                  "group_policy": "allowlist",
 54                  "group_allow_from": ["group-1"],
 55              },
 56          )
 57          adapter = WeComAdapter(config)
 58  
 59          assert adapter._bot_id == "cfg-bot"
 60          assert adapter._secret == "cfg-secret"
 61          assert adapter._ws_url == "wss://custom.wecom.example/ws"
 62          assert adapter._group_policy == "allowlist"
 63          assert adapter._group_allow_from == ["group-1"]
 64  
 65      def test_falls_back_to_env_vars(self, monkeypatch):
 66          monkeypatch.setenv("WECOM_BOT_ID", "env-bot")
 67          monkeypatch.setenv("WECOM_SECRET", "env-secret")
 68          monkeypatch.setenv("WECOM_WEBSOCKET_URL", "wss://env.example/ws")
 69          from gateway.platforms.wecom import WeComAdapter
 70  
 71          adapter = WeComAdapter(PlatformConfig(enabled=True))
 72          assert adapter._bot_id == "env-bot"
 73          assert adapter._secret == "env-secret"
 74          assert adapter._ws_url == "wss://env.example/ws"
 75  
 76  
 77  class TestWeComConnect:
 78      @pytest.mark.asyncio
 79      async def test_connect_records_missing_credentials(self, monkeypatch):
 80          import gateway.platforms.wecom as wecom_module
 81          from gateway.platforms.wecom import WeComAdapter
 82  
 83          monkeypatch.setattr(wecom_module, "AIOHTTP_AVAILABLE", True)
 84          monkeypatch.setattr(wecom_module, "HTTPX_AVAILABLE", True)
 85  
 86          adapter = WeComAdapter(PlatformConfig(enabled=True))
 87  
 88          success = await adapter.connect()
 89  
 90          assert success is False
 91          assert adapter.has_fatal_error is True
 92          assert adapter.fatal_error_code == "wecom_missing_credentials"
 93          assert "WECOM_BOT_ID" in (adapter.fatal_error_message or "")
 94  
 95      @pytest.mark.asyncio
 96      async def test_connect_records_handshake_failure_details(self, monkeypatch):
 97          import gateway.platforms.wecom as wecom_module
 98          from gateway.platforms.wecom import WeComAdapter
 99  
100          class DummyClient:
101              async def aclose(self):
102                  return None
103  
104          monkeypatch.setattr(wecom_module, "AIOHTTP_AVAILABLE", True)
105          monkeypatch.setattr(wecom_module, "HTTPX_AVAILABLE", True)
106          monkeypatch.setattr(
107              wecom_module,
108              "httpx",
109              SimpleNamespace(AsyncClient=lambda **kwargs: DummyClient()),
110          )
111  
112          adapter = WeComAdapter(
113              PlatformConfig(enabled=True, extra={"bot_id": "bot-1", "secret": "secret-1"})
114          )
115          adapter._open_connection = AsyncMock(side_effect=RuntimeError("invalid secret (errcode=40013)"))
116  
117          success = await adapter.connect()
118  
119          assert success is False
120          assert adapter.has_fatal_error is True
121          assert adapter.fatal_error_code == "wecom_connect_error"
122          assert "invalid secret" in (adapter.fatal_error_message or "")
123  
124  
125  class TestWeComReplyMode:
126      @pytest.mark.asyncio
127      async def test_send_uses_passive_reply_markdown_when_reply_context_exists(self):
128          from gateway.platforms.wecom import WeComAdapter
129  
130          adapter = WeComAdapter(PlatformConfig(enabled=True))
131          adapter._reply_req_ids["msg-1"] = "req-1"
132          adapter._send_reply_request = AsyncMock(
133              return_value={"headers": {"req_id": "req-1"}, "errcode": 0}
134          )
135  
136          result = await adapter.send("chat-123", "hello from reply", reply_to="msg-1")
137  
138          assert result.success is True
139          adapter._send_reply_request.assert_awaited_once()
140          args = adapter._send_reply_request.await_args.args
141          assert args[0] == "req-1"
142          # msgtype: stream triggers WeCom errcode 600039 on many mobile clients
143          # (unsupported type). Markdown renders everywhere.
144          assert args[1]["msgtype"] == "markdown"
145          assert args[1]["markdown"]["content"] == "hello from reply"
146  
147      @pytest.mark.asyncio
148      async def test_send_image_file_uses_passive_reply_media_when_reply_context_exists(self):
149          from gateway.platforms.wecom import WeComAdapter
150  
151          adapter = WeComAdapter(PlatformConfig(enabled=True))
152          adapter._reply_req_ids["msg-1"] = "req-1"
153          adapter._prepare_outbound_media = AsyncMock(
154              return_value={
155                  "data": b"image-bytes",
156                  "content_type": "image/png",
157                  "file_name": "demo.png",
158                  "detected_type": "image",
159                  "final_type": "image",
160                  "rejected": False,
161                  "reject_reason": None,
162                  "downgraded": False,
163                  "downgrade_note": None,
164              }
165          )
166          adapter._upload_media_bytes = AsyncMock(return_value={"media_id": "media-1", "type": "image"})
167          adapter._send_reply_request = AsyncMock(
168              return_value={"headers": {"req_id": "req-1"}, "errcode": 0}
169          )
170  
171          result = await adapter.send_image_file("chat-123", "/tmp/demo.png", reply_to="msg-1")
172  
173          assert result.success is True
174          adapter._send_reply_request.assert_awaited_once()
175          args = adapter._send_reply_request.await_args.args
176          assert args[0] == "req-1"
177          assert args[1] == {"msgtype": "image", "image": {"media_id": "media-1"}}
178  
179  
180  class TestExtractText:
181      def test_extracts_plain_text(self):
182          from gateway.platforms.wecom import WeComAdapter
183  
184          body = {
185              "msgtype": "text",
186              "text": {"content": "  hello world  "},
187          }
188          text, reply_text = WeComAdapter._extract_text(body)
189          assert text == "hello world"
190          assert reply_text is None
191  
192      def test_extracts_mixed_text(self):
193          from gateway.platforms.wecom import WeComAdapter
194  
195          body = {
196              "msgtype": "mixed",
197              "mixed": {
198                  "msg_item": [
199                      {"msgtype": "text", "text": {"content": "part1"}},
200                      {"msgtype": "image", "image": {"url": "https://example.com/x.png"}},
201                      {"msgtype": "text", "text": {"content": "part2"}},
202                  ]
203              },
204          }
205          text, _reply_text = WeComAdapter._extract_text(body)
206          assert text == "part1\npart2"
207  
208      def test_extracts_voice_and_quote(self):
209          from gateway.platforms.wecom import WeComAdapter
210  
211          body = {
212              "msgtype": "voice",
213              "voice": {"content": "spoken text"},
214              "quote": {"msgtype": "text", "text": {"content": "quoted"}},
215          }
216          text, reply_text = WeComAdapter._extract_text(body)
217          assert text == "spoken text"
218          assert reply_text == "quoted"
219  
220  
221  class TestCallbackDispatch:
222      @pytest.mark.asyncio
223      @pytest.mark.parametrize("cmd", ["aibot_msg_callback", "aibot_callback"])
224      async def test_dispatch_accepts_new_and_legacy_callback_cmds(self, cmd):
225          from gateway.platforms.wecom import WeComAdapter
226  
227          adapter = WeComAdapter(PlatformConfig(enabled=True))
228          adapter._on_message = AsyncMock()
229  
230          await adapter._dispatch_payload({"cmd": cmd, "headers": {"req_id": "req-1"}, "body": {}})
231  
232          adapter._on_message.assert_awaited_once()
233  
234  
235  class TestPolicyHelpers:
236      def test_dm_allowlist(self):
237          from gateway.platforms.wecom import WeComAdapter
238  
239          adapter = WeComAdapter(
240              PlatformConfig(enabled=True, extra={"dm_policy": "allowlist", "allow_from": ["user-1"]})
241          )
242          assert adapter._is_dm_allowed("user-1") is True
243          assert adapter._is_dm_allowed("user-2") is False
244  
245      def test_group_allowlist_and_per_group_sender_allowlist(self):
246          from gateway.platforms.wecom import WeComAdapter
247  
248          adapter = WeComAdapter(
249              PlatformConfig(
250                  enabled=True,
251                  extra={
252                      "group_policy": "allowlist",
253                      "group_allow_from": ["group-1"],
254                      "groups": {"group-1": {"allow_from": ["user-1"]}},
255                  },
256              )
257          )
258  
259          assert adapter._is_group_allowed("group-1", "user-1") is True
260          assert adapter._is_group_allowed("group-1", "user-2") is False
261          assert adapter._is_group_allowed("group-2", "user-1") is False
262  
263  
264  class TestMediaHelpers:
265      def test_detect_wecom_media_type(self):
266          from gateway.platforms.wecom import WeComAdapter
267  
268          assert WeComAdapter._detect_wecom_media_type("image/png") == "image"
269          assert WeComAdapter._detect_wecom_media_type("video/mp4") == "video"
270          assert WeComAdapter._detect_wecom_media_type("audio/amr") == "voice"
271          assert WeComAdapter._detect_wecom_media_type("application/pdf") == "file"
272  
273      def test_voice_non_amr_downgrades_to_file(self):
274          from gateway.platforms.wecom import WeComAdapter
275  
276          result = WeComAdapter._apply_file_size_limits(128, "voice", "audio/mpeg")
277  
278          assert result["final_type"] == "file"
279          assert result["downgraded"] is True
280          assert "AMR" in (result["downgrade_note"] or "")
281  
282      def test_oversized_file_is_rejected(self):
283          from gateway.platforms.wecom import ABSOLUTE_MAX_BYTES, WeComAdapter
284  
285          result = WeComAdapter._apply_file_size_limits(ABSOLUTE_MAX_BYTES + 1, "file", "application/pdf")
286  
287          assert result["rejected"] is True
288          assert "20MB" in (result["reject_reason"] or "")
289  
290      def test_decrypt_file_bytes_round_trip(self):
291          from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
292          from gateway.platforms.wecom import WeComAdapter
293  
294          plaintext = b"wecom-secret"
295          key = os.urandom(32)
296          pad_len = 32 - (len(plaintext) % 32)
297          padded = plaintext + bytes([pad_len]) * pad_len
298          encryptor = Cipher(algorithms.AES(key), modes.CBC(key[:16])).encryptor()
299          encrypted = encryptor.update(padded) + encryptor.finalize()
300  
301          decrypted = WeComAdapter._decrypt_file_bytes(encrypted, base64.b64encode(key).decode("ascii"))
302  
303          assert decrypted == plaintext
304  
305      @pytest.mark.asyncio
306      async def test_load_outbound_media_rejects_placeholder_path(self):
307          from gateway.platforms.wecom import WeComAdapter
308  
309          adapter = WeComAdapter(PlatformConfig(enabled=True))
310  
311          with pytest.raises(ValueError, match="placeholder was not replaced"):
312              await adapter._load_outbound_media("<path>")
313  
314  
315  class TestMediaUpload:
316      @pytest.mark.asyncio
317      async def test_upload_media_bytes_uses_sdk_sequence(self, monkeypatch):
318          import gateway.platforms.wecom as wecom_module
319          from gateway.platforms.wecom import (
320              APP_CMD_UPLOAD_MEDIA_CHUNK,
321              APP_CMD_UPLOAD_MEDIA_FINISH,
322              APP_CMD_UPLOAD_MEDIA_INIT,
323              WeComAdapter,
324          )
325  
326          adapter = WeComAdapter(PlatformConfig(enabled=True))
327          calls = []
328  
329          async def fake_send_request(cmd, body, timeout=0):
330              calls.append((cmd, body))
331              if cmd == APP_CMD_UPLOAD_MEDIA_INIT:
332                  return {"errcode": 0, "body": {"upload_id": "upload-1"}}
333              if cmd == APP_CMD_UPLOAD_MEDIA_CHUNK:
334                  return {"errcode": 0}
335              if cmd == APP_CMD_UPLOAD_MEDIA_FINISH:
336                  return {
337                      "errcode": 0,
338                      "body": {
339                          "media_id": "media-1",
340                          "type": "file",
341                          "created_at": "2026-03-18T00:00:00Z",
342                      },
343                  }
344              raise AssertionError(f"unexpected cmd {cmd}")
345  
346          monkeypatch.setattr(wecom_module, "UPLOAD_CHUNK_SIZE", 4)
347          adapter._send_request = fake_send_request
348  
349          result = await adapter._upload_media_bytes(b"abcdefghij", "file", "demo.bin")
350  
351          assert result["media_id"] == "media-1"
352          assert [cmd for cmd, _body in calls] == [
353              APP_CMD_UPLOAD_MEDIA_INIT,
354              APP_CMD_UPLOAD_MEDIA_CHUNK,
355              APP_CMD_UPLOAD_MEDIA_CHUNK,
356              APP_CMD_UPLOAD_MEDIA_CHUNK,
357              APP_CMD_UPLOAD_MEDIA_FINISH,
358          ]
359          assert calls[1][1]["chunk_index"] == 0
360          assert calls[2][1]["chunk_index"] == 1
361          assert calls[3][1]["chunk_index"] == 2
362  
363      @pytest.mark.asyncio
364      @patch("tools.url_safety.is_safe_url", return_value=True)
365      async def test_download_remote_bytes_rejects_large_content_length(self, _mock_safe):
366          from gateway.platforms.wecom import WeComAdapter
367  
368          class FakeResponse:
369              headers = {"content-length": "10"}
370  
371              async def __aenter__(self):
372                  return self
373  
374              async def __aexit__(self, exc_type, exc, tb):
375                  return None
376  
377              def raise_for_status(self):
378                  return None
379  
380              async def aiter_bytes(self):
381                  yield b"abc"
382  
383          class FakeClient:
384              def stream(self, method, url, headers=None):
385                  return FakeResponse()
386  
387          adapter = WeComAdapter(PlatformConfig(enabled=True))
388          adapter._http_client = FakeClient()
389  
390          with pytest.raises(ValueError, match="exceeds WeCom limit"):
391              await adapter._download_remote_bytes("https://example.com/file.bin", max_bytes=4)
392  
393      @pytest.mark.asyncio
394      async def test_cache_media_decrypts_url_payload_before_writing(self):
395          from gateway.platforms.wecom import WeComAdapter
396  
397          adapter = WeComAdapter(PlatformConfig(enabled=True))
398          plaintext = b"secret document bytes"
399          key = os.urandom(32)
400          pad_len = 32 - (len(plaintext) % 32)
401          padded = plaintext + bytes([pad_len]) * pad_len
402  
403          from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
404  
405          encryptor = Cipher(algorithms.AES(key), modes.CBC(key[:16])).encryptor()
406          encrypted = encryptor.update(padded) + encryptor.finalize()
407          adapter._download_remote_bytes = AsyncMock(
408              return_value=(
409                  encrypted,
410                  {
411                      "content-type": "application/octet-stream",
412                      "content-disposition": 'attachment; filename="secret.bin"',
413                  },
414              )
415          )
416  
417          cached = await adapter._cache_media(
418              "file",
419              {
420                  "url": "https://example.com/secret.bin",
421                  "aeskey": base64.b64encode(key).decode("ascii"),
422              },
423          )
424  
425          assert cached is not None
426          cached_path, content_type = cached
427          assert Path(cached_path).read_bytes() == plaintext
428          assert content_type == "application/octet-stream"
429  
430  
431  class TestSend:
432      @pytest.mark.asyncio
433      async def test_send_uses_proactive_payload(self):
434          from gateway.platforms.wecom import APP_CMD_SEND, WeComAdapter
435  
436          adapter = WeComAdapter(PlatformConfig(enabled=True))
437          adapter._send_request = AsyncMock(return_value={"headers": {"req_id": "req-1"}, "errcode": 0})
438  
439          result = await adapter.send("chat-123", "Hello WeCom")
440  
441          assert result.success is True
442          adapter._send_request.assert_awaited_once_with(
443              APP_CMD_SEND,
444              {
445                  "chatid": "chat-123",
446                  "msgtype": "markdown",
447                  "markdown": {"content": "Hello WeCom"},
448              },
449          )
450  
451      @pytest.mark.asyncio
452      async def test_send_reports_wecom_errors(self):
453          from gateway.platforms.wecom import WeComAdapter
454  
455          adapter = WeComAdapter(PlatformConfig(enabled=True))
456          adapter._send_request = AsyncMock(return_value={"errcode": 40001, "errmsg": "bad request"})
457  
458          result = await adapter.send("chat-123", "Hello WeCom")
459  
460          assert result.success is False
461          assert "40001" in (result.error or "")
462  
463      @pytest.mark.asyncio
464      async def test_send_image_falls_back_to_text_for_remote_url(self):
465          from gateway.platforms.wecom import WeComAdapter
466  
467          adapter = WeComAdapter(PlatformConfig(enabled=True))
468          adapter._send_media_source = AsyncMock(return_value=SendResult(success=False, error="upload failed"))
469          adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="msg-1"))
470  
471          result = await adapter.send_image("chat-123", "https://example.com/demo.png", caption="demo")
472  
473          assert result.success is True
474          adapter.send.assert_awaited_once_with(chat_id="chat-123", content="demo\nhttps://example.com/demo.png", reply_to=None)
475  
476      @pytest.mark.asyncio
477      async def test_send_voice_sends_caption_and_downgrade_note(self):
478          from gateway.platforms.wecom import WeComAdapter
479  
480          adapter = WeComAdapter(PlatformConfig(enabled=True))
481          adapter._prepare_outbound_media = AsyncMock(
482              return_value={
483                  "data": b"voice-bytes",
484                  "content_type": "audio/mpeg",
485                  "file_name": "voice.mp3",
486                  "detected_type": "voice",
487                  "final_type": "file",
488                  "rejected": False,
489                  "reject_reason": None,
490                  "downgraded": True,
491                  "downgrade_note": "语音格式 audio/mpeg 不支持,企微仅支持 AMR 格式,已转为文件格式发送",
492              }
493          )
494          adapter._upload_media_bytes = AsyncMock(return_value={"media_id": "media-1", "type": "file"})
495          adapter._send_media_message = AsyncMock(return_value={"headers": {"req_id": "req-media"}, "errcode": 0})
496          adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="msg-1"))
497  
498          result = await adapter.send_voice("chat-123", "/tmp/voice.mp3", caption="listen")
499  
500          assert result.success is True
501          adapter._send_media_message.assert_awaited_once_with("chat-123", "file", "media-1")
502          assert adapter.send.await_count == 2
503          adapter.send.assert_any_await(chat_id="chat-123", content="listen", reply_to=None)
504          adapter.send.assert_any_await(
505              chat_id="chat-123",
506              content="ℹ️ 语音格式 audio/mpeg 不支持,企微仅支持 AMR 格式,已转为文件格式发送",
507              reply_to=None,
508          )
509  
510  
511  class TestInboundMessages:
512      @pytest.mark.asyncio
513      async def test_on_message_builds_event(self):
514          from gateway.platforms.wecom import WeComAdapter
515  
516          adapter = WeComAdapter(PlatformConfig(enabled=True))
517          adapter._text_batch_delay_seconds = 0  # disable batching for tests
518          adapter.handle_message = AsyncMock()
519          adapter._extract_media = AsyncMock(return_value=(["/tmp/test.png"], ["image/png"]))
520  
521          payload = {
522              "cmd": "aibot_msg_callback",
523              "headers": {"req_id": "req-1"},
524              "body": {
525                  "msgid": "msg-1",
526                  "chatid": "group-1",
527                  "chattype": "group",
528                  "from": {"userid": "user-1"},
529                  "msgtype": "text",
530                  "text": {"content": "hello"},
531              },
532          }
533  
534          await adapter._on_message(payload)
535  
536          adapter.handle_message.assert_awaited_once()
537          event = adapter.handle_message.await_args.args[0]
538          assert event.text == "hello"
539          assert event.source.chat_id == "group-1"
540          assert event.source.user_id == "user-1"
541          assert event.media_urls == ["/tmp/test.png"]
542          assert event.media_types == ["image/png"]
543  
544      @pytest.mark.asyncio
545      async def test_on_message_preserves_quote_context(self):
546          from gateway.platforms.wecom import WeComAdapter
547  
548          adapter = WeComAdapter(PlatformConfig(enabled=True))
549          adapter._text_batch_delay_seconds = 0  # disable batching for tests
550          adapter.handle_message = AsyncMock()
551          adapter._extract_media = AsyncMock(return_value=([], []))
552  
553          payload = {
554              "cmd": "aibot_msg_callback",
555              "headers": {"req_id": "req-1"},
556              "body": {
557                  "msgid": "msg-1",
558                  "chatid": "group-1",
559                  "chattype": "group",
560                  "from": {"userid": "user-1"},
561                  "msgtype": "text",
562                  "text": {"content": "follow up"},
563                  "quote": {"msgtype": "text", "text": {"content": "quoted message"}},
564              },
565          }
566  
567          await adapter._on_message(payload)
568  
569          event = adapter.handle_message.await_args.args[0]
570          assert event.reply_to_text == "quoted message"
571          assert event.reply_to_message_id == "quote:msg-1"
572  
573      @pytest.mark.asyncio
574      async def test_on_message_respects_group_policy(self):
575          from gateway.platforms.wecom import WeComAdapter
576  
577          adapter = WeComAdapter(
578              PlatformConfig(
579                  enabled=True,
580                  extra={"group_policy": "allowlist", "group_allow_from": ["group-allowed"]},
581              )
582          )
583          adapter.handle_message = AsyncMock()
584          adapter._extract_media = AsyncMock(return_value=([], []))
585  
586          payload = {
587              "cmd": "aibot_callback",
588              "headers": {"req_id": "req-1"},
589              "body": {
590                  "msgid": "msg-1",
591                  "chatid": "group-blocked",
592                  "chattype": "group",
593                  "from": {"userid": "user-1"},
594                  "msgtype": "text",
595                  "text": {"content": "hello"},
596              },
597          }
598  
599          await adapter._on_message(payload)
600          adapter.handle_message.assert_not_awaited()
601  
602  
603  class TestWeComZombieSessionFix:
604      """Tests for PR #11572 — device_id, markdown reply, group req_id fallback."""
605  
606      def test_adapter_generates_stable_device_id_per_instance(self):
607          from gateway.platforms.wecom import WeComAdapter
608  
609          adapter = WeComAdapter(PlatformConfig(enabled=True))
610          assert isinstance(adapter._device_id, str)
611          assert len(adapter._device_id) > 0
612          # Second snapshot on the same adapter must be identical — only a fresh
613          # adapter instance should get a new device_id (one-per-reconnect is the
614          # zombie-session footgun we're fixing).
615          assert adapter._device_id == adapter._device_id
616  
617      def test_different_adapter_instances_get_distinct_device_ids(self):
618          from gateway.platforms.wecom import WeComAdapter
619  
620          a = WeComAdapter(PlatformConfig(enabled=True))
621          b = WeComAdapter(PlatformConfig(enabled=True))
622          assert a._device_id != b._device_id
623  
624      @pytest.mark.asyncio
625      async def test_open_connection_includes_device_id_in_subscribe(self):
626          from gateway.platforms.wecom import APP_CMD_SUBSCRIBE, WeComAdapter
627  
628          adapter = WeComAdapter(PlatformConfig(enabled=True))
629          adapter._bot_id = "test-bot"
630          adapter._secret = "test-secret"
631  
632          sent_payloads = []
633  
634          class _FakeWS:
635              closed = False
636  
637              async def send_json(self, payload):
638                  sent_payloads.append(payload)
639  
640              async def close(self):
641                  return None
642  
643          class _FakeSession:
644              def __init__(self, *args, **kwargs):
645                  pass
646  
647              async def ws_connect(self, *args, **kwargs):
648                  return _FakeWS()
649  
650              async def close(self):
651                  return None
652  
653          async def _fake_cleanup():
654              return None
655  
656          async def _fake_handshake(req_id):
657              return {"errcode": 0, "headers": {"req_id": req_id}}
658  
659          adapter._cleanup_ws = _fake_cleanup
660          adapter._wait_for_handshake = _fake_handshake
661  
662          with patch("gateway.platforms.wecom.aiohttp.ClientSession", _FakeSession):
663              await adapter._open_connection()
664  
665          assert len(sent_payloads) == 1
666          subscribe = sent_payloads[0]
667          assert subscribe["cmd"] == APP_CMD_SUBSCRIBE
668          assert subscribe["body"]["bot_id"] == "test-bot"
669          assert subscribe["body"]["secret"] == "test-secret"
670          assert subscribe["body"]["device_id"] == adapter._device_id
671  
672      @pytest.mark.asyncio
673      async def test_on_message_caches_last_req_id_per_chat(self):
674          from gateway.platforms.wecom import WeComAdapter
675  
676          adapter = WeComAdapter(PlatformConfig(enabled=True))
677          adapter._text_batch_delay_seconds = 0
678          adapter.handle_message = AsyncMock()
679          adapter._extract_media = AsyncMock(return_value=([], []))
680  
681          payload = {
682              "cmd": "aibot_msg_callback",
683              "headers": {"req_id": "req-abc"},
684              "body": {
685                  "msgid": "msg-1",
686                  "chatid": "group-1",
687                  "chattype": "group",
688                  "from": {"userid": "user-1"},
689                  "msgtype": "text",
690                  "text": {"content": "hi"},
691              },
692          }
693  
694          await adapter._on_message(payload)
695          assert adapter._last_chat_req_ids["group-1"] == "req-abc"
696  
697      @pytest.mark.asyncio
698      async def test_on_message_does_not_cache_blocked_sender_req_id(self):
699          """Blocked chats shouldn't populate the proactive-send fallback cache."""
700          from gateway.platforms.wecom import WeComAdapter
701  
702          adapter = WeComAdapter(
703              PlatformConfig(
704                  enabled=True,
705                  extra={"group_policy": "allowlist", "group_allow_from": ["group-ok"]},
706              )
707          )
708          adapter.handle_message = AsyncMock()
709          adapter._extract_media = AsyncMock(return_value=([], []))
710  
711          payload = {
712              "cmd": "aibot_msg_callback",
713              "headers": {"req_id": "req-abc"},
714              "body": {
715                  "msgid": "msg-1",
716                  "chatid": "group-blocked",
717                  "chattype": "group",
718                  "from": {"userid": "user-1"},
719                  "msgtype": "text",
720                  "text": {"content": "hi"},
721              },
722          }
723  
724          await adapter._on_message(payload)
725          adapter.handle_message.assert_not_awaited()
726          assert "group-blocked" not in adapter._last_chat_req_ids
727  
728      def test_remember_chat_req_id_is_bounded(self):
729          from gateway.platforms.wecom import DEDUP_MAX_SIZE, WeComAdapter
730  
731          adapter = WeComAdapter(PlatformConfig(enabled=True))
732          for i in range(DEDUP_MAX_SIZE + 50):
733              adapter._remember_chat_req_id(f"chat-{i}", f"req-{i}")
734          assert len(adapter._last_chat_req_ids) <= DEDUP_MAX_SIZE
735          # The most recently remembered chat must still be present.
736          latest = f"chat-{DEDUP_MAX_SIZE + 49}"
737          assert adapter._last_chat_req_ids[latest] == f"req-{DEDUP_MAX_SIZE + 49}"
738  
739      def test_remember_chat_req_id_ignores_empty_values(self):
740          from gateway.platforms.wecom import WeComAdapter
741  
742          adapter = WeComAdapter(PlatformConfig(enabled=True))
743          adapter._remember_chat_req_id("", "req-1")
744          adapter._remember_chat_req_id("chat-1", "")
745          adapter._remember_chat_req_id("   ", "   ")
746          assert adapter._last_chat_req_ids == {}
747  
748      @pytest.mark.asyncio
749      async def test_proactive_group_send_falls_back_to_cached_req_id(self):
750          """Sending into a group without reply_to should use the last cached
751          req_id via APP_CMD_RESPONSE — WeCom AI Bots cannot initiate APP_CMD_SEND
752          in group chats (errcode 600039)."""
753          from gateway.platforms.wecom import WeComAdapter
754  
755          adapter = WeComAdapter(PlatformConfig(enabled=True))
756          adapter._last_chat_req_ids["group-1"] = "inbound-req-42"
757          adapter._send_reply_request = AsyncMock(
758              return_value={"headers": {"req_id": "inbound-req-42"}, "errcode": 0}
759          )
760          adapter._send_request = AsyncMock(
761              return_value={"headers": {"req_id": "new"}, "errcode": 0}
762          )
763  
764          result = await adapter.send("group-1", "ping", reply_to=None)
765  
766          assert result.success is True
767          # Must route through reply (APP_CMD_RESPONSE), not proactive send.
768          adapter._send_reply_request.assert_awaited_once()
769          adapter._send_request.assert_not_awaited()
770          args = adapter._send_reply_request.await_args.args
771          assert args[0] == "inbound-req-42"
772          assert args[1]["msgtype"] == "markdown"
773          assert args[1]["markdown"]["content"] == "ping"
774  
775      @pytest.mark.asyncio
776      async def test_proactive_send_without_cached_req_id_uses_app_cmd_send(self):
777          """When we have no prior req_id (fresh DM target), APP_CMD_SEND is used."""
778          from gateway.platforms.wecom import APP_CMD_SEND, WeComAdapter
779  
780          adapter = WeComAdapter(PlatformConfig(enabled=True))
781          adapter._send_request = AsyncMock(
782              return_value={"headers": {"req_id": "new"}, "errcode": 0}
783          )
784  
785          result = await adapter.send("fresh-dm-chat", "ping", reply_to=None)
786  
787          assert result.success is True
788          adapter._send_request.assert_awaited_once()
789          cmd = adapter._send_request.await_args.args[0]
790          assert cmd == APP_CMD_SEND
791