test_feishu.py
1 """Tests for the Feishu gateway integration.""" 2 3 import asyncio 4 import json 5 import os 6 import tempfile 7 import time 8 import unittest 9 from pathlib import Path 10 from types import SimpleNamespace 11 from typing import Dict 12 from unittest.mock import AsyncMock, Mock, patch 13 14 from gateway.platforms.base import ProcessingOutcome 15 16 try: 17 import lark_oapi 18 _HAS_LARK_OAPI = True 19 except ImportError: 20 _HAS_LARK_OAPI = False 21 22 23 def _mock_event_dispatcher_builder(mock_handler_class): 24 mock_builder = Mock() 25 mock_builder.register_p2_im_message_message_read_v1 = Mock(return_value=mock_builder) 26 mock_builder.register_p2_im_message_receive_v1 = Mock(return_value=mock_builder) 27 mock_builder.register_p2_im_message_reaction_created_v1 = Mock(return_value=mock_builder) 28 mock_builder.register_p2_im_message_reaction_deleted_v1 = Mock(return_value=mock_builder) 29 mock_builder.register_p2_card_action_trigger = Mock(return_value=mock_builder) 30 mock_builder.build = Mock(return_value=object()) 31 mock_handler_class.builder = Mock(return_value=mock_builder) 32 return mock_builder 33 34 35 class TestConfigEnvOverrides(unittest.TestCase): 36 @patch.dict(os.environ, { 37 "FEISHU_APP_ID": "cli_xxx", 38 "FEISHU_APP_SECRET": "secret_xxx", 39 "FEISHU_CONNECTION_MODE": "websocket", 40 "FEISHU_DOMAIN": "feishu", 41 }, clear=False) 42 def test_feishu_config_loaded_from_env(self): 43 from gateway.config import GatewayConfig, Platform, _apply_env_overrides 44 45 config = GatewayConfig() 46 _apply_env_overrides(config) 47 48 self.assertIn(Platform.FEISHU, config.platforms) 49 self.assertTrue(config.platforms[Platform.FEISHU].enabled) 50 self.assertEqual(config.platforms[Platform.FEISHU].extra["app_id"], "cli_xxx") 51 self.assertEqual(config.platforms[Platform.FEISHU].extra["connection_mode"], "websocket") 52 53 @patch.dict(os.environ, { 54 "FEISHU_APP_ID": "cli_xxx", 55 "FEISHU_APP_SECRET": "secret_xxx", 56 "FEISHU_HOME_CHANNEL": "oc_xxx", 57 }, clear=False) 58 def test_feishu_home_channel_loaded(self): 59 from gateway.config import GatewayConfig, Platform, _apply_env_overrides 60 61 config = GatewayConfig() 62 _apply_env_overrides(config) 63 64 home = config.platforms[Platform.FEISHU].home_channel 65 self.assertIsNotNone(home) 66 self.assertEqual(home.chat_id, "oc_xxx") 67 68 @patch.dict(os.environ, { 69 "FEISHU_APP_ID": "cli_xxx", 70 "FEISHU_APP_SECRET": "secret_xxx", 71 }, clear=False) 72 def test_feishu_in_connected_platforms(self): 73 from gateway.config import GatewayConfig, Platform, _apply_env_overrides 74 75 config = GatewayConfig() 76 _apply_env_overrides(config) 77 78 self.assertIn(Platform.FEISHU, config.get_connected_platforms()) 79 80 81 class TestFeishuMessageNormalization(unittest.TestCase): 82 def test_normalize_merge_forward_preserves_summary_lines(self): 83 from gateway.platforms.feishu import normalize_feishu_message 84 85 normalized = normalize_feishu_message( 86 message_type="merge_forward", 87 raw_content=json.dumps( 88 { 89 "title": "Sprint recap", 90 "messages": [ 91 {"sender_name": "Alice", "text": "Please review PR-128"}, 92 { 93 "sender_name": "Bob", 94 "message_type": "post", 95 "content": { 96 "en_us": { 97 "content": [[{"tag": "text", "text": "Ship it"}]], 98 } 99 }, 100 }, 101 ], 102 } 103 ), 104 ) 105 106 self.assertEqual(normalized.relation_kind, "merge_forward") 107 self.assertEqual( 108 normalized.text_content, 109 "Sprint recap\n- Alice: Please review PR-128\n- Bob: Ship it", 110 ) 111 112 def test_normalize_share_chat_exposes_summary_and_metadata(self): 113 from gateway.platforms.feishu import normalize_feishu_message 114 115 normalized = normalize_feishu_message( 116 message_type="share_chat", 117 raw_content=json.dumps( 118 { 119 "chat_id": "oc_chat_shared", 120 "chat_name": "Backend Guild", 121 } 122 ), 123 ) 124 125 self.assertEqual(normalized.relation_kind, "share_chat") 126 self.assertEqual(normalized.text_content, "Shared chat: Backend Guild\nChat ID: oc_chat_shared") 127 self.assertEqual(normalized.metadata["chat_id"], "oc_chat_shared") 128 self.assertEqual(normalized.metadata["chat_name"], "Backend Guild") 129 130 def test_normalize_interactive_card_preserves_title_body_and_actions(self): 131 from gateway.platforms.feishu import normalize_feishu_message 132 133 normalized = normalize_feishu_message( 134 message_type="interactive", 135 raw_content=json.dumps( 136 { 137 "card": { 138 "header": {"title": {"tag": "plain_text", "content": "Build Failed"}}, 139 "elements": [ 140 {"tag": "div", "text": {"tag": "lark_md", "content": "Service: payments-api"}}, 141 {"tag": "div", "text": {"tag": "plain_text", "content": "Branch: main"}}, 142 { 143 "tag": "action", 144 "actions": [ 145 {"tag": "button", "text": {"tag": "plain_text", "content": "View Logs"}}, 146 {"tag": "button", "text": {"tag": "plain_text", "content": "Retry"}}, 147 ], 148 }, 149 ], 150 } 151 } 152 ), 153 ) 154 155 self.assertEqual(normalized.relation_kind, "interactive") 156 self.assertEqual( 157 normalized.text_content, 158 "Build Failed\nService: payments-api\nBranch: main\nView Logs\nRetry\nActions: View Logs, Retry", 159 ) 160 161 162 class TestFeishuAdapterMessaging(unittest.TestCase): 163 @patch.dict(os.environ, { 164 "FEISHU_APP_ID": "cli_app", 165 "FEISHU_APP_SECRET": "secret_app", 166 "FEISHU_CONNECTION_MODE": "webhook", 167 "FEISHU_WEBHOOK_HOST": "127.0.0.1", 168 "FEISHU_WEBHOOK_PORT": "9001", 169 "FEISHU_WEBHOOK_PATH": "/hook", 170 }, clear=True) 171 def test_connect_webhook_mode_starts_local_server(self): 172 from gateway.config import PlatformConfig 173 from gateway.platforms.feishu import FeishuAdapter 174 175 adapter = FeishuAdapter(PlatformConfig()) 176 runner = AsyncMock() 177 site = AsyncMock() 178 web_module = SimpleNamespace( 179 Application=lambda: SimpleNamespace(router=SimpleNamespace(add_post=lambda *_args, **_kwargs: None)), 180 AppRunner=lambda _app: runner, 181 TCPSite=lambda _runner, host, port: SimpleNamespace(start=site.start, host=host, port=port), 182 ) 183 184 with ( 185 patch("gateway.platforms.feishu.FEISHU_AVAILABLE", True), 186 patch("gateway.platforms.feishu.FEISHU_WEBHOOK_AVAILABLE", True), 187 patch("gateway.platforms.feishu.EventDispatcherHandler") as mock_handler_class, 188 patch("gateway.platforms.feishu.acquire_scoped_lock", return_value=(True, None)), 189 patch("gateway.platforms.feishu.release_scoped_lock"), 190 patch.object(adapter, "_hydrate_bot_identity", new=AsyncMock()), 191 patch.object(adapter, "_build_lark_client", return_value=SimpleNamespace()), 192 patch("gateway.platforms.feishu.web", web_module), 193 ): 194 _mock_event_dispatcher_builder(mock_handler_class) 195 connected = asyncio.run(adapter.connect()) 196 197 self.assertTrue(connected) 198 runner.setup.assert_awaited_once() 199 site.start.assert_awaited_once() 200 201 @patch.dict(os.environ, { 202 "FEISHU_APP_ID": "cli_app", 203 "FEISHU_APP_SECRET": "secret_app", 204 }, clear=True) 205 def test_connect_acquires_scoped_lock_and_disconnect_releases_it(self): 206 from gateway.config import PlatformConfig 207 from gateway.platforms.feishu import FeishuAdapter 208 209 adapter = FeishuAdapter(PlatformConfig()) 210 ws_client = SimpleNamespace() 211 212 with ( 213 patch("gateway.platforms.feishu.FEISHU_AVAILABLE", True), 214 patch("gateway.platforms.feishu.FEISHU_WEBSOCKET_AVAILABLE", True), 215 patch("gateway.platforms.feishu.lark", SimpleNamespace(LogLevel=SimpleNamespace(INFO="INFO", WARNING="WARNING"))), 216 patch("gateway.platforms.feishu.EventDispatcherHandler") as mock_handler_class, 217 patch("gateway.platforms.feishu.FeishuWSClient", return_value=ws_client), 218 patch("gateway.platforms.feishu._run_official_feishu_ws_client"), 219 patch("gateway.platforms.feishu.acquire_scoped_lock", return_value=(True, None)) as acquire_lock, 220 patch("gateway.platforms.feishu.release_scoped_lock") as release_lock, 221 patch.object(adapter, "_hydrate_bot_identity", new=AsyncMock()), 222 patch.object(adapter, "_build_lark_client", return_value=SimpleNamespace()), 223 ): 224 _mock_event_dispatcher_builder(mock_handler_class) 225 226 loop = asyncio.new_event_loop() 227 future = loop.create_future() 228 future.set_result(None) 229 230 class _Loop: 231 def run_in_executor(self, *_args, **_kwargs): 232 return future 233 234 def is_closed(self): 235 return False 236 237 try: 238 with patch("gateway.platforms.feishu.asyncio.get_running_loop", return_value=_Loop()): 239 connected = asyncio.run(adapter.connect()) 240 asyncio.run(adapter.disconnect()) 241 finally: 242 loop.close() 243 244 self.assertTrue(connected) 245 self.assertIsNone(adapter._event_handler) 246 acquire_lock.assert_called_once_with( 247 "feishu-app-id", 248 "cli_app", 249 metadata={"platform": "feishu"}, 250 ) 251 release_lock.assert_called_once_with("feishu-app-id", "cli_app") 252 253 @patch.dict(os.environ, { 254 "FEISHU_APP_ID": "cli_app", 255 "FEISHU_APP_SECRET": "secret_app", 256 }, clear=True) 257 def test_connect_rejects_existing_app_lock(self): 258 from gateway.config import PlatformConfig 259 from gateway.platforms.feishu import FeishuAdapter 260 261 adapter = FeishuAdapter(PlatformConfig()) 262 263 with ( 264 patch("gateway.platforms.feishu.FEISHU_AVAILABLE", True), 265 patch("gateway.platforms.feishu.FEISHU_WEBSOCKET_AVAILABLE", True), 266 patch( 267 "gateway.platforms.feishu.acquire_scoped_lock", 268 return_value=(False, {"pid": 4321}), 269 ), 270 ): 271 connected = asyncio.run(adapter.connect()) 272 273 self.assertFalse(connected) 274 self.assertEqual(adapter.fatal_error_code, "feishu_app_lock") 275 self.assertFalse(adapter.fatal_error_retryable) 276 self.assertIn("PID 4321", adapter.fatal_error_message) 277 278 @patch.dict(os.environ, { 279 "FEISHU_APP_ID": "cli_app", 280 "FEISHU_APP_SECRET": "secret_app", 281 }, clear=True) 282 def test_connect_retries_transient_startup_failure(self): 283 from gateway.config import PlatformConfig 284 from gateway.platforms.feishu import FeishuAdapter 285 286 adapter = FeishuAdapter(PlatformConfig()) 287 ws_client = SimpleNamespace() 288 sleeps = [] 289 290 with ( 291 patch("gateway.platforms.feishu.FEISHU_AVAILABLE", True), 292 patch("gateway.platforms.feishu.FEISHU_WEBSOCKET_AVAILABLE", True), 293 patch("gateway.platforms.feishu.lark", SimpleNamespace(LogLevel=SimpleNamespace(INFO="INFO", WARNING="WARNING"))), 294 patch("gateway.platforms.feishu.EventDispatcherHandler") as mock_handler_class, 295 patch("gateway.platforms.feishu.FeishuWSClient", return_value=ws_client), 296 patch("gateway.platforms.feishu.acquire_scoped_lock", return_value=(True, None)), 297 patch("gateway.platforms.feishu.release_scoped_lock"), 298 patch.object(adapter, "_hydrate_bot_identity", new=AsyncMock()), 299 patch("gateway.platforms.feishu.asyncio.sleep", side_effect=lambda delay: sleeps.append(delay)), 300 patch.object(adapter, "_build_lark_client", return_value=SimpleNamespace()), 301 ): 302 _mock_event_dispatcher_builder(mock_handler_class) 303 304 loop = asyncio.new_event_loop() 305 future = loop.create_future() 306 future.set_result(None) 307 308 class _Loop: 309 def __init__(self): 310 self.calls = 0 311 312 def run_in_executor(self, *_args, **_kwargs): 313 self.calls += 1 314 if self.calls == 1: 315 raise OSError("temporary websocket failure") 316 return future 317 318 def is_closed(self): 319 return False 320 321 fake_loop = _Loop() 322 try: 323 with patch("gateway.platforms.feishu.asyncio.get_running_loop", return_value=fake_loop): 324 connected = asyncio.run(adapter.connect()) 325 finally: 326 loop.close() 327 328 self.assertTrue(connected) 329 self.assertEqual(sleeps, [1]) 330 self.assertEqual(fake_loop.calls, 2) 331 332 @patch.dict(os.environ, {}, clear=True) 333 def test_edit_message_updates_existing_feishu_message(self): 334 from gateway.config import PlatformConfig 335 from gateway.platforms.feishu import FeishuAdapter 336 337 adapter = FeishuAdapter(PlatformConfig()) 338 captured = {} 339 340 class _MessageAPI: 341 def update(self, request): 342 captured["request"] = request 343 return SimpleNamespace(success=lambda: True) 344 345 adapter._client = SimpleNamespace( 346 im=SimpleNamespace( 347 v1=SimpleNamespace( 348 message=_MessageAPI(), 349 ) 350 ) 351 ) 352 353 async def _direct(func, *args, **kwargs): 354 return func(*args, **kwargs) 355 356 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 357 result = asyncio.run( 358 adapter.edit_message( 359 chat_id="oc_chat", 360 message_id="om_progress", 361 content="📖 read_file: \"/tmp/image.png\"", 362 ) 363 ) 364 365 self.assertTrue(result.success) 366 self.assertEqual(result.message_id, "om_progress") 367 self.assertEqual(captured["request"].message_id, "om_progress") 368 self.assertEqual(captured["request"].request_body.msg_type, "text") 369 self.assertEqual( 370 captured["request"].request_body.content, 371 json.dumps({"text": "📖 read_file: \"/tmp/image.png\""}, ensure_ascii=False), 372 ) 373 374 @patch.dict(os.environ, {}, clear=True) 375 def test_edit_message_falls_back_to_text_when_post_update_is_rejected(self): 376 from gateway.config import PlatformConfig 377 from gateway.platforms.feishu import FeishuAdapter 378 379 adapter = FeishuAdapter(PlatformConfig()) 380 captured = {"calls": []} 381 382 class _MessageAPI: 383 def update(self, request): 384 captured["calls"].append(request) 385 if len(captured["calls"]) == 1: 386 return SimpleNamespace(success=lambda: False, code=230001, msg="content format of the post type is incorrect") 387 return SimpleNamespace(success=lambda: True) 388 389 adapter._client = SimpleNamespace( 390 im=SimpleNamespace( 391 v1=SimpleNamespace( 392 message=_MessageAPI(), 393 ) 394 ) 395 ) 396 397 async def _direct(func, *args, **kwargs): 398 return func(*args, **kwargs) 399 400 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 401 result = asyncio.run( 402 adapter.edit_message( 403 chat_id="oc_chat", 404 message_id="om_progress", 405 content="可以用 **粗体** 和 *斜体*。", 406 ) 407 ) 408 409 self.assertTrue(result.success) 410 self.assertEqual(captured["calls"][0].request_body.msg_type, "post") 411 self.assertEqual(captured["calls"][1].request_body.msg_type, "text") 412 self.assertEqual( 413 captured["calls"][1].request_body.content, 414 json.dumps({"text": "可以用 粗体 和 斜体。"}, ensure_ascii=False), 415 ) 416 417 @patch.dict(os.environ, {}, clear=True) 418 def test_get_chat_info_uses_real_feishu_chat_api(self): 419 from gateway.config import PlatformConfig 420 from gateway.platforms.feishu import FeishuAdapter 421 422 adapter = FeishuAdapter(PlatformConfig()) 423 424 class _ChatAPI: 425 def get(self, request): 426 self.request = request 427 return SimpleNamespace( 428 success=lambda: True, 429 data=SimpleNamespace(name="Hermes Group", chat_type="group"), 430 ) 431 432 chat_api = _ChatAPI() 433 adapter._client = SimpleNamespace( 434 im=SimpleNamespace( 435 v1=SimpleNamespace( 436 chat=chat_api, 437 ) 438 ) 439 ) 440 441 async def _direct(func, *args, **kwargs): 442 return func(*args, **kwargs) 443 444 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 445 info = asyncio.run(adapter.get_chat_info("oc_chat")) 446 447 self.assertEqual(chat_api.request.chat_id, "oc_chat") 448 self.assertEqual(info["chat_id"], "oc_chat") 449 self.assertEqual(info["name"], "Hermes Group") 450 self.assertEqual(info["type"], "group") 451 452 class TestAdapterModule(unittest.TestCase): 453 def test_load_settings_uses_sdk_defaults_for_invalid_ws_reconnect_values(self): 454 from gateway.platforms.feishu import FeishuAdapter 455 456 settings = FeishuAdapter._load_settings( 457 { 458 "ws_reconnect_nonce": -1, 459 "ws_reconnect_interval": "bad", 460 } 461 ) 462 463 self.assertEqual(settings.ws_reconnect_nonce, 30) 464 self.assertEqual(settings.ws_reconnect_interval, 120) 465 466 def test_load_settings_accepts_custom_ws_reconnect_values(self): 467 from gateway.platforms.feishu import FeishuAdapter 468 469 settings = FeishuAdapter._load_settings( 470 { 471 "ws_reconnect_nonce": 0, 472 "ws_reconnect_interval": 3, 473 } 474 ) 475 476 self.assertEqual(settings.ws_reconnect_nonce, 0) 477 self.assertEqual(settings.ws_reconnect_interval, 3) 478 479 def test_load_settings_accepts_custom_ws_ping_values(self): 480 from gateway.platforms.feishu import FeishuAdapter 481 482 settings = FeishuAdapter._load_settings( 483 { 484 "ws_ping_interval": 10, 485 "ws_ping_timeout": 8, 486 } 487 ) 488 489 self.assertEqual(settings.ws_ping_interval, 10) 490 self.assertEqual(settings.ws_ping_timeout, 8) 491 492 def test_load_settings_ignores_invalid_ws_ping_values(self): 493 from gateway.platforms.feishu import FeishuAdapter 494 495 settings = FeishuAdapter._load_settings( 496 { 497 "ws_ping_interval": 0, 498 "ws_ping_timeout": -1, 499 } 500 ) 501 502 self.assertIsNone(settings.ws_ping_interval) 503 self.assertIsNone(settings.ws_ping_timeout) 504 505 def test_runtime_ws_overrides_reapply_after_sdk_configure(self): 506 import sys 507 from types import ModuleType 508 509 class _FakeWSClient: 510 def __init__(self): 511 self._reconnect_nonce = 30 512 self._reconnect_interval = 120 513 self._ping_interval = 120 514 self.configure_calls = [] 515 516 def _configure(self, conf): 517 self.configure_calls.append(conf) 518 self._reconnect_nonce = conf.ReconnectNonce 519 self._reconnect_interval = conf.ReconnectInterval 520 self._ping_interval = conf.PingInterval 521 522 def start(self): 523 conf = SimpleNamespace(ReconnectNonce=99, ReconnectInterval=88, PingInterval=77) 524 self._configure(conf) 525 raise RuntimeError("stop test client") 526 527 fake_client = _FakeWSClient() 528 fake_adapter = SimpleNamespace( 529 _ws_thread_loop=None, 530 _ws_reconnect_nonce=2, 531 _ws_reconnect_interval=3, 532 _ws_ping_interval=4, 533 _ws_ping_timeout=5, 534 ) 535 fake_client_module = ModuleType("lark_oapi.ws.client") 536 fake_client_module.loop = None 537 fake_client_module.websockets = SimpleNamespace(connect=AsyncMock()) 538 fake_ws_module = ModuleType("lark_oapi.ws") 539 fake_ws_module.client = fake_client_module 540 fake_root_module = ModuleType("lark_oapi") 541 fake_root_module.ws = fake_ws_module 542 543 original_modules = sys.modules.copy() 544 sys.modules["lark_oapi"] = fake_root_module 545 sys.modules["lark_oapi.ws"] = fake_ws_module 546 sys.modules["lark_oapi.ws.client"] = fake_client_module 547 try: 548 from gateway.platforms.feishu import _run_official_feishu_ws_client 549 550 _run_official_feishu_ws_client(fake_client, fake_adapter) 551 finally: 552 sys.modules.clear() 553 sys.modules.update(original_modules) 554 555 self.assertEqual(len(fake_client.configure_calls), 1) 556 self.assertEqual(fake_client._reconnect_nonce, 2) 557 self.assertEqual(fake_client._reconnect_interval, 3) 558 self.assertEqual(fake_client._ping_interval, 4) 559 560 561 def _admits_group(adapter, message, sender_id, chat_id=""): 562 """Group-path shim: run a message through ``_admit`` and return a bool.""" 563 sender = SimpleNamespace(sender_type="user", sender_id=sender_id) 564 if not hasattr(message, "chat_type"): 565 message.chat_type = "group" 566 if chat_id: 567 message.chat_id = chat_id 568 return adapter._admit(sender, message) is None 569 570 571 class TestAdapterBehavior(unittest.TestCase): 572 @patch.dict(os.environ, {}, clear=True) 573 def test_build_event_handler_registers_reaction_and_card_processors(self): 574 from gateway.config import PlatformConfig 575 from gateway.platforms.feishu import FeishuAdapter 576 577 adapter = FeishuAdapter(PlatformConfig()) 578 calls = [] 579 580 class _Builder: 581 def register_p2_im_message_message_read_v1(self, _handler): 582 calls.append("message_read") 583 return self 584 585 def register_p2_im_message_receive_v1(self, _handler): 586 calls.append("message_receive") 587 return self 588 589 def register_p2_im_message_reaction_created_v1(self, _handler): 590 calls.append("reaction_created") 591 return self 592 593 def register_p2_im_message_reaction_deleted_v1(self, _handler): 594 calls.append("reaction_deleted") 595 return self 596 597 def register_p2_card_action_trigger(self, _handler): 598 calls.append("card_action") 599 return self 600 601 def register_p2_im_chat_member_bot_added_v1(self, _handler): 602 calls.append("bot_added") 603 return self 604 605 def register_p2_im_chat_member_bot_deleted_v1(self, _handler): 606 calls.append("bot_deleted") 607 return self 608 609 def register_p2_im_chat_access_event_bot_p2p_chat_entered_v1(self, _handler): 610 calls.append("p2p_chat_entered") 611 return self 612 613 def register_p2_im_message_recalled_v1(self, _handler): 614 calls.append("message_recalled") 615 return self 616 617 def register_p2_customized_event(self, event_key, _handler): 618 calls.append(f"customized:{event_key}") 619 return self 620 621 def build(self): 622 calls.append("build") 623 return "handler" 624 625 class _Dispatcher: 626 @staticmethod 627 def builder(_encrypt_key, _verification_token): 628 calls.append("builder") 629 return _Builder() 630 631 with patch("gateway.platforms.feishu.EventDispatcherHandler", _Dispatcher): 632 handler = adapter._build_event_handler() 633 634 self.assertEqual(handler, "handler") 635 self.assertEqual( 636 calls, 637 [ 638 "builder", 639 "message_read", 640 "message_receive", 641 "reaction_created", 642 "reaction_deleted", 643 "card_action", 644 "bot_added", 645 "bot_deleted", 646 "p2p_chat_entered", 647 "message_recalled", 648 "customized:drive.notice.comment_add_v1", 649 "build", 650 ], 651 ) 652 653 @patch.dict(os.environ, {}, clear=True) 654 def test_bot_origin_reactions_are_dropped_to_avoid_feedback_loops(self): 655 from gateway.config import PlatformConfig 656 from gateway.platforms.feishu import FeishuAdapter 657 658 adapter = FeishuAdapter(PlatformConfig()) 659 adapter._loop = object() 660 661 for emoji in ("Typing", "CrossMark"): 662 event = SimpleNamespace( 663 message_id="om_msg", 664 operator_type="bot", 665 reaction_type=SimpleNamespace(emoji_type=emoji), 666 ) 667 data = SimpleNamespace(event=event) 668 with patch( 669 "gateway.platforms.feishu.asyncio.run_coroutine_threadsafe" 670 ) as run_threadsafe: 671 adapter._on_reaction_event("im.message.reaction.created_v1", data) 672 run_threadsafe.assert_not_called() 673 674 @patch.dict(os.environ, {}, clear=True) 675 def test_user_reaction_with_managed_emoji_is_still_routed(self): 676 # Operator-origin filter is enough to prevent feedback loops; we must 677 # not additionally swallow user-origin reactions just because their 678 # emoji happens to collide with a lifecycle emoji. 679 from gateway.config import PlatformConfig 680 from gateway.platforms.feishu import FeishuAdapter 681 682 adapter = FeishuAdapter(PlatformConfig()) 683 adapter._loop = SimpleNamespace(is_closed=lambda: False) 684 685 event = SimpleNamespace( 686 message_id="om_msg", 687 operator_type="user", 688 reaction_type=SimpleNamespace(emoji_type="Typing"), 689 ) 690 data = SimpleNamespace(event=event) 691 692 def _close_coro_and_return_future(coro, _loop): 693 coro.close() 694 return SimpleNamespace(add_done_callback=lambda _: None) 695 696 with patch( 697 "gateway.platforms.feishu.asyncio.run_coroutine_threadsafe", 698 side_effect=_close_coro_and_return_future, 699 ) as run_threadsafe: 700 adapter._on_reaction_event("im.message.reaction.created_v1", data) 701 run_threadsafe.assert_called_once() 702 703 def _build_reaction_adapter(self, *, msg_sender_id: str): 704 """Build a FeishuAdapter wired up to return a single GET-message result.""" 705 from gateway.config import PlatformConfig 706 from gateway.platforms.feishu import FeishuAdapter 707 708 adapter = FeishuAdapter(PlatformConfig()) 709 adapter._app_id = "cli_self_app" 710 adapter._bot_open_id = "ou_self_bot" 711 adapter._bot_user_id = "u_self_bot" 712 713 msg = SimpleNamespace( 714 sender=SimpleNamespace(sender_type="app", id=msg_sender_id, id_type="app_id"), 715 chat_id="oc_chat", 716 chat_type="group", 717 ) 718 response = SimpleNamespace(success=lambda: True, data=SimpleNamespace(items=[msg])) 719 adapter._client = SimpleNamespace( 720 im=SimpleNamespace( 721 v1=SimpleNamespace(message=SimpleNamespace(get=Mock(return_value=response))) 722 ) 723 ) 724 adapter._build_get_message_request = Mock(return_value=object()) 725 adapter._handle_message_with_guards = AsyncMock() 726 adapter._resolve_sender_profile = AsyncMock( 727 return_value={"user_id": "u_human", "user_name": "Human", "user_id_alt": None} 728 ) 729 adapter.get_chat_info = AsyncMock(return_value={"name": "Test Chat"}) 730 return adapter 731 732 @patch.dict(os.environ, {}, clear=True) 733 def test_reaction_on_peer_bot_message_is_not_routed(self): 734 # GET im/v1/messages sender for bot messages carries id=app_id; a peer 735 # bot's message has a different app_id than ours, so it must be dropped. 736 adapter = self._build_reaction_adapter(msg_sender_id="cli_peer_app") 737 738 event = SimpleNamespace( 739 message_id="om_peer_msg", 740 user_id=SimpleNamespace(open_id="ou_human", user_id=None, union_id=None), 741 reaction_type=SimpleNamespace(emoji_type="THUMBSUP"), 742 ) 743 data = SimpleNamespace(event=event) 744 asyncio.run( 745 adapter._handle_reaction_event("im.message.reaction.created_v1", data) 746 ) 747 adapter._handle_message_with_guards.assert_not_awaited() 748 749 @patch.dict(os.environ, {}, clear=True) 750 def test_reaction_on_our_own_bot_message_is_routed(self): 751 adapter = self._build_reaction_adapter(msg_sender_id="cli_self_app") 752 753 event = SimpleNamespace( 754 message_id="om_self_msg", 755 user_id=SimpleNamespace(open_id="ou_human", user_id=None, union_id=None), 756 reaction_type=SimpleNamespace(emoji_type="THUMBSUP"), 757 ) 758 data = SimpleNamespace(event=event) 759 asyncio.run( 760 adapter._handle_reaction_event("im.message.reaction.created_v1", data) 761 ) 762 adapter._handle_message_with_guards.assert_awaited_once() 763 764 @patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "open"}, clear=True) 765 def test_group_message_requires_mentions_even_when_policy_open(self): 766 from gateway.config import PlatformConfig 767 from gateway.platforms.feishu import FeishuAdapter 768 769 adapter = FeishuAdapter(PlatformConfig()) 770 message = SimpleNamespace(mentions=[]) 771 sender_id = SimpleNamespace(open_id="ou_any", user_id=None) 772 self.assertFalse(_admits_group(adapter, message, sender_id, "")) 773 774 message_with_mention = SimpleNamespace(mentions=[SimpleNamespace(key="@_user_1")]) 775 self.assertFalse(_admits_group(adapter, message_with_mention, sender_id, "")) 776 777 @patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "open"}, clear=True) 778 def test_group_message_with_other_user_mention_is_rejected_when_bot_identity_unknown(self): 779 from gateway.config import PlatformConfig 780 from gateway.platforms.feishu import FeishuAdapter 781 782 adapter = FeishuAdapter(PlatformConfig()) 783 sender_id = SimpleNamespace(open_id="ou_any", user_id=None) 784 other_mention = SimpleNamespace( 785 name="Other User", 786 id=SimpleNamespace(open_id="ou_other", user_id="u_other"), 787 ) 788 789 self.assertFalse( 790 _admits_group(adapter, SimpleNamespace(mentions=[other_mention]), sender_id, "") 791 ) 792 793 @patch.dict( 794 os.environ, 795 { 796 "FEISHU_GROUP_POLICY": "allowlist", 797 "FEISHU_ALLOWED_USERS": "ou_allowed", 798 "FEISHU_BOT_NAME": "Hermes Bot", 799 }, 800 clear=True, 801 ) 802 def test_group_message_allowlist_and_mention_both_required(self): 803 from gateway.config import PlatformConfig 804 from gateway.platforms.feishu import FeishuAdapter 805 806 adapter = FeishuAdapter(PlatformConfig()) 807 # Mention without IDs — name fallback legitimately engages. 808 mentioned = SimpleNamespace( 809 mentions=[ 810 SimpleNamespace( 811 name="Hermes Bot", 812 id=SimpleNamespace(open_id=None, user_id=None), 813 ) 814 ] 815 ) 816 817 self.assertTrue( 818 _admits_group(adapter, 819 mentioned, 820 SimpleNamespace(open_id="ou_allowed", user_id=None), 821 "", 822 ) 823 ) 824 self.assertFalse( 825 _admits_group(adapter, 826 mentioned, 827 SimpleNamespace(open_id="ou_blocked", user_id=None), 828 "", 829 ) 830 ) 831 832 def test_per_group_allowlist_policy_gates_by_sender(self): 833 from gateway.config import PlatformConfig 834 from gateway.platforms.feishu import FeishuAdapter 835 836 config = PlatformConfig( 837 extra={ 838 "group_rules": { 839 "oc_chat_a": { 840 "policy": "allowlist", 841 "allowlist": ["ou_alice", "ou_bob"], 842 } 843 } 844 } 845 ) 846 adapter = FeishuAdapter(config) 847 adapter._bot_open_id = "ou_bot" 848 849 message = SimpleNamespace( 850 mentions=[SimpleNamespace(name="Bot", id=SimpleNamespace(open_id="ou_bot", user_id=None))] 851 ) 852 853 self.assertTrue( 854 _admits_group(adapter, 855 message, 856 SimpleNamespace(open_id="ou_alice", user_id=None), 857 "oc_chat_a", 858 ) 859 ) 860 self.assertFalse( 861 _admits_group(adapter, 862 message, 863 SimpleNamespace(open_id="ou_charlie", user_id=None), 864 "oc_chat_a", 865 ) 866 ) 867 868 def test_per_group_blacklist_policy_blocks_specific_users(self): 869 from gateway.config import PlatformConfig 870 from gateway.platforms.feishu import FeishuAdapter 871 872 config = PlatformConfig( 873 extra={ 874 "group_rules": { 875 "oc_chat_b": { 876 "policy": "blacklist", 877 "blacklist": ["ou_blocked"], 878 } 879 } 880 } 881 ) 882 adapter = FeishuAdapter(config) 883 adapter._bot_open_id = "ou_bot" 884 885 message = SimpleNamespace( 886 mentions=[SimpleNamespace(name="Bot", id=SimpleNamespace(open_id="ou_bot", user_id=None))] 887 ) 888 889 self.assertTrue( 890 _admits_group(adapter, 891 message, 892 SimpleNamespace(open_id="ou_alice", user_id=None), 893 "oc_chat_b", 894 ) 895 ) 896 self.assertFalse( 897 _admits_group(adapter, 898 message, 899 SimpleNamespace(open_id="ou_blocked", user_id=None), 900 "oc_chat_b", 901 ) 902 ) 903 904 def test_per_group_admin_only_policy_requires_admin(self): 905 from gateway.config import PlatformConfig 906 from gateway.platforms.feishu import FeishuAdapter 907 908 config = PlatformConfig( 909 extra={ 910 "admins": ["ou_admin"], 911 "group_rules": { 912 "oc_chat_c": { 913 "policy": "admin_only", 914 } 915 }, 916 } 917 ) 918 adapter = FeishuAdapter(config) 919 adapter._bot_open_id = "ou_bot" 920 921 message = SimpleNamespace( 922 mentions=[SimpleNamespace(name="Bot", id=SimpleNamespace(open_id="ou_bot", user_id=None))] 923 ) 924 925 self.assertTrue( 926 _admits_group(adapter, 927 message, 928 SimpleNamespace(open_id="ou_admin", user_id=None), 929 "oc_chat_c", 930 ) 931 ) 932 self.assertFalse( 933 _admits_group(adapter, 934 message, 935 SimpleNamespace(open_id="ou_regular", user_id=None), 936 "oc_chat_c", 937 ) 938 ) 939 940 def test_per_group_disabled_policy_blocks_all(self): 941 from gateway.config import PlatformConfig 942 from gateway.platforms.feishu import FeishuAdapter 943 944 config = PlatformConfig( 945 extra={ 946 "admins": ["ou_admin"], 947 "group_rules": { 948 "oc_chat_d": { 949 "policy": "disabled", 950 } 951 }, 952 } 953 ) 954 adapter = FeishuAdapter(config) 955 adapter._bot_open_id = "ou_bot" 956 957 message = SimpleNamespace( 958 mentions=[SimpleNamespace(name="Bot", id=SimpleNamespace(open_id="ou_bot", user_id=None))] 959 ) 960 961 self.assertTrue( 962 _admits_group(adapter, 963 message, 964 SimpleNamespace(open_id="ou_admin", user_id=None), 965 "oc_chat_d", 966 ) 967 ) 968 self.assertFalse( 969 _admits_group(adapter, 970 message, 971 SimpleNamespace(open_id="ou_regular", user_id=None), 972 "oc_chat_d", 973 ) 974 ) 975 976 def test_global_admins_bypass_all_group_rules(self): 977 from gateway.config import PlatformConfig 978 from gateway.platforms.feishu import FeishuAdapter 979 980 config = PlatformConfig( 981 extra={ 982 "admins": ["ou_admin"], 983 "group_rules": { 984 "oc_chat_e": { 985 "policy": "allowlist", 986 "allowlist": ["ou_alice"], 987 } 988 }, 989 } 990 ) 991 adapter = FeishuAdapter(config) 992 adapter._bot_open_id = "ou_bot" 993 994 message = SimpleNamespace( 995 mentions=[SimpleNamespace(name="Bot", id=SimpleNamespace(open_id="ou_bot", user_id=None))] 996 ) 997 998 self.assertTrue( 999 _admits_group(adapter, 1000 message, 1001 SimpleNamespace(open_id="ou_admin", user_id=None), 1002 "oc_chat_e", 1003 ) 1004 ) 1005 1006 def test_default_group_policy_fallback_for_chats_without_explicit_rule(self): 1007 from gateway.config import PlatformConfig 1008 from gateway.platforms.feishu import FeishuAdapter 1009 1010 config = PlatformConfig( 1011 extra={ 1012 "default_group_policy": "open", 1013 } 1014 ) 1015 adapter = FeishuAdapter(config) 1016 adapter._bot_open_id = "ou_bot" 1017 1018 message = SimpleNamespace( 1019 mentions=[SimpleNamespace(name="Bot", id=SimpleNamespace(open_id="ou_bot", user_id=None))] 1020 ) 1021 1022 self.assertTrue( 1023 _admits_group(adapter, 1024 message, 1025 SimpleNamespace(open_id="ou_anyone", user_id=None), 1026 "oc_chat_unknown", 1027 ) 1028 ) 1029 1030 @patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "open"}, clear=True) 1031 def test_group_message_matches_bot_open_id_when_configured(self): 1032 from gateway.config import PlatformConfig 1033 from gateway.platforms.feishu import FeishuAdapter 1034 1035 adapter = FeishuAdapter(PlatformConfig()) 1036 adapter._bot_open_id = "ou_bot" 1037 sender_id = SimpleNamespace(open_id="ou_any", user_id=None) 1038 1039 bot_mention = SimpleNamespace( 1040 name="Hermes", 1041 id=SimpleNamespace(open_id="ou_bot", user_id="u_bot"), 1042 ) 1043 other_mention = SimpleNamespace( 1044 name="Other", 1045 id=SimpleNamespace(open_id="ou_other", user_id="u_other"), 1046 ) 1047 1048 self.assertTrue( 1049 _admits_group(adapter, SimpleNamespace(mentions=[bot_mention]), sender_id, "") 1050 ) 1051 self.assertFalse( 1052 _admits_group(adapter, SimpleNamespace(mentions=[other_mention]), sender_id, "") 1053 ) 1054 1055 @patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "open"}, clear=True) 1056 def test_group_message_matches_bot_name_when_only_name_available(self): 1057 """Name fallback engages when either side lacks an open_id. When BOTH 1058 the mention and the bot carry open_ids, IDs are authoritative — a 1059 same-name human with a different open_id must NOT admit.""" 1060 from gateway.config import PlatformConfig 1061 from gateway.platforms.feishu import FeishuAdapter 1062 1063 # Case 1: bot has only a name (open_id not hydrated / not configured). 1064 # Name fallback is the only available signal for any mention. 1065 adapter = FeishuAdapter(PlatformConfig()) 1066 adapter._bot_name = "Hermes Bot" 1067 sender_id = SimpleNamespace(open_id="ou_any", user_id=None) 1068 1069 name_only_mention = SimpleNamespace( 1070 name="Hermes Bot", 1071 id=SimpleNamespace(open_id=None, user_id=None), 1072 ) 1073 different_mention = SimpleNamespace( 1074 name="Another Bot", 1075 id=SimpleNamespace(open_id=None, user_id=None), 1076 ) 1077 1078 self.assertTrue( 1079 _admits_group(adapter, SimpleNamespace(mentions=[name_only_mention]), sender_id, "") 1080 ) 1081 self.assertFalse( 1082 _admits_group(adapter, SimpleNamespace(mentions=[different_mention]), sender_id, "") 1083 ) 1084 1085 # Case 2: bot's open_id IS known — a same-name human with different 1086 # open_id must NOT admit (IDs override names). 1087 adapter2 = FeishuAdapter(PlatformConfig()) 1088 adapter2._bot_open_id = "ou_bot" 1089 adapter2._bot_name = "Hermes Bot" 1090 1091 same_name_other_id_mention = SimpleNamespace( 1092 name="Hermes Bot", 1093 id=SimpleNamespace(open_id="ou_other", user_id="u_other"), 1094 ) 1095 bot_mention = SimpleNamespace( 1096 name="Hermes Bot", 1097 id=SimpleNamespace(open_id="ou_bot", user_id=None), 1098 ) 1099 1100 self.assertFalse( 1101 _admits_group( 1102 adapter2, 1103 SimpleNamespace(mentions=[same_name_other_id_mention]), 1104 sender_id, 1105 "", 1106 ) 1107 ) 1108 self.assertTrue( 1109 _admits_group(adapter2, SimpleNamespace(mentions=[bot_mention]), sender_id, "") 1110 ) 1111 1112 @patch.dict(os.environ, {}, clear=True) 1113 def test_extract_post_message_as_text(self): 1114 from gateway.config import PlatformConfig 1115 from gateway.platforms.feishu import FeishuAdapter 1116 1117 adapter = FeishuAdapter(PlatformConfig()) 1118 message = SimpleNamespace( 1119 message_type="post", 1120 content='{"zh_cn":{"title":"Title","content":[[{"tag":"text","text":"hello "}],[{"tag":"a","text":"doc","href":"https://example.com"}]]}}', 1121 message_id="om_post", 1122 ) 1123 1124 text, msg_type, media_urls, media_types, _mentions = asyncio.run(adapter._extract_message_content(message)) 1125 1126 self.assertEqual(text, "Title\nhello\n[doc](https://example.com)") 1127 self.assertEqual(msg_type.value, "text") 1128 self.assertEqual(media_urls, []) 1129 self.assertEqual(media_types, []) 1130 1131 @patch.dict(os.environ, {}, clear=True) 1132 def test_extract_post_message_uses_first_available_language_block(self): 1133 from gateway.config import PlatformConfig 1134 from gateway.platforms.feishu import FeishuAdapter 1135 1136 adapter = FeishuAdapter(PlatformConfig()) 1137 message = SimpleNamespace( 1138 message_type="post", 1139 content='{"fr_fr":{"title":"Subject","content":[[{"tag":"text","text":"bonjour"}]]}}', 1140 message_id="om_post_fr", 1141 ) 1142 1143 text, msg_type, media_urls, media_types, _mentions = asyncio.run(adapter._extract_message_content(message)) 1144 1145 self.assertEqual(text, "Subject\nbonjour") 1146 self.assertEqual(msg_type.value, "text") 1147 self.assertEqual(media_urls, []) 1148 self.assertEqual(media_types, []) 1149 1150 @patch.dict(os.environ, {}, clear=True) 1151 def test_extract_post_message_with_rich_elements_does_not_drop_content(self): 1152 from gateway.config import PlatformConfig 1153 from gateway.platforms.feishu import FeishuAdapter 1154 1155 adapter = FeishuAdapter(PlatformConfig()) 1156 message = SimpleNamespace( 1157 message_type="post", 1158 content=( 1159 '{"en_us":{"title":"Rich message","content":[' 1160 '[{"tag":"img","alt":"diagram"}],' 1161 '[{"tag":"at","user_name":"Alice"},{"tag":"text","text":" please check the attachment"}],' 1162 '[{"tag":"media","file_name":"spec.pdf"}],' 1163 '[{"tag":"emotion","emoji_type":"smile"}]' 1164 ']}}' 1165 ), 1166 message_id="om_post_rich", 1167 ) 1168 1169 text, msg_type, media_urls, media_types, _mentions = asyncio.run(adapter._extract_message_content(message)) 1170 1171 self.assertEqual(text, "Rich message\n[Image: diagram]\n@Alice please check the attachment\n[Attachment: spec.pdf]\n:smile:") 1172 self.assertEqual(msg_type.value, "text") 1173 self.assertEqual(media_urls, []) 1174 self.assertEqual(media_types, []) 1175 1176 @patch.dict(os.environ, {}, clear=True) 1177 def test_extract_post_message_downloads_embedded_resources(self): 1178 from gateway.config import PlatformConfig 1179 from gateway.platforms.feishu import FeishuAdapter 1180 1181 adapter = FeishuAdapter(PlatformConfig()) 1182 adapter._download_feishu_image = AsyncMock(return_value=("/tmp/feishu-image.png", "image/png")) 1183 adapter._download_feishu_message_resource = AsyncMock(return_value=("/tmp/spec.pdf", "application/pdf")) 1184 message = SimpleNamespace( 1185 message_type="post", 1186 content=( 1187 '{"en_us":{"title":"Rich message","content":[' 1188 '[{"tag":"img","image_key":"img_123","alt":"diagram"}],' 1189 '[{"tag":"media","file_key":"file_123","file_name":"spec.pdf"}]' 1190 ']}}' 1191 ), 1192 message_id="om_post_media", 1193 ) 1194 1195 text, msg_type, media_urls, media_types, _mentions = asyncio.run(adapter._extract_message_content(message)) 1196 1197 self.assertEqual(text, "Rich message\n[Image: diagram]\n[Attachment: spec.pdf]") 1198 self.assertEqual(msg_type.value, "text") 1199 self.assertEqual(media_urls, ["/tmp/feishu-image.png", "/tmp/spec.pdf"]) 1200 self.assertEqual(media_types, ["image/png", "application/pdf"]) 1201 adapter._download_feishu_image.assert_awaited_once_with( 1202 message_id="om_post_media", 1203 image_key="img_123", 1204 ) 1205 adapter._download_feishu_message_resource.assert_awaited_once_with( 1206 message_id="om_post_media", 1207 file_key="file_123", 1208 resource_type="file", 1209 fallback_filename="spec.pdf", 1210 ) 1211 1212 @patch.dict(os.environ, {}, clear=True) 1213 def test_extract_merge_forward_message_as_text_summary(self): 1214 from gateway.config import PlatformConfig 1215 from gateway.platforms.feishu import FeishuAdapter 1216 1217 adapter = FeishuAdapter(PlatformConfig()) 1218 message = SimpleNamespace( 1219 message_type="merge_forward", 1220 content=json.dumps( 1221 { 1222 "title": "Forwarded updates", 1223 "messages": [ 1224 {"sender_name": "Alice", "text": "Investigating the incident"}, 1225 {"sender_name": "Bob", "text": "ETA 10 minutes"}, 1226 ], 1227 } 1228 ), 1229 message_id="om_merge_forward", 1230 ) 1231 1232 text, msg_type, media_urls, media_types, _mentions = asyncio.run(adapter._extract_message_content(message)) 1233 1234 self.assertEqual( 1235 text, 1236 "Forwarded updates\n- Alice: Investigating the incident\n- Bob: ETA 10 minutes", 1237 ) 1238 self.assertEqual(msg_type.value, "text") 1239 self.assertEqual(media_urls, []) 1240 self.assertEqual(media_types, []) 1241 1242 @patch.dict(os.environ, {}, clear=True) 1243 def test_extract_share_chat_message_as_text_summary(self): 1244 from gateway.config import PlatformConfig 1245 from gateway.platforms.feishu import FeishuAdapter 1246 1247 adapter = FeishuAdapter(PlatformConfig()) 1248 message = SimpleNamespace( 1249 message_type="share_chat", 1250 content='{"chat_id":"oc_shared","chat_name":"Platform Ops"}', 1251 message_id="om_share_chat", 1252 ) 1253 1254 text, msg_type, media_urls, media_types, _mentions = asyncio.run(adapter._extract_message_content(message)) 1255 1256 self.assertEqual(text, "Shared chat: Platform Ops\nChat ID: oc_shared") 1257 self.assertEqual(msg_type.value, "text") 1258 self.assertEqual(media_urls, []) 1259 self.assertEqual(media_types, []) 1260 1261 @patch.dict(os.environ, {}, clear=True) 1262 def test_extract_interactive_message_as_text_summary(self): 1263 from gateway.config import PlatformConfig 1264 from gateway.platforms.feishu import FeishuAdapter 1265 1266 adapter = FeishuAdapter(PlatformConfig()) 1267 message = SimpleNamespace( 1268 message_type="interactive", 1269 content=json.dumps( 1270 { 1271 "card": { 1272 "header": {"title": {"tag": "plain_text", "content": "Approval Request"}}, 1273 "elements": [ 1274 {"tag": "div", "text": {"tag": "plain_text", "content": "Requester: Alice"}}, 1275 { 1276 "tag": "action", 1277 "actions": [ 1278 {"tag": "button", "text": {"tag": "plain_text", "content": "Approve"}}, 1279 ], 1280 }, 1281 ], 1282 } 1283 } 1284 ), 1285 message_id="om_interactive", 1286 ) 1287 1288 text, msg_type, media_urls, media_types, _mentions = asyncio.run(adapter._extract_message_content(message)) 1289 1290 self.assertEqual(text, "Approval Request\nRequester: Alice\nApprove\nActions: Approve") 1291 self.assertEqual(msg_type.value, "text") 1292 self.assertEqual(media_urls, []) 1293 self.assertEqual(media_types, []) 1294 1295 @patch.dict(os.environ, {}, clear=True) 1296 def test_extract_image_message_downloads_and_caches(self): 1297 from gateway.config import PlatformConfig 1298 from gateway.platforms.feishu import FeishuAdapter 1299 1300 adapter = FeishuAdapter(PlatformConfig()) 1301 adapter._download_feishu_image = AsyncMock(return_value=("/tmp/feishu-image.png", "image/png")) 1302 message = SimpleNamespace( 1303 message_type="image", 1304 content='{"image_key":"img_123"}', 1305 message_id="om_image", 1306 ) 1307 1308 text, msg_type, media_urls, media_types, _mentions = asyncio.run(adapter._extract_message_content(message)) 1309 1310 self.assertEqual(text, "") 1311 self.assertEqual(msg_type.value, "photo") 1312 self.assertEqual(media_urls, ["/tmp/feishu-image.png"]) 1313 self.assertEqual(media_types, ["image/png"]) 1314 adapter._download_feishu_image.assert_awaited_once_with( 1315 message_id="om_image", 1316 image_key="img_123", 1317 ) 1318 1319 @patch.dict(os.environ, {}, clear=True) 1320 def test_extract_audio_message_downloads_and_caches(self): 1321 from gateway.config import PlatformConfig 1322 from gateway.platforms.feishu import FeishuAdapter 1323 1324 adapter = FeishuAdapter(PlatformConfig()) 1325 adapter._download_feishu_message_resource = AsyncMock( 1326 return_value=("/tmp/feishu-audio.ogg", "audio/ogg") 1327 ) 1328 message = SimpleNamespace( 1329 message_type="audio", 1330 content='{"file_key":"file_audio","file_name":"voice.ogg"}', 1331 message_id="om_audio", 1332 ) 1333 1334 text, msg_type, media_urls, media_types, _mentions = asyncio.run(adapter._extract_message_content(message)) 1335 1336 self.assertEqual(text, "") 1337 self.assertEqual(msg_type.value, "audio") 1338 self.assertEqual(media_urls, ["/tmp/feishu-audio.ogg"]) 1339 self.assertEqual(media_types, ["audio/ogg"]) 1340 1341 @patch.dict(os.environ, {}, clear=True) 1342 def test_extract_file_message_downloads_and_caches(self): 1343 from gateway.config import PlatformConfig 1344 from gateway.platforms.feishu import FeishuAdapter 1345 1346 adapter = FeishuAdapter(PlatformConfig()) 1347 adapter._download_feishu_message_resource = AsyncMock( 1348 return_value=("/tmp/doc_123_report.pdf", "application/pdf") 1349 ) 1350 message = SimpleNamespace( 1351 message_type="file", 1352 content='{"file_key":"file_doc","file_name":"report.pdf"}', 1353 message_id="om_file", 1354 ) 1355 1356 text, msg_type, media_urls, media_types, _mentions = asyncio.run(adapter._extract_message_content(message)) 1357 1358 self.assertEqual(text, "") 1359 self.assertEqual(msg_type.value, "document") 1360 self.assertEqual(media_urls, ["/tmp/doc_123_report.pdf"]) 1361 self.assertEqual(media_types, ["application/pdf"]) 1362 1363 @patch.dict(os.environ, {}, clear=True) 1364 def test_extract_media_message_with_image_mime_becomes_photo(self): 1365 from gateway.config import PlatformConfig 1366 from gateway.platforms.feishu import FeishuAdapter 1367 1368 adapter = FeishuAdapter(PlatformConfig()) 1369 adapter._download_feishu_message_resource = AsyncMock( 1370 return_value=("/tmp/feishu-media.jpg", "image/jpeg") 1371 ) 1372 message = SimpleNamespace( 1373 message_type="media", 1374 content='{"file_key":"file_media","file_name":"photo.jpg"}', 1375 message_id="om_media", 1376 ) 1377 1378 text, msg_type, media_urls, media_types, _mentions = asyncio.run(adapter._extract_message_content(message)) 1379 1380 self.assertEqual(text, "") 1381 self.assertEqual(msg_type.value, "photo") 1382 self.assertEqual(media_urls, ["/tmp/feishu-media.jpg"]) 1383 self.assertEqual(media_types, ["image/jpeg"]) 1384 1385 @patch.dict(os.environ, {}, clear=True) 1386 def test_extract_media_message_with_video_mime_becomes_video(self): 1387 from gateway.config import PlatformConfig 1388 from gateway.platforms.feishu import FeishuAdapter 1389 1390 adapter = FeishuAdapter(PlatformConfig()) 1391 adapter._download_feishu_message_resource = AsyncMock( 1392 return_value=("/tmp/feishu-video.mp4", "video/mp4") 1393 ) 1394 message = SimpleNamespace( 1395 message_type="media", 1396 content='{"file_key":"file_video","file_name":"clip.mp4"}', 1397 message_id="om_video", 1398 ) 1399 1400 text, msg_type, media_urls, media_types, _mentions = asyncio.run(adapter._extract_message_content(message)) 1401 1402 self.assertEqual(text, "") 1403 self.assertEqual(msg_type.value, "video") 1404 self.assertEqual(media_urls, ["/tmp/feishu-video.mp4"]) 1405 self.assertEqual(media_types, ["video/mp4"]) 1406 1407 @patch.dict(os.environ, {}, clear=True) 1408 def test_extract_text_from_raw_content_uses_relation_message_fallbacks(self): 1409 from gateway.config import PlatformConfig 1410 from gateway.platforms.feishu import FeishuAdapter 1411 1412 adapter = FeishuAdapter(PlatformConfig()) 1413 1414 shared = adapter._extract_text_from_raw_content( 1415 msg_type="share_chat", 1416 raw_content='{"chat_id":"oc_shared","chat_name":"Platform Ops"}', 1417 ) 1418 attachment = adapter._extract_text_from_raw_content( 1419 msg_type="file", 1420 raw_content='{"file_key":"file_1","file_name":"report.pdf"}', 1421 ) 1422 1423 self.assertEqual(shared, "Shared chat: Platform Ops\nChat ID: oc_shared") 1424 self.assertEqual(attachment, "[Attachment: report.pdf]") 1425 1426 @patch.dict(os.environ, {}, clear=True) 1427 def test_extract_text_message_starting_with_slash_becomes_command(self): 1428 from gateway.config import PlatformConfig 1429 from gateway.platforms.feishu import FeishuAdapter 1430 1431 adapter = FeishuAdapter(PlatformConfig()) 1432 adapter._dispatch_inbound_event = AsyncMock() 1433 adapter.get_chat_info = AsyncMock( 1434 return_value={"chat_id": "oc_chat", "name": "Feishu DM", "type": "dm"} 1435 ) 1436 adapter._resolve_sender_profile = AsyncMock( 1437 return_value={"user_id": "ou_user", "user_name": "张三", "user_id_alt": None} 1438 ) 1439 message = SimpleNamespace( 1440 chat_id="oc_chat", 1441 thread_id=None, 1442 parent_id=None, 1443 upper_message_id=None, 1444 message_type="text", 1445 content='{"text":"/help test"}', 1446 message_id="om_command", 1447 ) 1448 1449 asyncio.run( 1450 adapter._process_inbound_message( 1451 data=SimpleNamespace(event=SimpleNamespace(message=message)), 1452 message=message, 1453 sender_id=SimpleNamespace(open_id="ou_user", user_id=None, union_id=None), 1454 is_bot=False, 1455 chat_type="p2p", 1456 message_id="om_command", 1457 ) 1458 ) 1459 1460 event = adapter._dispatch_inbound_event.await_args.args[0] 1461 self.assertEqual(event.message_type.value, "command") 1462 self.assertEqual(event.text, "/help test") 1463 1464 @patch.dict(os.environ, {}, clear=True) 1465 def test_extract_text_file_injects_content(self): 1466 from gateway.config import PlatformConfig 1467 from gateway.platforms.feishu import FeishuAdapter 1468 1469 adapter = FeishuAdapter(PlatformConfig()) 1470 with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp: 1471 tmp.write("hello from feishu") 1472 path = tmp.name 1473 1474 try: 1475 text = asyncio.run(adapter._maybe_extract_text_document(path, "text/plain")) 1476 finally: 1477 os.unlink(path) 1478 1479 self.assertIn("hello from feishu", text) 1480 self.assertIn("[Content of", text) 1481 1482 @patch.dict(os.environ, {}, clear=True) 1483 def test_message_event_submits_to_adapter_loop(self): 1484 from gateway.config import PlatformConfig 1485 from gateway.platforms.feishu import FeishuAdapter 1486 1487 adapter = FeishuAdapter(PlatformConfig()) 1488 1489 class _Loop: 1490 def is_closed(self): 1491 return False 1492 1493 adapter._loop = _Loop() 1494 1495 message = SimpleNamespace( 1496 message_id="om_text", 1497 chat_type="p2p", 1498 chat_id="oc_chat", 1499 message_type="text", 1500 content='{"text":"hello"}', 1501 ) 1502 sender_id = SimpleNamespace(open_id="ou_user", user_id=None, union_id=None) 1503 sender = SimpleNamespace(sender_id=sender_id, sender_type="user") 1504 data = SimpleNamespace(event=SimpleNamespace(message=message, sender=sender)) 1505 1506 future = SimpleNamespace(add_done_callback=lambda *_args, **_kwargs: None) 1507 1508 def _submit(coro, _loop): 1509 coro.close() 1510 return future 1511 1512 with patch("gateway.platforms.feishu.asyncio.run_coroutine_threadsafe", side_effect=_submit) as submit: 1513 adapter._on_message_event(data) 1514 1515 self.assertTrue(submit.called) 1516 1517 @patch.dict(os.environ, {}, clear=True) 1518 def test_webhook_request_uses_same_message_dispatch_path(self): 1519 from gateway.config import PlatformConfig 1520 from gateway.platforms.feishu import FeishuAdapter 1521 1522 adapter = FeishuAdapter(PlatformConfig()) 1523 adapter._on_message_event = Mock() 1524 1525 body = json.dumps({ 1526 "header": {"event_type": "im.message.receive_v1"}, 1527 "event": {"message": {"message_id": "om_test"}}, 1528 }).encode("utf-8") 1529 request = SimpleNamespace( 1530 remote="127.0.0.1", 1531 content_length=None, 1532 headers={}, 1533 read=AsyncMock(return_value=body), 1534 ) 1535 1536 response = asyncio.run(adapter._handle_webhook_request(request)) 1537 1538 self.assertEqual(response.status, 200) 1539 adapter._on_message_event.assert_called_once() 1540 1541 @patch.dict(os.environ, {}, clear=True) 1542 def test_process_inbound_message_uses_event_sender_identity_only(self): 1543 from gateway.config import PlatformConfig 1544 from gateway.platforms.base import MessageType 1545 from gateway.platforms.feishu import FeishuAdapter 1546 1547 adapter = FeishuAdapter(PlatformConfig()) 1548 adapter._dispatch_inbound_event = AsyncMock() 1549 # Sender name now comes from the contact API; mock it to return a known value. 1550 adapter._resolve_sender_name_from_api = AsyncMock(return_value="张三") 1551 adapter.get_chat_info = AsyncMock( 1552 return_value={"chat_id": "oc_chat", "name": "Feishu DM", "type": "dm"} 1553 ) 1554 message = SimpleNamespace( 1555 chat_id="oc_chat", 1556 thread_id=None, 1557 message_type="text", 1558 content='{"text":"hello"}', 1559 message_id="om_text", 1560 ) 1561 sender_id = SimpleNamespace( 1562 open_id="ou_user", 1563 user_id="u_user", 1564 union_id="on_union", 1565 ) 1566 sender = SimpleNamespace(sender_type="user", sender_id=sender_id) 1567 data = SimpleNamespace(event=SimpleNamespace(message=message, sender=sender)) 1568 1569 asyncio.run( 1570 adapter._process_inbound_message( 1571 data=data, 1572 message=message, 1573 sender_id=sender.sender_id, 1574 chat_type="p2p", 1575 message_id="om_text", 1576 ) 1577 ) 1578 1579 adapter._dispatch_inbound_event.assert_awaited_once() 1580 event = adapter._dispatch_inbound_event.await_args.args[0] 1581 self.assertEqual(event.message_type, MessageType.TEXT) 1582 self.assertEqual(event.source.user_id, "u_user") # tenant-scoped user_id preferred over app-scoped open_id 1583 self.assertEqual(event.source.user_name, "张三") 1584 self.assertEqual(event.source.user_id_alt, "on_union") 1585 self.assertEqual(event.source.chat_name, "Feishu DM") 1586 1587 @patch.dict(os.environ, {}, clear=True) 1588 def test_text_batch_merges_rapid_messages_into_single_event(self): 1589 from gateway.config import PlatformConfig 1590 from gateway.platforms.base import MessageEvent, MessageType 1591 from gateway.platforms.feishu import FeishuAdapter 1592 from gateway.session import SessionSource 1593 1594 adapter = FeishuAdapter(PlatformConfig()) 1595 adapter.handle_message = AsyncMock() 1596 source = SessionSource( 1597 platform=adapter.platform, 1598 chat_id="oc_chat", 1599 chat_name="Feishu DM", 1600 chat_type="dm", 1601 user_id="ou_user", 1602 user_name="张三", 1603 ) 1604 1605 async def _sleep(_delay): 1606 return None 1607 1608 async def _run() -> None: 1609 with patch("gateway.platforms.feishu.asyncio.sleep", side_effect=_sleep): 1610 await adapter._dispatch_inbound_event( 1611 MessageEvent(text="A", message_type=MessageType.TEXT, source=source, message_id="om_1") 1612 ) 1613 await adapter._dispatch_inbound_event( 1614 MessageEvent(text="B", message_type=MessageType.TEXT, source=source, message_id="om_2") 1615 ) 1616 pending = list(adapter._pending_text_batch_tasks.values()) 1617 self.assertEqual(len(pending), 1) 1618 await asyncio.gather(*pending, return_exceptions=True) 1619 1620 asyncio.run(_run()) 1621 1622 adapter.handle_message.assert_awaited_once() 1623 event = adapter.handle_message.await_args.args[0] 1624 self.assertEqual(event.text, "A\nB") 1625 self.assertEqual(event.message_type, MessageType.TEXT) 1626 1627 @patch.dict( 1628 os.environ, 1629 { 1630 "HERMES_FEISHU_TEXT_BATCH_MAX_MESSAGES": "2", 1631 }, 1632 clear=True, 1633 ) 1634 def test_text_batch_flushes_when_message_count_limit_is_hit(self): 1635 from gateway.config import PlatformConfig 1636 from gateway.platforms.base import MessageEvent, MessageType 1637 from gateway.platforms.feishu import FeishuAdapter 1638 from gateway.session import SessionSource 1639 1640 adapter = FeishuAdapter(PlatformConfig()) 1641 adapter.handle_message = AsyncMock() 1642 source = SessionSource( 1643 platform=adapter.platform, 1644 chat_id="oc_chat", 1645 chat_name="Feishu DM", 1646 chat_type="dm", 1647 user_id="ou_user", 1648 user_name="张三", 1649 ) 1650 1651 async def _sleep(_delay): 1652 return None 1653 1654 async def _run() -> None: 1655 with patch("gateway.platforms.feishu.asyncio.sleep", side_effect=_sleep): 1656 await adapter._dispatch_inbound_event( 1657 MessageEvent(text="A", message_type=MessageType.TEXT, source=source, message_id="om_1") 1658 ) 1659 await adapter._dispatch_inbound_event( 1660 MessageEvent(text="B", message_type=MessageType.TEXT, source=source, message_id="om_2") 1661 ) 1662 await adapter._dispatch_inbound_event( 1663 MessageEvent(text="C", message_type=MessageType.TEXT, source=source, message_id="om_3") 1664 ) 1665 pending = list(adapter._pending_text_batch_tasks.values()) 1666 self.assertEqual(len(pending), 1) 1667 await asyncio.gather(*pending, return_exceptions=True) 1668 1669 asyncio.run(_run()) 1670 1671 self.assertEqual(adapter.handle_message.await_count, 2) 1672 first = adapter.handle_message.await_args_list[0].args[0] 1673 second = adapter.handle_message.await_args_list[1].args[0] 1674 self.assertEqual(first.text, "A\nB") 1675 self.assertEqual(second.text, "C") 1676 1677 @patch.dict(os.environ, {}, clear=True) 1678 def test_media_batch_merges_rapid_photo_messages(self): 1679 from gateway.config import PlatformConfig 1680 from gateway.platforms.base import MessageEvent, MessageType 1681 from gateway.platforms.feishu import FeishuAdapter 1682 from gateway.session import SessionSource 1683 1684 adapter = FeishuAdapter(PlatformConfig()) 1685 adapter.handle_message = AsyncMock() 1686 source = SessionSource( 1687 platform=adapter.platform, 1688 chat_id="oc_chat", 1689 chat_name="Feishu DM", 1690 chat_type="dm", 1691 user_id="ou_user", 1692 user_name="张三", 1693 ) 1694 1695 async def _sleep(_delay): 1696 return None 1697 1698 async def _run() -> None: 1699 with patch("gateway.platforms.feishu.asyncio.sleep", side_effect=_sleep): 1700 await adapter._dispatch_inbound_event( 1701 MessageEvent( 1702 text="第一张", 1703 message_type=MessageType.PHOTO, 1704 source=source, 1705 message_id="om_p1", 1706 media_urls=["/tmp/a.png"], 1707 media_types=["image/png"], 1708 ) 1709 ) 1710 await adapter._dispatch_inbound_event( 1711 MessageEvent( 1712 text="第二张", 1713 message_type=MessageType.PHOTO, 1714 source=source, 1715 message_id="om_p2", 1716 media_urls=["/tmp/b.png"], 1717 media_types=["image/png"], 1718 ) 1719 ) 1720 pending = list(adapter._pending_media_batch_tasks.values()) 1721 self.assertEqual(len(pending), 1) 1722 await asyncio.gather(*pending, return_exceptions=True) 1723 1724 asyncio.run(_run()) 1725 1726 adapter.handle_message.assert_awaited_once() 1727 event = adapter.handle_message.await_args.args[0] 1728 self.assertEqual(event.media_urls, ["/tmp/a.png", "/tmp/b.png"]) 1729 self.assertIn("第一张", event.text) 1730 self.assertIn("第二张", event.text) 1731 1732 @patch.dict(os.environ, {}, clear=True) 1733 def test_send_image_downloads_then_uses_native_image_send(self): 1734 from gateway.config import PlatformConfig 1735 from gateway.platforms.feishu import FeishuAdapter 1736 1737 adapter = FeishuAdapter(PlatformConfig()) 1738 adapter.send_image_file = AsyncMock(return_value=SimpleNamespace(success=True, message_id="om_img")) 1739 1740 async def _run(): 1741 with patch("gateway.platforms.feishu.cache_image_from_url", new=AsyncMock(return_value="/tmp/cached.png")): 1742 return await adapter.send_image("oc_chat", "https://example.com/cat.png", caption="cat") 1743 1744 result = asyncio.run(_run()) 1745 1746 self.assertTrue(result.success) 1747 adapter.send_image_file.assert_awaited_once() 1748 self.assertEqual(adapter.send_image_file.await_args.kwargs["image_path"], "/tmp/cached.png") 1749 1750 @patch.dict(os.environ, {}, clear=True) 1751 def test_send_animation_degrades_to_document_send(self): 1752 from gateway.config import PlatformConfig 1753 from gateway.platforms.feishu import FeishuAdapter 1754 1755 adapter = FeishuAdapter(PlatformConfig()) 1756 adapter.send_document = AsyncMock(return_value=SimpleNamespace(success=True, message_id="om_gif")) 1757 1758 async def _run(): 1759 with patch.object( 1760 adapter, 1761 "_download_remote_document", 1762 new=AsyncMock(return_value=("/tmp/anim.gif", "anim.gif")), 1763 ): 1764 return await adapter.send_animation("oc_chat", "https://example.com/anim.gif", caption="look") 1765 1766 result = asyncio.run(_run()) 1767 1768 self.assertTrue(result.success) 1769 adapter.send_document.assert_awaited_once() 1770 caption = adapter.send_document.await_args.kwargs["caption"] 1771 self.assertIn("GIF downgraded to file", caption) 1772 self.assertIn("look", caption) 1773 1774 def test_download_remote_document_reads_response_before_httpx_client_closes(self): 1775 """#18451 — snapshot Content-Type + body while the httpx.AsyncClient 1776 context is still active so pooled connections fully release on 1777 exit. Otherwise the response is only readable because httpx 1778 eagerly buffers it; a future refactor to .stream() would silently 1779 read-after-close.""" 1780 from gateway.config import PlatformConfig 1781 from gateway.platforms.feishu import FeishuAdapter 1782 1783 events: list[str] = [] 1784 1785 class _FakeResponse: 1786 headers = {"Content-Type": "application/octet-stream"} 1787 1788 def raise_for_status(self) -> None: 1789 events.append("raise_for_status") 1790 1791 @property 1792 def content(self) -> bytes: 1793 events.append("content_read") 1794 return b"doc-bytes" 1795 1796 class _FakeAsyncClient: 1797 def __init__(self, *_a: object, **_k: object) -> None: 1798 pass 1799 1800 async def __aenter__(self) -> "_FakeAsyncClient": 1801 events.append("client_enter") 1802 return self 1803 1804 async def __aexit__(self, *exc: object) -> None: 1805 events.append("client_exit") 1806 1807 async def get(self, *_a: object, **_k: object) -> _FakeResponse: 1808 events.append("get") 1809 return _FakeResponse() 1810 1811 with tempfile.TemporaryDirectory() as tmp: 1812 with patch.dict(os.environ, {"HERMES_HOME": tmp}, clear=False): 1813 adapter = FeishuAdapter(PlatformConfig()) 1814 1815 async def _run() -> tuple[str, str]: 1816 with patch("tools.url_safety.is_safe_url", return_value=True): 1817 with patch("httpx.AsyncClient", _FakeAsyncClient): 1818 with patch( 1819 "gateway.platforms.feishu.cache_document_from_bytes", 1820 return_value="/tmp/cached-doc.bin", 1821 ): 1822 return await adapter._download_remote_document( 1823 "https://example.com/doc.bin", 1824 default_ext=".bin", 1825 preferred_name="doc", 1826 ) 1827 1828 path, filename = asyncio.run(_run()) 1829 1830 self.assertEqual(path, "/tmp/cached-doc.bin") 1831 self.assertTrue(filename) 1832 # content_read MUST happen before client_exit — otherwise we're 1833 # reading response body after the connection pool has been torn 1834 # down, which only works by accident (httpx's eager buffering). 1835 self.assertLess(events.index("content_read"), events.index("client_exit")) 1836 1837 def test_dedup_state_persists_across_adapter_restart(self): 1838 from gateway.config import PlatformConfig 1839 from gateway.platforms.feishu import FeishuAdapter 1840 1841 with tempfile.TemporaryDirectory() as temp_home: 1842 with patch.dict(os.environ, {"HERMES_HOME": temp_home}, clear=False): 1843 first = FeishuAdapter(PlatformConfig()) 1844 self.assertFalse(first._is_duplicate("om_same")) 1845 second = FeishuAdapter(PlatformConfig()) 1846 self.assertTrue(second._is_duplicate("om_same")) 1847 1848 @patch.dict(os.environ, {}, clear=True) 1849 def test_process_inbound_group_message_keeps_group_type_when_chat_lookup_falls_back(self): 1850 from gateway.config import PlatformConfig 1851 from gateway.platforms.feishu import FeishuAdapter 1852 1853 adapter = FeishuAdapter(PlatformConfig()) 1854 adapter._dispatch_inbound_event = AsyncMock() 1855 adapter.get_chat_info = AsyncMock( 1856 return_value={"chat_id": "oc_group", "name": "oc_group", "type": "dm"} 1857 ) 1858 adapter._resolve_sender_profile = AsyncMock( 1859 return_value={"user_id": "ou_user", "user_name": "张三", "user_id_alt": None} 1860 ) 1861 message = SimpleNamespace( 1862 chat_id="oc_group", 1863 thread_id=None, 1864 message_type="text", 1865 content='{"text":"hello group"}', 1866 message_id="om_group_text", 1867 ) 1868 sender_id = SimpleNamespace(open_id="ou_user", user_id=None, union_id=None) 1869 sender = SimpleNamespace(sender_type="user", sender_id=sender_id) 1870 data = SimpleNamespace(event=SimpleNamespace(message=message)) 1871 1872 asyncio.run( 1873 adapter._process_inbound_message( 1874 data=data, 1875 message=message, 1876 sender_id=sender.sender_id, 1877 chat_type="group", 1878 message_id="om_group_text", 1879 ) 1880 ) 1881 1882 event = adapter._dispatch_inbound_event.await_args.args[0] 1883 self.assertEqual(event.source.chat_type, "group") 1884 1885 @patch.dict(os.environ, {}, clear=True) 1886 def test_process_inbound_message_fetches_reply_to_text(self): 1887 from gateway.config import PlatformConfig 1888 from gateway.platforms.feishu import FeishuAdapter 1889 1890 adapter = FeishuAdapter(PlatformConfig()) 1891 adapter._dispatch_inbound_event = AsyncMock() 1892 adapter.get_chat_info = AsyncMock( 1893 return_value={"chat_id": "oc_chat", "name": "Feishu DM", "type": "dm"} 1894 ) 1895 adapter._resolve_sender_profile = AsyncMock( 1896 return_value={"user_id": "ou_user", "user_name": "张三", "user_id_alt": None} 1897 ) 1898 adapter._fetch_message_text = AsyncMock(return_value="父消息内容") 1899 message = SimpleNamespace( 1900 chat_id="oc_chat", 1901 thread_id=None, 1902 parent_id="om_parent", 1903 upper_message_id=None, 1904 message_type="text", 1905 content='{"text":"reply"}', 1906 message_id="om_reply", 1907 ) 1908 1909 asyncio.run( 1910 adapter._process_inbound_message( 1911 data=SimpleNamespace(event=SimpleNamespace(message=message)), 1912 message=message, 1913 sender_id=SimpleNamespace(open_id="ou_user", user_id=None, union_id=None), 1914 is_bot=False, 1915 chat_type="p2p", 1916 message_id="om_reply", 1917 ) 1918 ) 1919 1920 event = adapter._dispatch_inbound_event.await_args.args[0] 1921 self.assertEqual(event.reply_to_message_id, "om_parent") 1922 self.assertEqual(event.reply_to_text, "父消息内容") 1923 1924 @patch.dict(os.environ, {}, clear=True) 1925 def test_send_replies_in_thread_when_thread_metadata_present(self): 1926 from gateway.config import PlatformConfig 1927 from gateway.platforms.feishu import FeishuAdapter 1928 1929 adapter = FeishuAdapter(PlatformConfig()) 1930 captured = {} 1931 1932 class _ReplyAPI: 1933 def reply(self, request): 1934 captured["request"] = request 1935 return SimpleNamespace( 1936 success=lambda: True, 1937 data=SimpleNamespace(message_id="om_reply"), 1938 ) 1939 1940 adapter._client = SimpleNamespace( 1941 im=SimpleNamespace( 1942 v1=SimpleNamespace( 1943 message=_ReplyAPI(), 1944 ) 1945 ) 1946 ) 1947 1948 async def _direct(func, *args, **kwargs): 1949 return func(*args, **kwargs) 1950 1951 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 1952 result = asyncio.run( 1953 adapter.send( 1954 chat_id="oc_chat", 1955 content="hello", 1956 reply_to="om_parent", 1957 metadata={"thread_id": "omt-thread"}, 1958 ) 1959 ) 1960 1961 self.assertTrue(result.success) 1962 self.assertEqual(result.message_id, "om_reply") 1963 self.assertTrue(captured["request"].request_body.reply_in_thread) 1964 1965 @patch.dict(os.environ, {}, clear=True) 1966 def test_send_retries_transient_failure(self): 1967 from gateway.config import PlatformConfig 1968 from gateway.platforms.feishu import FeishuAdapter 1969 1970 adapter = FeishuAdapter(PlatformConfig()) 1971 captured = {"attempts": 0} 1972 sleeps = [] 1973 1974 class _MessageAPI: 1975 def create(self, request): 1976 captured["attempts"] += 1 1977 captured["request"] = request 1978 if captured["attempts"] == 1: 1979 raise OSError("temporary send failure") 1980 return SimpleNamespace( 1981 success=lambda: True, 1982 data=SimpleNamespace(message_id="om_retry"), 1983 ) 1984 1985 adapter._client = SimpleNamespace( 1986 im=SimpleNamespace( 1987 v1=SimpleNamespace( 1988 message=_MessageAPI(), 1989 ) 1990 ) 1991 ) 1992 1993 async def _direct(func, *args, **kwargs): 1994 return func(*args, **kwargs) 1995 1996 async def _sleep(delay): 1997 sleeps.append(delay) 1998 1999 with ( 2000 patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct), 2001 patch("gateway.platforms.feishu.asyncio.sleep", side_effect=_sleep), 2002 ): 2003 result = asyncio.run(adapter.send(chat_id="oc_chat", content="hello retry")) 2004 2005 self.assertTrue(result.success) 2006 self.assertEqual(result.message_id, "om_retry") 2007 self.assertEqual(captured["attempts"], 2) 2008 self.assertEqual(sleeps, [1]) 2009 2010 @patch.dict(os.environ, {}, clear=True) 2011 def test_send_does_not_retry_deterministic_api_failure(self): 2012 from gateway.config import PlatformConfig 2013 from gateway.platforms.feishu import FeishuAdapter 2014 2015 adapter = FeishuAdapter(PlatformConfig()) 2016 captured = {"attempts": 0} 2017 sleeps = [] 2018 2019 class _MessageAPI: 2020 def create(self, request): 2021 captured["attempts"] += 1 2022 return SimpleNamespace( 2023 success=lambda: False, 2024 code=400, 2025 msg="bad request", 2026 ) 2027 2028 adapter._client = SimpleNamespace( 2029 im=SimpleNamespace( 2030 v1=SimpleNamespace( 2031 message=_MessageAPI(), 2032 ) 2033 ) 2034 ) 2035 2036 async def _direct(func, *args, **kwargs): 2037 return func(*args, **kwargs) 2038 2039 async def _sleep(delay): 2040 sleeps.append(delay) 2041 2042 with ( 2043 patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct), 2044 patch("gateway.platforms.feishu.asyncio.sleep", side_effect=_sleep), 2045 ): 2046 result = asyncio.run(adapter.send(chat_id="oc_chat", content="bad payload")) 2047 2048 self.assertFalse(result.success) 2049 self.assertEqual(result.error, "[400] bad request") 2050 self.assertEqual(captured["attempts"], 1) 2051 self.assertEqual(sleeps, []) 2052 2053 @patch.dict(os.environ, {}, clear=True) 2054 def test_send_document_reply_uses_thread_flag(self): 2055 from gateway.config import PlatformConfig 2056 from gateway.platforms.feishu import FeishuAdapter 2057 2058 adapter = FeishuAdapter(PlatformConfig()) 2059 captured = {} 2060 2061 class _FileAPI: 2062 def create(self, request): 2063 return SimpleNamespace( 2064 success=lambda: True, 2065 data=SimpleNamespace(file_key="file_123"), 2066 ) 2067 2068 class _MessageAPI: 2069 def reply(self, request): 2070 captured["request"] = request 2071 return SimpleNamespace( 2072 success=lambda: True, 2073 data=SimpleNamespace(message_id="om_file_reply"), 2074 ) 2075 2076 adapter._client = SimpleNamespace( 2077 im=SimpleNamespace( 2078 v1=SimpleNamespace( 2079 file=_FileAPI(), 2080 message=_MessageAPI(), 2081 ) 2082 ) 2083 ) 2084 2085 async def _direct(func, *args, **kwargs): 2086 return func(*args, **kwargs) 2087 2088 with tempfile.NamedTemporaryFile("wb", suffix=".pdf", delete=False) as tmp: 2089 tmp.write(b"%PDF-1.4 test") 2090 file_path = tmp.name 2091 2092 try: 2093 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 2094 result = asyncio.run( 2095 adapter.send_document( 2096 chat_id="oc_chat", 2097 file_path=file_path, 2098 reply_to="om_parent", 2099 metadata={"thread_id": "omt-thread"}, 2100 ) 2101 ) 2102 finally: 2103 os.unlink(file_path) 2104 2105 self.assertTrue(result.success) 2106 self.assertTrue(captured["request"].request_body.reply_in_thread) 2107 2108 @patch.dict(os.environ, {}, clear=True) 2109 def test_send_document_uploads_file_and_sends_file_message(self): 2110 from gateway.config import PlatformConfig 2111 from gateway.platforms.feishu import FeishuAdapter 2112 2113 adapter = FeishuAdapter(PlatformConfig()) 2114 captured = {} 2115 2116 class _FileAPI: 2117 def create(self, request): 2118 captured["upload_request"] = request 2119 return SimpleNamespace( 2120 success=lambda: True, 2121 data=SimpleNamespace(file_key="file_123"), 2122 ) 2123 2124 class _MessageAPI: 2125 def create(self, request): 2126 captured["message_request"] = request 2127 return SimpleNamespace( 2128 success=lambda: True, 2129 data=SimpleNamespace(message_id="om_file_msg"), 2130 ) 2131 2132 adapter._client = SimpleNamespace( 2133 im=SimpleNamespace( 2134 v1=SimpleNamespace( 2135 file=_FileAPI(), 2136 message=_MessageAPI(), 2137 ) 2138 ) 2139 ) 2140 2141 async def _direct(func, *args, **kwargs): 2142 return func(*args, **kwargs) 2143 2144 with tempfile.NamedTemporaryFile("wb", suffix=".pdf", delete=False) as tmp: 2145 tmp.write(b"%PDF-1.4 test") 2146 file_path = tmp.name 2147 2148 try: 2149 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 2150 result = asyncio.run(adapter.send_document(chat_id="oc_chat", file_path=file_path)) 2151 finally: 2152 os.unlink(file_path) 2153 2154 self.assertTrue(result.success) 2155 self.assertEqual(result.message_id, "om_file_msg") 2156 self.assertEqual(captured["upload_request"].request_body.file_type, "pdf") 2157 self.assertEqual( 2158 captured["message_request"].request_body.content, 2159 '{"file_key": "file_123"}', 2160 ) 2161 2162 @patch.dict(os.environ, {}, clear=True) 2163 def test_send_document_with_caption_uses_single_post_message(self): 2164 from gateway.config import PlatformConfig 2165 from gateway.platforms.feishu import FeishuAdapter 2166 2167 adapter = FeishuAdapter(PlatformConfig()) 2168 captured = {} 2169 2170 class _FileAPI: 2171 def create(self, request): 2172 return SimpleNamespace( 2173 success=lambda: True, 2174 data=SimpleNamespace(file_key="file_123"), 2175 ) 2176 2177 class _MessageAPI: 2178 def create(self, request): 2179 captured["message_request"] = request 2180 return SimpleNamespace( 2181 success=lambda: True, 2182 data=SimpleNamespace(message_id="om_post_msg"), 2183 ) 2184 2185 adapter._client = SimpleNamespace( 2186 im=SimpleNamespace( 2187 v1=SimpleNamespace( 2188 file=_FileAPI(), 2189 message=_MessageAPI(), 2190 ) 2191 ) 2192 ) 2193 2194 async def _direct(func, *args, **kwargs): 2195 return func(*args, **kwargs) 2196 2197 with tempfile.NamedTemporaryFile("wb", suffix=".pdf", delete=False) as tmp: 2198 tmp.write(b"%PDF-1.4 test") 2199 file_path = tmp.name 2200 2201 try: 2202 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 2203 result = asyncio.run( 2204 adapter.send_document(chat_id="oc_chat", file_path=file_path, caption="报告请看") 2205 ) 2206 finally: 2207 os.unlink(file_path) 2208 2209 self.assertTrue(result.success) 2210 self.assertEqual(captured["message_request"].request_body.msg_type, "post") 2211 self.assertIn('"tag": "media"', captured["message_request"].request_body.content) 2212 self.assertIn('"file_key": "file_123"', captured["message_request"].request_body.content) 2213 self.assertIn("报告请看", captured["message_request"].request_body.content) 2214 2215 @patch.dict(os.environ, {}, clear=True) 2216 def test_send_image_file_uploads_image_and_sends_image_message(self): 2217 from gateway.config import PlatformConfig 2218 from gateway.platforms.feishu import FeishuAdapter 2219 2220 adapter = FeishuAdapter(PlatformConfig()) 2221 captured = {} 2222 2223 class _ImageAPI: 2224 def create(self, request): 2225 captured["upload_request"] = request 2226 return SimpleNamespace( 2227 success=lambda: True, 2228 data=SimpleNamespace(image_key="img_123"), 2229 ) 2230 2231 class _MessageAPI: 2232 def create(self, request): 2233 captured["message_request"] = request 2234 return SimpleNamespace( 2235 success=lambda: True, 2236 data=SimpleNamespace(message_id="om_image_msg"), 2237 ) 2238 2239 adapter._client = SimpleNamespace( 2240 im=SimpleNamespace( 2241 v1=SimpleNamespace( 2242 image=_ImageAPI(), 2243 message=_MessageAPI(), 2244 ) 2245 ) 2246 ) 2247 2248 async def _direct(func, *args, **kwargs): 2249 return func(*args, **kwargs) 2250 2251 with tempfile.NamedTemporaryFile("wb", suffix=".png", delete=False) as tmp: 2252 tmp.write(b"\x89PNG\r\n\x1a\n") 2253 image_path = tmp.name 2254 2255 try: 2256 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 2257 result = asyncio.run(adapter.send_image_file(chat_id="oc_chat", image_path=image_path)) 2258 finally: 2259 os.unlink(image_path) 2260 2261 self.assertTrue(result.success) 2262 self.assertEqual(result.message_id, "om_image_msg") 2263 self.assertEqual(captured["upload_request"].request_body.image_type, "message") 2264 self.assertEqual( 2265 captured["message_request"].request_body.content, 2266 '{"image_key": "img_123"}', 2267 ) 2268 2269 @patch.dict(os.environ, {}, clear=True) 2270 def test_send_image_file_with_caption_uses_single_post_message(self): 2271 from gateway.config import PlatformConfig 2272 from gateway.platforms.feishu import FeishuAdapter 2273 2274 adapter = FeishuAdapter(PlatformConfig()) 2275 captured = {} 2276 2277 class _ImageAPI: 2278 def create(self, request): 2279 return SimpleNamespace( 2280 success=lambda: True, 2281 data=SimpleNamespace(image_key="img_123"), 2282 ) 2283 2284 class _MessageAPI: 2285 def create(self, request): 2286 captured["message_request"] = request 2287 return SimpleNamespace( 2288 success=lambda: True, 2289 data=SimpleNamespace(message_id="om_post_img"), 2290 ) 2291 2292 adapter._client = SimpleNamespace( 2293 im=SimpleNamespace( 2294 v1=SimpleNamespace( 2295 image=_ImageAPI(), 2296 message=_MessageAPI(), 2297 ) 2298 ) 2299 ) 2300 2301 async def _direct(func, *args, **kwargs): 2302 return func(*args, **kwargs) 2303 2304 with tempfile.NamedTemporaryFile("wb", suffix=".png", delete=False) as tmp: 2305 tmp.write(b"\x89PNG\r\n\x1a\n") 2306 image_path = tmp.name 2307 2308 try: 2309 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 2310 result = asyncio.run( 2311 adapter.send_image_file(chat_id="oc_chat", image_path=image_path, caption="截图说明") 2312 ) 2313 finally: 2314 os.unlink(image_path) 2315 2316 self.assertTrue(result.success) 2317 self.assertEqual(captured["message_request"].request_body.msg_type, "post") 2318 self.assertIn('"tag": "img"', captured["message_request"].request_body.content) 2319 self.assertIn('"image_key": "img_123"', captured["message_request"].request_body.content) 2320 self.assertIn("截图说明", captured["message_request"].request_body.content) 2321 2322 @patch.dict(os.environ, {}, clear=True) 2323 def test_send_video_uploads_file_and_sends_media_message(self): 2324 from gateway.config import PlatformConfig 2325 from gateway.platforms.feishu import FeishuAdapter 2326 2327 adapter = FeishuAdapter(PlatformConfig()) 2328 captured = {} 2329 2330 class _FileAPI: 2331 def create(self, request): 2332 captured["upload_request"] = request 2333 return SimpleNamespace( 2334 success=lambda: True, 2335 data=SimpleNamespace(file_key="file_video_123"), 2336 ) 2337 2338 class _MessageAPI: 2339 def create(self, request): 2340 captured["message_request"] = request 2341 return SimpleNamespace( 2342 success=lambda: True, 2343 data=SimpleNamespace(message_id="om_video_msg"), 2344 ) 2345 2346 adapter._client = SimpleNamespace( 2347 im=SimpleNamespace( 2348 v1=SimpleNamespace( 2349 file=_FileAPI(), 2350 message=_MessageAPI(), 2351 ) 2352 ) 2353 ) 2354 2355 async def _direct(func, *args, **kwargs): 2356 return func(*args, **kwargs) 2357 2358 with tempfile.NamedTemporaryFile("wb", suffix=".mp4", delete=False) as tmp: 2359 tmp.write(b"\x00\x00\x00\x18ftypmp42") 2360 video_path = tmp.name 2361 2362 try: 2363 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 2364 result = asyncio.run(adapter.send_video(chat_id="oc_chat", video_path=video_path)) 2365 finally: 2366 os.unlink(video_path) 2367 2368 self.assertTrue(result.success) 2369 self.assertEqual(captured["upload_request"].request_body.file_type, "mp4") 2370 self.assertEqual(captured["message_request"].request_body.msg_type, "media") 2371 self.assertEqual(captured["message_request"].request_body.content, '{"file_key": "file_video_123"}') 2372 2373 @patch.dict(os.environ, {}, clear=True) 2374 def test_send_voice_uploads_opus_and_sends_audio_message(self): 2375 from gateway.config import PlatformConfig 2376 from gateway.platforms.feishu import FeishuAdapter 2377 2378 adapter = FeishuAdapter(PlatformConfig()) 2379 captured = {} 2380 2381 class _FileAPI: 2382 def create(self, request): 2383 captured["upload_request"] = request 2384 return SimpleNamespace( 2385 success=lambda: True, 2386 data=SimpleNamespace(file_key="file_audio_123"), 2387 ) 2388 2389 class _MessageAPI: 2390 def create(self, request): 2391 captured["message_request"] = request 2392 return SimpleNamespace( 2393 success=lambda: True, 2394 data=SimpleNamespace(message_id="om_audio_msg"), 2395 ) 2396 2397 adapter._client = SimpleNamespace( 2398 im=SimpleNamespace( 2399 v1=SimpleNamespace( 2400 file=_FileAPI(), 2401 message=_MessageAPI(), 2402 ) 2403 ) 2404 ) 2405 2406 async def _direct(func, *args, **kwargs): 2407 return func(*args, **kwargs) 2408 2409 with tempfile.NamedTemporaryFile("wb", suffix=".opus", delete=False) as tmp: 2410 tmp.write(b"opus") 2411 audio_path = tmp.name 2412 2413 try: 2414 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 2415 result = asyncio.run(adapter.send_voice(chat_id="oc_chat", audio_path=audio_path)) 2416 finally: 2417 os.unlink(audio_path) 2418 2419 self.assertTrue(result.success) 2420 self.assertEqual(captured["upload_request"].request_body.file_type, "opus") 2421 self.assertEqual(captured["message_request"].request_body.msg_type, "audio") 2422 self.assertEqual(captured["message_request"].request_body.content, '{"file_key": "file_audio_123"}') 2423 2424 @patch.dict(os.environ, {}, clear=True) 2425 def test_build_post_payload_extracts_title_and_links(self): 2426 from gateway.config import PlatformConfig 2427 from gateway.platforms.feishu import FeishuAdapter 2428 2429 adapter = FeishuAdapter(PlatformConfig()) 2430 payload = json.loads(adapter._build_post_payload("# 标题\n访问 [文档](https://example.com)")) 2431 2432 elements = payload["zh_cn"]["content"][0] 2433 self.assertEqual(elements, [{"tag": "md", "text": "# 标题\n访问 [文档](https://example.com)"}]) 2434 2435 @patch.dict(os.environ, {}, clear=True) 2436 def test_build_post_payload_wraps_markdown_in_md_tag(self): 2437 from gateway.config import PlatformConfig 2438 from gateway.platforms.feishu import FeishuAdapter 2439 2440 adapter = FeishuAdapter(PlatformConfig()) 2441 payload = json.loads( 2442 adapter._build_post_payload("支持 **粗体**、*斜体* 和 `代码`") 2443 ) 2444 2445 elements = payload["zh_cn"]["content"][0] 2446 self.assertEqual( 2447 elements, 2448 [ 2449 {"tag": "md", "text": "支持 **粗体**、*斜体* 和 `代码`"}, 2450 ], 2451 ) 2452 2453 @patch.dict(os.environ, {}, clear=True) 2454 def test_build_post_payload_keeps_full_markdown_text(self): 2455 from gateway.config import PlatformConfig 2456 from gateway.platforms.feishu import FeishuAdapter 2457 2458 adapter = FeishuAdapter(PlatformConfig()) 2459 payload = json.loads( 2460 adapter._build_post_payload( 2461 "---\n1. 第一项\n 2. 子项\n- 外层\n - 内层\n<u>下划线</u> 和 ~~删除线~~" 2462 ) 2463 ) 2464 2465 rows = payload["zh_cn"]["content"] 2466 self.assertEqual( 2467 rows, 2468 [[{"tag": "md", "text": "---\n1. 第一项\n 2. 子项\n- 外层\n - 内层\n<u>下划线</u> 和 ~~删除线~~"}]], 2469 ) 2470 2471 @patch.dict(os.environ, {}, clear=True) 2472 def test_send_uses_post_for_inline_markdown(self): 2473 from gateway.config import PlatformConfig 2474 from gateway.platforms.feishu import FeishuAdapter 2475 2476 adapter = FeishuAdapter(PlatformConfig()) 2477 captured = {} 2478 2479 class _MessageAPI: 2480 def create(self, request): 2481 captured["request"] = request 2482 return SimpleNamespace( 2483 success=lambda: True, 2484 data=SimpleNamespace(message_id="om_markdown"), 2485 ) 2486 2487 adapter._client = SimpleNamespace( 2488 im=SimpleNamespace( 2489 v1=SimpleNamespace( 2490 message=_MessageAPI(), 2491 ) 2492 ) 2493 ) 2494 2495 async def _direct(func, *args, **kwargs): 2496 return func(*args, **kwargs) 2497 2498 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 2499 result = asyncio.run( 2500 adapter.send( 2501 chat_id="oc_chat", 2502 content="可以用 **粗体** 和 *斜体*。", 2503 ) 2504 ) 2505 2506 self.assertTrue(result.success) 2507 self.assertEqual(captured["request"].request_body.msg_type, "post") 2508 payload = json.loads(captured["request"].request_body.content) 2509 elements = payload["zh_cn"]["content"][0] 2510 self.assertEqual(elements, [{"tag": "md", "text": "可以用 **粗体** 和 *斜体*。"}]) 2511 2512 @patch.dict(os.environ, {}, clear=True) 2513 def test_send_splits_fenced_code_blocks_into_separate_post_rows(self): 2514 from gateway.config import PlatformConfig 2515 from gateway.platforms.feishu import FeishuAdapter 2516 2517 adapter = FeishuAdapter(PlatformConfig()) 2518 captured = {} 2519 2520 class _MessageAPI: 2521 def create(self, request): 2522 captured["request"] = request 2523 return SimpleNamespace( 2524 success=lambda: True, 2525 data=SimpleNamespace(message_id="om_codeblock"), 2526 ) 2527 2528 adapter._client = SimpleNamespace( 2529 im=SimpleNamespace( 2530 v1=SimpleNamespace( 2531 message=_MessageAPI(), 2532 ) 2533 ) 2534 ) 2535 2536 async def _direct(func, *args, **kwargs): 2537 return func(*args, **kwargs) 2538 2539 content = ( 2540 "确认已入库 ✓\n" 2541 "文件路径:`/root/.hermes/profiles/agent_cto/cron/jobs.json`\n" 2542 "**解码后的内容:**\n" 2543 "```json\n" 2544 '{"cron": "list"}\n' 2545 "```\n" 2546 "后续说明仍应保留。" 2547 ) 2548 2549 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 2550 result = asyncio.run( 2551 adapter.send( 2552 chat_id="oc_chat", 2553 content=content, 2554 ) 2555 ) 2556 2557 self.assertTrue(result.success) 2558 self.assertEqual(captured["request"].request_body.msg_type, "post") 2559 payload = json.loads(captured["request"].request_body.content) 2560 rows = payload["zh_cn"]["content"] 2561 self.assertEqual( 2562 rows, 2563 [ 2564 [ 2565 { 2566 "tag": "md", 2567 "text": "确认已入库 ✓\n文件路径:`/root/.hermes/profiles/agent_cto/cron/jobs.json`\n**解码后的内容:**", 2568 } 2569 ], 2570 [{"tag": "md", "text": "```json\n{\"cron\": \"list\"}\n```"}], 2571 [{"tag": "md", "text": "后续说明仍应保留。"}], 2572 ], 2573 ) 2574 2575 @patch.dict(os.environ, {}, clear=True) 2576 def test_build_post_payload_keeps_fence_like_code_lines_inside_code_block(self): 2577 from gateway.config import PlatformConfig 2578 from gateway.platforms.feishu import FeishuAdapter 2579 2580 adapter = FeishuAdapter(PlatformConfig()) 2581 payload = json.loads( 2582 adapter._build_post_payload( 2583 "before\n```python\n```oops\n```\nafter" 2584 ) 2585 ) 2586 2587 self.assertEqual( 2588 payload["zh_cn"]["content"], 2589 [ 2590 [{"tag": "md", "text": "before"}], 2591 [{"tag": "md", "text": "```python\n```oops\n```"}], 2592 [{"tag": "md", "text": "after"}], 2593 ], 2594 ) 2595 2596 @patch.dict(os.environ, {}, clear=True) 2597 def test_build_post_payload_preserves_trailing_spaces_in_code_block(self): 2598 from gateway.config import PlatformConfig 2599 from gateway.platforms.feishu import FeishuAdapter 2600 2601 adapter = FeishuAdapter(PlatformConfig()) 2602 payload = json.loads( 2603 adapter._build_post_payload( 2604 "before\n```python\nline with two spaces \n```\nafter" 2605 ) 2606 ) 2607 2608 self.assertEqual( 2609 payload["zh_cn"]["content"], 2610 [ 2611 [{"tag": "md", "text": "before"}], 2612 [{"tag": "md", "text": "```python\nline with two spaces \n```"}], 2613 [{"tag": "md", "text": "after"}], 2614 ], 2615 ) 2616 2617 @patch.dict(os.environ, {}, clear=True) 2618 def test_build_post_payload_splits_multiple_fenced_code_blocks(self): 2619 from gateway.config import PlatformConfig 2620 from gateway.platforms.feishu import FeishuAdapter 2621 2622 adapter = FeishuAdapter(PlatformConfig()) 2623 payload = json.loads( 2624 adapter._build_post_payload( 2625 "before\n```python\nprint(1)\n```\nmiddle\n```json\n{}\n```\nafter" 2626 ) 2627 ) 2628 2629 self.assertEqual( 2630 payload["zh_cn"]["content"], 2631 [ 2632 [{"tag": "md", "text": "before"}], 2633 [{"tag": "md", "text": "```python\nprint(1)\n```"}], 2634 [{"tag": "md", "text": "middle"}], 2635 [{"tag": "md", "text": "```json\n{}\n```"}], 2636 [{"tag": "md", "text": "after"}], 2637 ], 2638 ) 2639 2640 @patch.dict(os.environ, {}, clear=True) 2641 def test_send_falls_back_to_text_when_post_payload_is_rejected(self): 2642 from gateway.config import PlatformConfig 2643 from gateway.platforms.feishu import FeishuAdapter 2644 2645 adapter = FeishuAdapter(PlatformConfig()) 2646 captured = {"calls": []} 2647 2648 class _MessageAPI: 2649 def create(self, request): 2650 captured["calls"].append(request) 2651 if len(captured["calls"]) == 1: 2652 raise RuntimeError("content format of the post type is incorrect") 2653 return SimpleNamespace( 2654 success=lambda: True, 2655 data=SimpleNamespace(message_id="om_plain"), 2656 ) 2657 2658 adapter._client = SimpleNamespace( 2659 im=SimpleNamespace( 2660 v1=SimpleNamespace( 2661 message=_MessageAPI(), 2662 ) 2663 ) 2664 ) 2665 2666 async def _direct(func, *args, **kwargs): 2667 return func(*args, **kwargs) 2668 2669 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 2670 result = asyncio.run( 2671 adapter.send( 2672 chat_id="oc_chat", 2673 content="可以用 **粗体** 和 *斜体*。", 2674 ) 2675 ) 2676 2677 self.assertTrue(result.success) 2678 self.assertEqual(captured["calls"][0].request_body.msg_type, "post") 2679 self.assertEqual(captured["calls"][1].request_body.msg_type, "text") 2680 self.assertEqual( 2681 captured["calls"][1].request_body.content, 2682 json.dumps({"text": "可以用 粗体 和 斜体。"}, ensure_ascii=False), 2683 ) 2684 2685 @patch.dict(os.environ, {}, clear=True) 2686 def test_send_falls_back_to_text_when_post_response_is_unsuccessful(self): 2687 from gateway.config import PlatformConfig 2688 from gateway.platforms.feishu import FeishuAdapter 2689 2690 adapter = FeishuAdapter(PlatformConfig()) 2691 captured = {"calls": []} 2692 2693 class _MessageAPI: 2694 def create(self, request): 2695 captured["calls"].append(request) 2696 if len(captured["calls"]) == 1: 2697 return SimpleNamespace(success=lambda: False, code=230001, msg="content format of the post type is incorrect") 2698 return SimpleNamespace( 2699 success=lambda: True, 2700 data=SimpleNamespace(message_id="om_plain_response"), 2701 ) 2702 2703 adapter._client = SimpleNamespace( 2704 im=SimpleNamespace( 2705 v1=SimpleNamespace( 2706 message=_MessageAPI(), 2707 ) 2708 ) 2709 ) 2710 2711 async def _direct(func, *args, **kwargs): 2712 return func(*args, **kwargs) 2713 2714 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 2715 result = asyncio.run( 2716 adapter.send( 2717 chat_id="oc_chat", 2718 content="可以用 **粗体** 和 *斜体*。", 2719 ) 2720 ) 2721 2722 self.assertTrue(result.success) 2723 self.assertEqual(captured["calls"][0].request_body.msg_type, "post") 2724 self.assertEqual(captured["calls"][1].request_body.msg_type, "text") 2725 self.assertEqual( 2726 captured["calls"][1].request_body.content, 2727 json.dumps({"text": "可以用 粗体 和 斜体。"}, ensure_ascii=False), 2728 ) 2729 2730 @patch.dict(os.environ, {}, clear=True) 2731 def test_send_uses_post_for_advanced_markdown_lines(self): 2732 from gateway.config import PlatformConfig 2733 from gateway.platforms.feishu import FeishuAdapter 2734 2735 adapter = FeishuAdapter(PlatformConfig()) 2736 captured = {} 2737 2738 class _MessageAPI: 2739 def create(self, request): 2740 captured["request"] = request 2741 return SimpleNamespace( 2742 success=lambda: True, 2743 data=SimpleNamespace(message_id="om_markdown_advanced"), 2744 ) 2745 2746 adapter._client = SimpleNamespace( 2747 im=SimpleNamespace( 2748 v1=SimpleNamespace( 2749 message=_MessageAPI(), 2750 ) 2751 ) 2752 ) 2753 2754 async def _direct(func, *args, **kwargs): 2755 return func(*args, **kwargs) 2756 2757 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 2758 result = asyncio.run( 2759 adapter.send( 2760 chat_id="oc_chat", 2761 content="---\n1. 第一项\n<u>下划线</u>\n~~删除线~~", 2762 ) 2763 ) 2764 2765 self.assertTrue(result.success) 2766 self.assertEqual(captured["request"].request_body.msg_type, "post") 2767 payload = json.loads(captured["request"].request_body.content) 2768 rows = payload["zh_cn"]["content"] 2769 self.assertEqual( 2770 rows, 2771 [[{"tag": "md", "text": "---\n1. 第一项\n<u>下划线</u>\n~~删除线~~"}]], 2772 ) 2773 2774 2775 @unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed") 2776 class TestHydrateBotIdentity(unittest.TestCase): 2777 """Hydration of bot identity via ``/open-apis/bot/v3/info``. 2778 2779 Covers the manual-setup path where ``FEISHU_BOT_OPEN_ID`` / 2780 ``FEISHU_BOT_NAME`` are not configured — hydration populates them so 2781 self-echo protection and group @mention gating both have something to 2782 match against. 2783 """ 2784 2785 def _make_adapter(self): 2786 from gateway.config import PlatformConfig 2787 from gateway.platforms.feishu import FeishuAdapter 2788 2789 return FeishuAdapter(PlatformConfig()) 2790 2791 @patch.dict(os.environ, {}, clear=True) 2792 def test_hydration_populates_open_id_from_bot_info(self): 2793 adapter = self._make_adapter() 2794 adapter._client = Mock() 2795 payload = json.dumps( 2796 { 2797 "code": 0, 2798 "bot": { 2799 "bot_name": "Hermes Bot", 2800 "open_id": "ou_hermes_hydrated", 2801 }, 2802 } 2803 ).encode("utf-8") 2804 response = SimpleNamespace(raw=SimpleNamespace(content=payload)) 2805 adapter._client.request = Mock(return_value=response) 2806 2807 asyncio.run(adapter._hydrate_bot_identity()) 2808 2809 self.assertEqual(adapter._bot_open_id, "ou_hermes_hydrated") 2810 self.assertEqual(adapter._bot_name, "Hermes Bot") 2811 2812 @patch.dict( 2813 os.environ, 2814 { 2815 "FEISHU_BOT_OPEN_ID": "ou_env", 2816 "FEISHU_BOT_NAME": "Env Hermes", 2817 }, 2818 clear=True, 2819 ) 2820 def test_hydration_skipped_when_env_vars_supply_both_fields(self): 2821 adapter = self._make_adapter() 2822 adapter._client = Mock() 2823 adapter._client.request = Mock() 2824 2825 asyncio.run(adapter._hydrate_bot_identity()) 2826 2827 adapter._client.request.assert_not_called() 2828 self.assertEqual(adapter._bot_open_id, "ou_env") 2829 self.assertEqual(adapter._bot_name, "Env Hermes") 2830 2831 @patch.dict(os.environ, {"FEISHU_BOT_OPEN_ID": "ou_env"}, clear=True) 2832 def test_hydration_fills_only_missing_fields(self): 2833 """Env-var open_id must NOT be overwritten by a different probe value.""" 2834 adapter = self._make_adapter() 2835 adapter._client = Mock() 2836 payload = json.dumps( 2837 { 2838 "code": 0, 2839 "bot": { 2840 "bot_name": "Hermes Bot", 2841 "open_id": "ou_probe_DIFFERENT", 2842 }, 2843 } 2844 ).encode("utf-8") 2845 adapter._client.request = Mock(return_value=SimpleNamespace(raw=SimpleNamespace(content=payload))) 2846 2847 asyncio.run(adapter._hydrate_bot_identity()) 2848 2849 self.assertEqual(adapter._bot_open_id, "ou_env") # preserved 2850 self.assertEqual(adapter._bot_name, "Hermes Bot") # filled in 2851 2852 @patch.dict(os.environ, {}, clear=True) 2853 def test_hydration_tolerates_probe_failure_and_falls_back_to_app_info(self): 2854 adapter = self._make_adapter() 2855 adapter._client = Mock() 2856 adapter._client.request = Mock(side_effect=RuntimeError("network down")) 2857 2858 # Make the application-info fallback succeed for _bot_name. 2859 app_response = Mock() 2860 app_response.success = Mock(return_value=True) 2861 app_response.data = SimpleNamespace(app=SimpleNamespace(app_name="Fallback Bot")) 2862 adapter._client.application.v6.application.get = Mock(return_value=app_response) 2863 adapter._build_get_application_request = Mock(return_value=object()) 2864 2865 asyncio.run(adapter._hydrate_bot_identity()) 2866 2867 # Primary probe failed — open_id stays empty, but bot_name came from app-info. 2868 self.assertEqual(adapter._bot_open_id, "") 2869 self.assertEqual(adapter._bot_name, "Fallback Bot") 2870 2871 2872 @unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed") 2873 class TestPendingInboundQueue(unittest.TestCase): 2874 """Tests for the loop-not-ready race (#5499): inbound events arriving 2875 before or during adapter loop transitions must be queued for replay 2876 rather than silently dropped.""" 2877 2878 @patch.dict(os.environ, {}, clear=True) 2879 def test_event_queued_when_loop_not_ready(self): 2880 from gateway.config import PlatformConfig 2881 from gateway.platforms.feishu import FeishuAdapter 2882 2883 adapter = FeishuAdapter(PlatformConfig()) 2884 adapter._loop = None # Simulate "before start()" or "during reconnect" 2885 2886 with patch("gateway.platforms.feishu.threading.Thread") as thread_cls: 2887 adapter._on_message_event(SimpleNamespace(tag="evt-1")) 2888 adapter._on_message_event(SimpleNamespace(tag="evt-2")) 2889 adapter._on_message_event(SimpleNamespace(tag="evt-3")) 2890 2891 # All three queued, none dropped. 2892 self.assertEqual(len(adapter._pending_inbound_events), 3) 2893 # Only ONE drainer thread scheduled, not one per event. 2894 self.assertEqual(thread_cls.call_count, 1) 2895 # Drain scheduled flag set. 2896 self.assertTrue(adapter._pending_drain_scheduled) 2897 2898 @patch.dict(os.environ, {}, clear=True) 2899 def test_drainer_replays_queued_events_when_loop_becomes_ready(self): 2900 from gateway.config import PlatformConfig 2901 from gateway.platforms.feishu import FeishuAdapter 2902 2903 adapter = FeishuAdapter(PlatformConfig()) 2904 adapter._loop = None 2905 adapter._running = True 2906 2907 class _ReadyLoop: 2908 def is_closed(self): 2909 return False 2910 2911 # Queue three events while loop is None (simulate the race). 2912 events = [SimpleNamespace(tag=f"evt-{i}") for i in range(3)] 2913 with patch("gateway.platforms.feishu.threading.Thread"): 2914 for ev in events: 2915 adapter._on_message_event(ev) 2916 2917 self.assertEqual(len(adapter._pending_inbound_events), 3) 2918 2919 # Now the loop becomes ready; run the drainer inline (not as a thread) 2920 # to verify it replays the queue. 2921 adapter._loop = _ReadyLoop() 2922 2923 future = SimpleNamespace(add_done_callback=lambda *_a, **_kw: None) 2924 submitted: list = [] 2925 2926 def _submit(coro, _loop): 2927 submitted.append(coro) 2928 coro.close() 2929 return future 2930 2931 with patch( 2932 "gateway.platforms.feishu.asyncio.run_coroutine_threadsafe", 2933 side_effect=_submit, 2934 ) as submit: 2935 adapter._drain_pending_inbound_events() 2936 2937 # All three events dispatched to the loop. 2938 self.assertEqual(submit.call_count, 3) 2939 # Queue emptied. 2940 self.assertEqual(len(adapter._pending_inbound_events), 0) 2941 # Drain flag reset so a future race can schedule a new drainer. 2942 self.assertFalse(adapter._pending_drain_scheduled) 2943 2944 @patch.dict(os.environ, {}, clear=True) 2945 def test_drainer_drops_queue_when_adapter_shuts_down(self): 2946 from gateway.config import PlatformConfig 2947 from gateway.platforms.feishu import FeishuAdapter 2948 2949 adapter = FeishuAdapter(PlatformConfig()) 2950 adapter._loop = None 2951 adapter._running = False # Shutdown state 2952 2953 with patch("gateway.platforms.feishu.threading.Thread"): 2954 adapter._on_message_event(SimpleNamespace(tag="evt-lost")) 2955 2956 self.assertEqual(len(adapter._pending_inbound_events), 1) 2957 2958 # Drainer should drop the queue immediately since _running is False. 2959 adapter._drain_pending_inbound_events() 2960 2961 self.assertEqual(len(adapter._pending_inbound_events), 0) 2962 self.assertFalse(adapter._pending_drain_scheduled) 2963 2964 @patch.dict(os.environ, {}, clear=True) 2965 def test_queue_cap_evicts_oldest_beyond_max_depth(self): 2966 from gateway.config import PlatformConfig 2967 from gateway.platforms.feishu import FeishuAdapter 2968 2969 adapter = FeishuAdapter(PlatformConfig()) 2970 adapter._loop = None 2971 adapter._pending_inbound_max_depth = 3 # Shrink for test 2972 2973 with patch("gateway.platforms.feishu.threading.Thread"): 2974 for i in range(5): 2975 adapter._on_message_event(SimpleNamespace(tag=f"evt-{i}")) 2976 2977 # Only the last 3 should remain; evt-0 and evt-1 dropped. 2978 self.assertEqual(len(adapter._pending_inbound_events), 3) 2979 tags = [getattr(e, "tag", None) for e in adapter._pending_inbound_events] 2980 self.assertEqual(tags, ["evt-2", "evt-3", "evt-4"]) 2981 2982 @patch.dict(os.environ, {}, clear=True) 2983 def test_normal_path_unchanged_when_loop_ready(self): 2984 """When the loop is ready, events should dispatch directly without 2985 ever touching the pending queue.""" 2986 from gateway.config import PlatformConfig 2987 from gateway.platforms.feishu import FeishuAdapter 2988 2989 adapter = FeishuAdapter(PlatformConfig()) 2990 2991 class _ReadyLoop: 2992 def is_closed(self): 2993 return False 2994 2995 adapter._loop = _ReadyLoop() 2996 2997 future = SimpleNamespace(add_done_callback=lambda *_a, **_kw: None) 2998 2999 def _submit(coro, _loop): 3000 coro.close() 3001 return future 3002 3003 with patch( 3004 "gateway.platforms.feishu.asyncio.run_coroutine_threadsafe", 3005 side_effect=_submit, 3006 ) as submit, patch( 3007 "gateway.platforms.feishu.threading.Thread" 3008 ) as thread_cls: 3009 adapter._on_message_event(SimpleNamespace(tag="evt")) 3010 3011 self.assertEqual(submit.call_count, 1) 3012 self.assertEqual(len(adapter._pending_inbound_events), 0) 3013 self.assertFalse(adapter._pending_drain_scheduled) 3014 # No drainer thread spawned when the happy path runs. 3015 self.assertEqual(thread_cls.call_count, 0) 3016 3017 3018 @unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed") 3019 class TestWebhookSecurity(unittest.TestCase): 3020 """Tests for webhook signature verification, rate limiting, and body size limits.""" 3021 3022 def _make_adapter(self, encrypt_key: str = "") -> "FeishuAdapter": 3023 from gateway.config import PlatformConfig 3024 from gateway.platforms.feishu import FeishuAdapter 3025 3026 with patch.dict(os.environ, {"FEISHU_APP_ID": "cli", "FEISHU_APP_SECRET": "sec", "FEISHU_ENCRYPT_KEY": encrypt_key}, clear=True): 3027 return FeishuAdapter(PlatformConfig()) 3028 3029 def test_signature_valid_passes(self): 3030 import hashlib 3031 from gateway.platforms.feishu import FeishuAdapter 3032 from gateway.config import PlatformConfig 3033 3034 encrypt_key = "test_secret" 3035 adapter = self._make_adapter(encrypt_key) 3036 body = b'{"type":"event"}' 3037 timestamp = "1700000000" 3038 nonce = "abc123" 3039 content = f"{timestamp}{nonce}{encrypt_key}" + body.decode("utf-8") 3040 sig = hashlib.sha256(content.encode("utf-8")).hexdigest() 3041 headers = {"x-lark-request-timestamp": timestamp, "x-lark-request-nonce": nonce, "x-lark-signature": sig} 3042 self.assertTrue(adapter._is_webhook_signature_valid(headers, body)) 3043 3044 def test_signature_invalid_rejected(self): 3045 adapter = self._make_adapter("test_secret") 3046 headers = { 3047 "x-lark-request-timestamp": "1700000000", 3048 "x-lark-request-nonce": "abc", 3049 "x-lark-signature": "deadbeef" * 8, 3050 } 3051 self.assertFalse(adapter._is_webhook_signature_valid(headers, b'{"type":"event"}')) 3052 3053 def test_signature_missing_headers_rejected(self): 3054 adapter = self._make_adapter("test_secret") 3055 self.assertFalse(adapter._is_webhook_signature_valid({}, b'{}')) 3056 3057 def test_rate_limit_allows_requests_within_window(self): 3058 adapter = self._make_adapter() 3059 for _ in range(5): 3060 self.assertTrue(adapter._check_webhook_rate_limit("10.0.0.1")) 3061 3062 def test_rate_limit_blocks_after_exceeding_max(self): 3063 from gateway.platforms.feishu import _FEISHU_WEBHOOK_RATE_LIMIT_MAX 3064 adapter = self._make_adapter() 3065 for _ in range(_FEISHU_WEBHOOK_RATE_LIMIT_MAX): 3066 adapter._check_webhook_rate_limit("10.0.0.2") 3067 self.assertFalse(adapter._check_webhook_rate_limit("10.0.0.2")) 3068 3069 def test_rate_limit_resets_after_window_expires(self): 3070 from gateway.platforms.feishu import _FEISHU_WEBHOOK_RATE_LIMIT_MAX, _FEISHU_WEBHOOK_RATE_WINDOW_SECONDS 3071 adapter = self._make_adapter() 3072 ip = "10.0.0.3" 3073 for _ in range(_FEISHU_WEBHOOK_RATE_LIMIT_MAX): 3074 adapter._check_webhook_rate_limit(ip) 3075 self.assertFalse(adapter._check_webhook_rate_limit(ip)) 3076 # Simulate window expiry by backdating the stored entry. 3077 count, window_start = adapter._webhook_rate_counts[ip] 3078 adapter._webhook_rate_counts[ip] = (count, window_start - _FEISHU_WEBHOOK_RATE_WINDOW_SECONDS - 1) 3079 self.assertTrue(adapter._check_webhook_rate_limit(ip)) 3080 3081 @patch.dict(os.environ, {}, clear=True) 3082 def test_webhook_request_rejects_oversized_body(self): 3083 from gateway.config import PlatformConfig 3084 from gateway.platforms.feishu import FeishuAdapter, _FEISHU_WEBHOOK_MAX_BODY_BYTES 3085 3086 adapter = FeishuAdapter(PlatformConfig()) 3087 # Simulate a request whose Content-Length already signals oversize. 3088 request = SimpleNamespace( 3089 remote="127.0.0.1", 3090 content_length=_FEISHU_WEBHOOK_MAX_BODY_BYTES + 1, 3091 ) 3092 response = asyncio.run(adapter._handle_webhook_request(request)) 3093 self.assertEqual(response.status, 413) 3094 3095 @patch.dict(os.environ, {}, clear=True) 3096 def test_webhook_request_rejects_invalid_json(self): 3097 from gateway.config import PlatformConfig 3098 from gateway.platforms.feishu import FeishuAdapter 3099 3100 adapter = FeishuAdapter(PlatformConfig()) 3101 request = SimpleNamespace( 3102 remote="127.0.0.1", 3103 content_length=None, 3104 read=AsyncMock(return_value=b"not-json"), 3105 ) 3106 response = asyncio.run(adapter._handle_webhook_request(request)) 3107 self.assertEqual(response.status, 400) 3108 3109 @patch.dict(os.environ, {"FEISHU_ENCRYPT_KEY": "secret"}, clear=True) 3110 def test_webhook_request_rejects_bad_signature(self): 3111 from gateway.config import PlatformConfig 3112 from gateway.platforms.feishu import FeishuAdapter 3113 3114 adapter = FeishuAdapter(PlatformConfig()) 3115 body = json.dumps({"header": {"event_type": "im.message.receive_v1"}}).encode() 3116 request = SimpleNamespace( 3117 remote="127.0.0.1", 3118 content_length=None, 3119 headers={"x-lark-request-timestamp": "123", "x-lark-request-nonce": "abc", "x-lark-signature": "bad"}, 3120 read=AsyncMock(return_value=body), 3121 ) 3122 response = asyncio.run(adapter._handle_webhook_request(request)) 3123 self.assertEqual(response.status, 401) 3124 3125 @patch.dict(os.environ, {}, clear=True) 3126 def test_webhook_url_verification_challenge_passes_without_signature(self): 3127 """Challenge requests must succeed even when no encrypt_key is set.""" 3128 from gateway.config import PlatformConfig 3129 from gateway.platforms.feishu import FeishuAdapter 3130 3131 adapter = FeishuAdapter(PlatformConfig()) 3132 body = json.dumps({"type": "url_verification", "challenge": "test_challenge_token"}).encode() 3133 request = SimpleNamespace( 3134 remote="127.0.0.1", 3135 content_length=None, 3136 read=AsyncMock(return_value=body), 3137 ) 3138 response = asyncio.run(adapter._handle_webhook_request(request)) 3139 self.assertEqual(response.status, 200) 3140 self.assertIn(b"test_challenge_token", response.body) 3141 3142 3143 class TestDedupTTL(unittest.TestCase): 3144 """Tests for TTL-aware deduplication.""" 3145 3146 @patch.dict(os.environ, {}, clear=True) 3147 def test_duplicate_within_ttl_is_rejected(self): 3148 from gateway.config import PlatformConfig 3149 from gateway.platforms.feishu import FeishuAdapter 3150 3151 adapter = FeishuAdapter(PlatformConfig()) 3152 with patch.object(adapter, "_persist_seen_message_ids"): 3153 adapter._seen_message_ids = {"om_dup": time.time()} 3154 adapter._seen_message_order = ["om_dup"] 3155 self.assertTrue(adapter._is_duplicate("om_dup")) 3156 3157 @patch.dict(os.environ, {}, clear=True) 3158 def test_expired_entry_is_not_considered_duplicate(self): 3159 from gateway.config import PlatformConfig 3160 from gateway.platforms.feishu import FeishuAdapter, _FEISHU_DEDUP_TTL_SECONDS 3161 3162 adapter = FeishuAdapter(PlatformConfig()) 3163 # Plant an entry that expired well past the TTL. 3164 stale_ts = time.time() - _FEISHU_DEDUP_TTL_SECONDS - 60 3165 adapter._seen_message_ids = {"om_old": stale_ts} 3166 adapter._seen_message_order = ["om_old"] 3167 with patch.object(adapter, "_persist_seen_message_ids"): 3168 self.assertFalse(adapter._is_duplicate("om_old")) 3169 3170 @patch.dict(os.environ, {}, clear=True) 3171 def test_persist_saves_timestamps_as_dict(self): 3172 from gateway.config import PlatformConfig 3173 from gateway.platforms.feishu import FeishuAdapter 3174 3175 adapter = FeishuAdapter(PlatformConfig()) 3176 ts = time.time() 3177 adapter._seen_message_ids = {"om_ts1": ts} 3178 adapter._seen_message_order = ["om_ts1"] 3179 with tempfile.TemporaryDirectory() as tmpdir: 3180 adapter._dedup_state_path = Path(tmpdir) / "dedup.json" 3181 adapter._persist_seen_message_ids() 3182 saved = json.loads(adapter._dedup_state_path.read_text()) 3183 self.assertIsInstance(saved["message_ids"], dict) 3184 self.assertAlmostEqual(saved["message_ids"]["om_ts1"], ts, places=1) 3185 3186 @patch.dict(os.environ, {}, clear=True) 3187 def test_load_backward_compat_list_format(self): 3188 from gateway.config import PlatformConfig 3189 from gateway.platforms.feishu import FeishuAdapter 3190 3191 adapter = FeishuAdapter(PlatformConfig()) 3192 with tempfile.TemporaryDirectory() as tmpdir: 3193 path = Path(tmpdir) / "dedup.json" 3194 path.write_text(json.dumps({"message_ids": ["om_a", "om_b"]}), encoding="utf-8") 3195 adapter._dedup_state_path = path 3196 adapter._load_seen_message_ids() 3197 self.assertIn("om_a", adapter._seen_message_ids) 3198 self.assertIn("om_b", adapter._seen_message_ids) 3199 3200 3201 class TestGroupMentionAtAll(unittest.TestCase): 3202 """Tests for @_all (Feishu @everyone) group mention routing.""" 3203 3204 @patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "open"}, clear=True) 3205 def test_at_all_in_content_accepts_without_explicit_bot_mention(self): 3206 from gateway.config import PlatformConfig 3207 from gateway.platforms.feishu import FeishuAdapter 3208 3209 adapter = FeishuAdapter(PlatformConfig()) 3210 message = SimpleNamespace( 3211 content='{"text":"@_all 请注意"}', 3212 mentions=[], 3213 ) 3214 sender_id = SimpleNamespace(open_id="ou_any", user_id=None) 3215 self.assertTrue(_admits_group(adapter, message, sender_id, "")) 3216 3217 @patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "allowlist", "FEISHU_ALLOWED_USERS": "ou_allowed"}, clear=True) 3218 def test_at_all_still_requires_policy_gate(self): 3219 """@_all bypasses mention gating but NOT the allowlist policy.""" 3220 from gateway.config import PlatformConfig 3221 from gateway.platforms.feishu import FeishuAdapter 3222 3223 adapter = FeishuAdapter(PlatformConfig()) 3224 message = SimpleNamespace(content='{"text":"@_all attention"}', mentions=[]) 3225 # Non-allowlisted user — should be blocked even with @_all. 3226 blocked_sender = SimpleNamespace(open_id="ou_blocked", user_id=None) 3227 self.assertFalse(_admits_group(adapter, message, blocked_sender, "")) 3228 # Allowlisted user — should pass. 3229 allowed_sender = SimpleNamespace(open_id="ou_allowed", user_id=None) 3230 self.assertTrue(_admits_group(adapter, message, allowed_sender, "")) 3231 3232 3233 @unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed") 3234 class TestSenderNameResolution(unittest.TestCase): 3235 """Tests for _resolve_sender_name_from_api (contact API + cache).""" 3236 3237 @patch.dict(os.environ, {}, clear=True) 3238 def test_returns_none_when_client_is_none(self): 3239 from gateway.config import PlatformConfig 3240 from gateway.platforms.feishu import FeishuAdapter 3241 3242 adapter = FeishuAdapter(PlatformConfig()) 3243 adapter._client = None 3244 result = asyncio.run(adapter._resolve_sender_name_from_api("ou_abc")) 3245 self.assertIsNone(result) 3246 3247 @patch.dict(os.environ, {}, clear=True) 3248 def test_returns_cached_name_within_ttl(self): 3249 from gateway.config import PlatformConfig 3250 from gateway.platforms.feishu import FeishuAdapter 3251 3252 adapter = FeishuAdapter(PlatformConfig()) 3253 adapter._client = SimpleNamespace() 3254 future_expire = time.time() + 600 3255 adapter._sender_name_cache["ou_cached"] = ("Alice", future_expire) 3256 result = asyncio.run(adapter._resolve_sender_name_from_api("ou_cached")) 3257 self.assertEqual(result, "Alice") 3258 3259 @patch.dict(os.environ, {}, clear=True) 3260 def test_fetches_and_caches_name_from_api(self): 3261 from gateway.config import PlatformConfig 3262 from gateway.platforms.feishu import FeishuAdapter 3263 3264 adapter = FeishuAdapter(PlatformConfig()) 3265 user_obj = SimpleNamespace(name="Bob", display_name=None, nickname=None, en_name=None) 3266 mock_response = SimpleNamespace( 3267 success=lambda: True, 3268 data=SimpleNamespace(user=user_obj), 3269 ) 3270 3271 async def _direct(func, *args, **kwargs): 3272 return func(*args, **kwargs) 3273 3274 class _ContactAPI: 3275 def get(self, request): 3276 return mock_response 3277 3278 adapter._client = SimpleNamespace( 3279 contact=SimpleNamespace(v3=SimpleNamespace(user=_ContactAPI())) 3280 ) 3281 3282 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 3283 result = asyncio.run(adapter._resolve_sender_name_from_api("ou_bob")) 3284 3285 self.assertEqual(result, "Bob") 3286 self.assertIn("ou_bob", adapter._sender_name_cache) 3287 3288 @patch.dict(os.environ, {}, clear=True) 3289 def test_expired_cache_triggers_new_api_call(self): 3290 from gateway.config import PlatformConfig 3291 from gateway.platforms.feishu import FeishuAdapter 3292 3293 adapter = FeishuAdapter(PlatformConfig()) 3294 # Expired cache entry. 3295 adapter._sender_name_cache["ou_expired"] = ("OldName", time.time() - 1) 3296 3297 async def _direct(func, *args, **kwargs): 3298 return func(*args, **kwargs) 3299 3300 user_obj = SimpleNamespace(name="NewName", display_name=None, nickname=None, en_name=None) 3301 3302 class _ContactAPI: 3303 def get(self, request): 3304 return SimpleNamespace(success=lambda: True, data=SimpleNamespace(user=user_obj)) 3305 3306 adapter._client = SimpleNamespace( 3307 contact=SimpleNamespace(v3=SimpleNamespace(user=_ContactAPI())) 3308 ) 3309 3310 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 3311 result = asyncio.run(adapter._resolve_sender_name_from_api("ou_expired")) 3312 3313 self.assertEqual(result, "NewName") 3314 3315 @patch.dict(os.environ, {}, clear=True) 3316 def test_api_failure_returns_none_without_raising(self): 3317 from gateway.config import PlatformConfig 3318 from gateway.platforms.feishu import FeishuAdapter 3319 3320 adapter = FeishuAdapter(PlatformConfig()) 3321 3322 class _BrokenContactAPI: 3323 def get(self, _request): 3324 raise RuntimeError("API down") 3325 3326 adapter._client = SimpleNamespace( 3327 contact=SimpleNamespace(v3=SimpleNamespace(user=_BrokenContactAPI())) 3328 ) 3329 3330 async def _direct(func, *args, **kwargs): 3331 return func(*args, **kwargs) 3332 3333 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 3334 result = asyncio.run(adapter._resolve_sender_name_from_api("ou_broken")) 3335 3336 self.assertIsNone(result) 3337 3338 3339 @unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed") 3340 class TestBotNameResolution(unittest.TestCase): 3341 """Tests for the bot branch of _resolve_sender_name_from_api (basic_batch API + shared cache).""" 3342 3343 @staticmethod 3344 def _batch_payload(bots: Dict[str, str]): 3345 import json as _json 3346 body = { 3347 oid: {"bot_id": oid, "name": name, "i18n_names": {"en_us": name}} 3348 for oid, name in bots.items() 3349 } 3350 return _json.dumps({"code": 0, "msg": "", "data": {"bots": body, "failed_bots": {}}}).encode() 3351 3352 def _build_adapter_with_bots(self, bots: Dict[str, str]): 3353 from gateway.config import PlatformConfig 3354 from gateway.platforms.feishu import FeishuAdapter 3355 3356 adapter = FeishuAdapter(PlatformConfig()) 3357 calls = [] 3358 3359 def _fake_request(request): 3360 calls.append(request) 3361 return SimpleNamespace(raw=SimpleNamespace(content=self._batch_payload(bots))) 3362 3363 adapter._client = SimpleNamespace(request=_fake_request) 3364 return adapter, calls 3365 3366 @patch.dict(os.environ, {}, clear=True) 3367 def test_returns_cached_bot_name_without_api_call(self): 3368 from gateway.config import PlatformConfig 3369 from gateway.platforms.feishu import FeishuAdapter 3370 3371 adapter = FeishuAdapter(PlatformConfig()) 3372 adapter._sender_name_cache["ou_peer"] = ("Peer Bot", time.time() + 600) 3373 adapter._client = SimpleNamespace( 3374 request=lambda _r: (_ for _ in ()).throw(RuntimeError("should not fetch")) 3375 ) 3376 result = asyncio.run(adapter._resolve_sender_name_from_api("ou_peer", is_bot=True)) 3377 self.assertEqual(result, "Peer Bot") 3378 3379 @patch.dict(os.environ, {}, clear=True) 3380 def test_fetches_and_caches_bot_name(self): 3381 adapter, calls = self._build_adapter_with_bots({"ou_peer": "Peer Bot"}) 3382 3383 async def _direct(func, *args, **kwargs): 3384 return func(*args, **kwargs) 3385 3386 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 3387 result = asyncio.run(adapter._resolve_sender_name_from_api("ou_peer", is_bot=True)) 3388 3389 self.assertEqual(result, "Peer Bot") 3390 self.assertEqual(adapter._sender_name_cache["ou_peer"][0], "Peer Bot") 3391 self.assertEqual(len(calls), 1) 3392 self.assertIn("/open-apis/bot/v3/bots/basic_batch", calls[0].uri) 3393 # Feishu expects repeated ?bot_ids= params, not comma-joined. 3394 self.assertEqual(calls[0].queries, [("bot_ids", "ou_peer")]) 3395 3396 @patch.dict(os.environ, {}, clear=True) 3397 def test_api_failure_returns_none_and_does_not_poison_cache(self): 3398 from gateway.config import PlatformConfig 3399 from gateway.platforms.feishu import FeishuAdapter 3400 3401 adapter = FeishuAdapter(PlatformConfig()) 3402 3403 def _broken_request(_req): 3404 raise RuntimeError("API down") 3405 3406 adapter._client = SimpleNamespace(request=_broken_request) 3407 3408 async def _direct(func, *args, **kwargs): 3409 return func(*args, **kwargs) 3410 3411 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 3412 result = asyncio.run(adapter._resolve_sender_name_from_api("ou_peer", is_bot=True)) 3413 3414 self.assertIsNone(result) 3415 self.assertNotIn("ou_peer", adapter._sender_name_cache) 3416 3417 @patch.dict(os.environ, {}, clear=True) 3418 def test_bot_absent_from_response_is_not_cached(self): 3419 """Bot not in ``data.bots`` (e.g. landed in ``failed_bots``) → no 3420 cache entry, next lookup re-fetches.""" 3421 adapter, _ = self._build_adapter_with_bots({"ou_other": "Other Bot"}) 3422 3423 async def _direct(func, *args, **kwargs): 3424 return func(*args, **kwargs) 3425 3426 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 3427 result = asyncio.run(adapter._resolve_sender_name_from_api("ou_ghost", is_bot=True)) 3428 3429 self.assertIsNone(result) 3430 self.assertNotIn("ou_ghost", adapter._sender_name_cache) 3431 3432 @patch.dict(os.environ, {}, clear=True) 3433 def test_empty_name_in_response_is_negative_cached(self): 3434 """API returns name="" → cache "" so repeat lookups short-circuit.""" 3435 adapter, calls = self._build_adapter_with_bots({"ou_nameless": ""}) 3436 3437 async def _direct(func, *args, **kwargs): 3438 return func(*args, **kwargs) 3439 3440 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 3441 first = asyncio.run(adapter._resolve_sender_name_from_api("ou_nameless", is_bot=True)) 3442 second = asyncio.run(adapter._resolve_sender_name_from_api("ou_nameless", is_bot=True)) 3443 3444 self.assertIsNone(first) 3445 self.assertIsNone(second) 3446 self.assertEqual(adapter._sender_name_cache["ou_nameless"][0], "") 3447 self.assertEqual(len(calls), 1) 3448 3449 @patch.dict(os.environ, {}, clear=True) 3450 def test_non_zero_code_returns_none(self): 3451 from gateway.config import PlatformConfig 3452 from gateway.platforms.feishu import FeishuAdapter 3453 3454 adapter = FeishuAdapter(PlatformConfig()) 3455 error_payload = b'{"code":99991663,"msg":"permission denied"}' 3456 adapter._client = SimpleNamespace( 3457 request=lambda _r: SimpleNamespace(raw=SimpleNamespace(content=error_payload)) 3458 ) 3459 3460 async def _direct(func, *args, **kwargs): 3461 return func(*args, **kwargs) 3462 3463 with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct): 3464 result = asyncio.run(adapter._resolve_sender_name_from_api("ou_peer", is_bot=True)) 3465 3466 self.assertIsNone(result) 3467 self.assertNotIn("ou_peer", adapter._sender_name_cache) 3468 3469 3470 @unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed") 3471 class TestProcessingReactions(unittest.TestCase): 3472 """Typing on start → removed on SUCCESS, swapped for CrossMark on FAILURE, 3473 removed (no replacement) on CANCELLED.""" 3474 3475 @staticmethod 3476 def _run(coro): 3477 return asyncio.run(coro) 3478 3479 def _build_adapter( 3480 self, 3481 create_success: bool = True, 3482 delete_success: bool = True, 3483 next_reaction_id: str = "r1", 3484 ): 3485 from gateway.config import PlatformConfig 3486 from gateway.platforms.feishu import FeishuAdapter 3487 3488 adapter = FeishuAdapter(PlatformConfig()) 3489 tracker = SimpleNamespace( 3490 create_calls=[], 3491 delete_calls=[], 3492 next_reaction_id=next_reaction_id, 3493 create_success=create_success, 3494 delete_success=delete_success, 3495 ) 3496 3497 def _create(request): 3498 tracker.create_calls.append( 3499 request.request_body.reaction_type["emoji_type"] 3500 ) 3501 if tracker.create_success: 3502 return SimpleNamespace( 3503 success=lambda: True, 3504 data=SimpleNamespace(reaction_id=tracker.next_reaction_id), 3505 ) 3506 return SimpleNamespace( 3507 success=lambda: False, code=99, msg="rejected", data=None, 3508 ) 3509 3510 def _delete(request): 3511 tracker.delete_calls.append(request.reaction_id) 3512 return SimpleNamespace( 3513 success=lambda: tracker.delete_success, 3514 code=0 if tracker.delete_success else 99, 3515 msg="success" if tracker.delete_success else "rejected", 3516 ) 3517 3518 adapter._client = SimpleNamespace( 3519 im=SimpleNamespace( 3520 v1=SimpleNamespace( 3521 message_reaction=SimpleNamespace(create=_create, delete=_delete), 3522 ), 3523 ), 3524 ) 3525 return adapter, tracker 3526 3527 @staticmethod 3528 def _event(message_id: str = "om_msg"): 3529 return SimpleNamespace(message_id=message_id) 3530 3531 def _patch_to_thread(self): 3532 async def _direct(func, *args, **kwargs): 3533 return func(*args, **kwargs) 3534 3535 return patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct) 3536 3537 # ------------------------------------------------------------------ start 3538 @patch.dict(os.environ, {}, clear=True) 3539 def test_start_adds_typing_and_caches_reaction_id(self): 3540 adapter, tracker = self._build_adapter(next_reaction_id="r_typing") 3541 with self._patch_to_thread(): 3542 self._run(adapter.on_processing_start(self._event())) 3543 self.assertEqual(tracker.create_calls, ["Typing"]) 3544 self.assertEqual(adapter._pending_processing_reactions["om_msg"], "r_typing") 3545 3546 @patch.dict(os.environ, {}, clear=True) 3547 def test_start_is_idempotent_for_same_message_id(self): 3548 adapter, tracker = self._build_adapter(next_reaction_id="r_typing") 3549 with self._patch_to_thread(): 3550 self._run(adapter.on_processing_start(self._event())) 3551 self._run(adapter.on_processing_start(self._event())) 3552 self.assertEqual(tracker.create_calls, ["Typing"]) 3553 3554 @patch.dict(os.environ, {}, clear=True) 3555 def test_start_does_not_cache_when_create_fails(self): 3556 adapter, tracker = self._build_adapter(create_success=False) 3557 with self._patch_to_thread(): 3558 self._run(adapter.on_processing_start(self._event())) 3559 self.assertEqual(tracker.create_calls, ["Typing"]) 3560 self.assertNotIn("om_msg", adapter._pending_processing_reactions) 3561 3562 # --------------------------------------------------------------- complete 3563 @patch.dict(os.environ, {}, clear=True) 3564 def test_success_removes_typing_and_adds_nothing(self): 3565 adapter, tracker = self._build_adapter(next_reaction_id="r_typing") 3566 with self._patch_to_thread(): 3567 self._run(adapter.on_processing_start(self._event())) 3568 self._run( 3569 adapter.on_processing_complete(self._event(), ProcessingOutcome.SUCCESS) 3570 ) 3571 self.assertEqual(tracker.create_calls, ["Typing"]) 3572 self.assertEqual(tracker.delete_calls, ["r_typing"]) 3573 self.assertNotIn("om_msg", adapter._pending_processing_reactions) 3574 3575 @patch.dict(os.environ, {}, clear=True) 3576 def test_failure_removes_typing_then_adds_cross_mark(self): 3577 adapter, tracker = self._build_adapter(next_reaction_id="r_typing") 3578 with self._patch_to_thread(): 3579 self._run(adapter.on_processing_start(self._event())) 3580 self._run( 3581 adapter.on_processing_complete(self._event(), ProcessingOutcome.FAILURE) 3582 ) 3583 self.assertEqual(tracker.create_calls, ["Typing", "CrossMark"]) 3584 self.assertEqual(tracker.delete_calls, ["r_typing"]) 3585 3586 @patch.dict(os.environ, {}, clear=True) 3587 def test_cancelled_removes_typing_and_adds_nothing(self): 3588 adapter, tracker = self._build_adapter(next_reaction_id="r_typing") 3589 with self._patch_to_thread(): 3590 self._run(adapter.on_processing_start(self._event())) 3591 self._run( 3592 adapter.on_processing_complete(self._event(), ProcessingOutcome.CANCELLED) 3593 ) 3594 self.assertEqual(tracker.create_calls, ["Typing"]) 3595 self.assertEqual(tracker.delete_calls, ["r_typing"]) 3596 self.assertNotIn("om_msg", adapter._pending_processing_reactions) 3597 3598 @patch.dict(os.environ, {}, clear=True) 3599 def test_failure_without_preceding_start_still_adds_cross_mark(self): 3600 adapter, tracker = self._build_adapter() 3601 with self._patch_to_thread(): 3602 self._run( 3603 adapter.on_processing_complete(self._event(), ProcessingOutcome.FAILURE) 3604 ) 3605 self.assertEqual(tracker.create_calls, ["CrossMark"]) 3606 self.assertEqual(tracker.delete_calls, []) 3607 3608 @patch.dict(os.environ, {}, clear=True) 3609 def test_success_without_preceding_start_is_full_noop(self): 3610 adapter, tracker = self._build_adapter() 3611 with self._patch_to_thread(): 3612 self._run( 3613 adapter.on_processing_complete(self._event(), ProcessingOutcome.SUCCESS) 3614 ) 3615 self.assertEqual(tracker.create_calls, []) 3616 self.assertEqual(tracker.delete_calls, []) 3617 3618 # ------------------------- delete failure: don't stack badges ----------- 3619 @patch.dict(os.environ, {}, clear=True) 3620 def test_delete_failure_on_failure_outcome_skips_cross_mark(self): 3621 # Removing Typing is best-effort — but if it fails, we must NOT 3622 # additionally add CrossMark, or the UI would show two contradictory 3623 # badges. The handle stays in the cache for LRU to clean up later. 3624 adapter, tracker = self._build_adapter( 3625 next_reaction_id="r_typing", delete_success=False, 3626 ) 3627 with self._patch_to_thread(): 3628 self._run(adapter.on_processing_start(self._event())) 3629 self._run( 3630 adapter.on_processing_complete(self._event(), ProcessingOutcome.FAILURE) 3631 ) 3632 self.assertEqual(tracker.create_calls, ["Typing"]) # CrossMark NOT added 3633 self.assertEqual(tracker.delete_calls, ["r_typing"]) # delete was attempted 3634 self.assertEqual( 3635 adapter._pending_processing_reactions["om_msg"], "r_typing", 3636 ) # handle retained 3637 3638 @patch.dict(os.environ, {}, clear=True) 3639 def test_delete_failure_on_success_outcome_retains_handle(self): 3640 adapter, tracker = self._build_adapter( 3641 next_reaction_id="r_typing", delete_success=False, 3642 ) 3643 with self._patch_to_thread(): 3644 self._run(adapter.on_processing_start(self._event())) 3645 self._run( 3646 adapter.on_processing_complete(self._event(), ProcessingOutcome.SUCCESS) 3647 ) 3648 self.assertEqual(tracker.create_calls, ["Typing"]) 3649 self.assertEqual(tracker.delete_calls, ["r_typing"]) 3650 self.assertEqual( 3651 adapter._pending_processing_reactions["om_msg"], "r_typing", 3652 ) 3653 3654 # ------------------------------------------------------------- env toggle 3655 @patch.dict(os.environ, {"FEISHU_REACTIONS": "false"}, clear=True) 3656 def test_env_disable_short_circuits_both_hooks(self): 3657 adapter, tracker = self._build_adapter() 3658 with self._patch_to_thread(): 3659 self._run(adapter.on_processing_start(self._event())) 3660 self._run( 3661 adapter.on_processing_complete(self._event(), ProcessingOutcome.FAILURE) 3662 ) 3663 self.assertEqual(tracker.create_calls, []) 3664 self.assertEqual(tracker.delete_calls, []) 3665 3666 # ------------------------------------------------------------- LRU bounds 3667 @patch.dict(os.environ, {}, clear=True) 3668 def test_cache_evicts_oldest_entry_beyond_size_limit(self): 3669 from gateway.platforms.feishu import _FEISHU_PROCESSING_REACTION_CACHE_SIZE 3670 3671 adapter, _ = self._build_adapter() 3672 counter = {"n": 0} 3673 3674 def _create(_request): 3675 counter["n"] += 1 3676 return SimpleNamespace( 3677 success=lambda: True, 3678 data=SimpleNamespace(reaction_id=f"r{counter['n']}"), 3679 ) 3680 3681 adapter._client.im.v1.message_reaction.create = _create 3682 3683 with self._patch_to_thread(): 3684 for i in range(_FEISHU_PROCESSING_REACTION_CACHE_SIZE + 1): 3685 self._run(adapter.on_processing_start(self._event(f"om_{i}"))) 3686 3687 self.assertNotIn("om_0", adapter._pending_processing_reactions) 3688 self.assertIn( 3689 f"om_{_FEISHU_PROCESSING_REACTION_CACHE_SIZE}", 3690 adapter._pending_processing_reactions, 3691 ) 3692 self.assertEqual( 3693 len(adapter._pending_processing_reactions), 3694 _FEISHU_PROCESSING_REACTION_CACHE_SIZE, 3695 ) 3696 3697 3698 class TestFeishuMentionMap(unittest.TestCase): 3699 def test_build_mentions_map_handles_at_all(self): 3700 from gateway.platforms.feishu import _build_mentions_map, _FeishuBotIdentity, FeishuMentionRef 3701 3702 mention = SimpleNamespace(key="@_all", id=None, name="") 3703 result = _build_mentions_map( 3704 [mention], 3705 _FeishuBotIdentity(open_id="ou_bot", name="Hermes"), 3706 ) 3707 self.assertEqual(result["@_all"], FeishuMentionRef(is_all=True)) 3708 3709 def test_build_mentions_map_marks_self_by_open_id(self): 3710 from gateway.platforms.feishu import _build_mentions_map, _FeishuBotIdentity 3711 3712 mention = SimpleNamespace( 3713 key="@_user_1", 3714 id=SimpleNamespace(open_id="ou_bot", user_id=""), 3715 name="Hermes", 3716 ) 3717 ref = _build_mentions_map([mention], _FeishuBotIdentity(open_id="ou_bot"))["@_user_1"] 3718 self.assertTrue(ref.is_self) 3719 self.assertEqual(ref.open_id, "ou_bot") 3720 self.assertEqual(ref.name, "Hermes") 3721 3722 def test_build_mentions_map_marks_self_by_name_fallback(self): 3723 from gateway.platforms.feishu import _build_mentions_map, _FeishuBotIdentity 3724 3725 mention = SimpleNamespace( 3726 key="@_user_1", 3727 id=SimpleNamespace(open_id="", user_id=""), 3728 name="Hermes", 3729 ) 3730 result = _build_mentions_map([mention], _FeishuBotIdentity(name="Hermes")) 3731 self.assertTrue(result["@_user_1"].is_self) 3732 3733 def test_build_mentions_map_name_match_does_not_override_mismatching_open_id(self): 3734 """Regression: a human user whose display name matches the bot must 3735 NOT be flagged as self when their open_id differs. Before the fix, 3736 name-match fired even when open_id was present and different, causing 3737 their messages to be silently stripped/dropped.""" 3738 from gateway.platforms.feishu import _build_mentions_map, _FeishuBotIdentity 3739 3740 human_with_same_name = SimpleNamespace( 3741 key="@_user_1", 3742 id=SimpleNamespace(open_id="ou_human", user_id=""), 3743 name="Hermes Bot", 3744 ) 3745 result = _build_mentions_map( 3746 [human_with_same_name], 3747 _FeishuBotIdentity(open_id="ou_bot", name="Hermes Bot"), 3748 ) 3749 self.assertFalse(result["@_user_1"].is_self) 3750 3751 def test_build_mentions_map_falls_back_to_name_when_bot_open_id_not_hydrated(self): 3752 """Regression: right after gateway startup, _hydrate_bot_identity may 3753 not have populated _bot_open_id yet. During that window, a mention 3754 carrying a real open_id should still match via name — otherwise 3755 @bot messages silently fail admission.""" 3756 from gateway.platforms.feishu import _build_mentions_map, _FeishuBotIdentity 3757 3758 bot_mention = SimpleNamespace( 3759 key="@_user_1", 3760 id=SimpleNamespace(open_id="ou_bot_actual", user_id=""), 3761 name="Hermes Bot", 3762 ) 3763 # Bot identity has name but no open_id yet (hydration pending). 3764 result = _build_mentions_map( 3765 [bot_mention], 3766 _FeishuBotIdentity(open_id="", name="Hermes Bot"), 3767 ) 3768 self.assertTrue(result["@_user_1"].is_self) 3769 3770 def test_build_mentions_map_non_self_user(self): 3771 from gateway.platforms.feishu import _build_mentions_map, _FeishuBotIdentity 3772 3773 mention = SimpleNamespace( 3774 key="@_user_1", 3775 id=SimpleNamespace(open_id="ou_alice", user_id=""), 3776 name="Alice", 3777 ) 3778 ref = _build_mentions_map([mention], _FeishuBotIdentity(open_id="ou_bot"))["@_user_1"] 3779 self.assertFalse(ref.is_self) 3780 self.assertEqual(ref.open_id, "ou_alice") 3781 self.assertEqual(ref.name, "Alice") 3782 3783 def test_build_mentions_map_returns_empty_for_none_input(self): 3784 from gateway.platforms.feishu import _build_mentions_map, _FeishuBotIdentity 3785 3786 self.assertEqual(_build_mentions_map(None, _FeishuBotIdentity(open_id="ou_bot")), {}) 3787 3788 def test_build_mentions_map_tolerates_missing_id_object(self): 3789 from gateway.platforms.feishu import _build_mentions_map, _FeishuBotIdentity 3790 3791 mention = SimpleNamespace(key="@_user_9", id=None, name="") 3792 ref = _build_mentions_map([mention], _FeishuBotIdentity(open_id="ou_bot"))["@_user_9"] 3793 self.assertEqual(ref.open_id, "") 3794 self.assertFalse(ref.is_self) 3795 3796 3797 class TestFeishuMentionHint(unittest.TestCase): 3798 def test_hint_single_user(self): 3799 from gateway.platforms.feishu import FeishuMentionRef, _build_mention_hint 3800 3801 refs = [FeishuMentionRef(name="Alice", open_id="ou_alice")] 3802 self.assertEqual( 3803 _build_mention_hint(refs), 3804 "[Mentioned: Alice (open_id=ou_alice)]", 3805 ) 3806 3807 def test_hint_multiple_users(self): 3808 from gateway.platforms.feishu import FeishuMentionRef, _build_mention_hint 3809 3810 refs = [ 3811 FeishuMentionRef(name="Alice", open_id="ou_alice"), 3812 FeishuMentionRef(name="Bob", open_id="ou_bob"), 3813 ] 3814 self.assertEqual( 3815 _build_mention_hint(refs), 3816 "[Mentioned: Alice (open_id=ou_alice), Bob (open_id=ou_bob)]", 3817 ) 3818 3819 def test_hint_at_all(self): 3820 from gateway.platforms.feishu import FeishuMentionRef, _build_mention_hint 3821 3822 refs = [FeishuMentionRef(is_all=True)] 3823 self.assertEqual(_build_mention_hint(refs), "[Mentioned: @all]") 3824 3825 def test_hint_filters_self_mentions(self): 3826 from gateway.platforms.feishu import FeishuMentionRef, _build_mention_hint 3827 3828 refs = [ 3829 FeishuMentionRef(name="Hermes", open_id="ou_bot", is_self=True), 3830 FeishuMentionRef(name="Alice", open_id="ou_alice"), 3831 ] 3832 self.assertEqual( 3833 _build_mention_hint(refs), 3834 "[Mentioned: Alice (open_id=ou_alice)]", 3835 ) 3836 3837 def test_hint_returns_empty_when_only_self(self): 3838 from gateway.platforms.feishu import FeishuMentionRef, _build_mention_hint 3839 3840 refs = [FeishuMentionRef(name="Hermes", open_id="ou_bot", is_self=True)] 3841 self.assertEqual(_build_mention_hint(refs), "") 3842 3843 def test_hint_returns_empty_for_no_refs(self): 3844 from gateway.platforms.feishu import _build_mention_hint 3845 3846 self.assertEqual(_build_mention_hint([]), "") 3847 3848 def test_hint_falls_back_when_open_id_missing(self): 3849 from gateway.platforms.feishu import FeishuMentionRef, _build_mention_hint 3850 3851 refs = [FeishuMentionRef(name="Alice", open_id="")] 3852 self.assertEqual(_build_mention_hint(refs), "[Mentioned: Alice]") 3853 3854 def test_hint_uses_unknown_placeholder_when_name_missing(self): 3855 from gateway.platforms.feishu import FeishuMentionRef, _build_mention_hint 3856 3857 refs = [FeishuMentionRef(name="", open_id="ou_xxx")] 3858 self.assertEqual(_build_mention_hint(refs), "[Mentioned: unknown (open_id=ou_xxx)]") 3859 3860 def test_hint_dedupes_repeated_user(self): 3861 from gateway.platforms.feishu import FeishuMentionRef, _build_mention_hint 3862 3863 refs = [ 3864 FeishuMentionRef(name="Alice", open_id="ou_alice"), 3865 FeishuMentionRef(name="Alice", open_id="ou_alice"), 3866 FeishuMentionRef(name="Bob", open_id="ou_bob"), 3867 ] 3868 self.assertEqual( 3869 _build_mention_hint(refs), 3870 "[Mentioned: Alice (open_id=ou_alice), Bob (open_id=ou_bob)]", 3871 ) 3872 3873 def test_hint_dedupes_repeated_at_all(self): 3874 from gateway.platforms.feishu import FeishuMentionRef, _build_mention_hint 3875 3876 refs = [FeishuMentionRef(is_all=True), FeishuMentionRef(is_all=True)] 3877 self.assertEqual(_build_mention_hint(refs), "[Mentioned: @all]") 3878 3879 3880 class TestFeishuStripLeadingSelf(unittest.TestCase): 3881 def _make_refs(self, *, self_name="Hermes", other_name=None): 3882 from gateway.platforms.feishu import FeishuMentionRef 3883 3884 refs = [FeishuMentionRef(name=self_name, open_id="ou_bot", is_self=True)] 3885 if other_name: 3886 refs.append(FeishuMentionRef(name=other_name, open_id="ou_alice")) 3887 return refs 3888 3889 def test_strips_leading_self(self): 3890 from gateway.platforms.feishu import _strip_edge_self_mentions 3891 3892 result = _strip_edge_self_mentions("@Hermes /help", self._make_refs()) 3893 self.assertEqual(result, "/help") 3894 3895 def test_strips_consecutive_leading_self(self): 3896 from gateway.platforms.feishu import _strip_edge_self_mentions 3897 3898 result = _strip_edge_self_mentions("@Hermes @Hermes hi", self._make_refs()) 3899 self.assertEqual(result, "hi") 3900 3901 def test_stops_at_first_non_self_token(self): 3902 from gateway.platforms.feishu import _strip_edge_self_mentions 3903 3904 result = _strip_edge_self_mentions( 3905 "@Hermes @Alice make a group", self._make_refs(other_name="Alice") 3906 ) 3907 self.assertEqual(result, "@Alice make a group") 3908 3909 def test_preserves_mid_text_self(self): 3910 from gateway.platforms.feishu import _strip_edge_self_mentions 3911 3912 result = _strip_edge_self_mentions("check @Hermes said yesterday", self._make_refs()) 3913 self.assertEqual(result, "check @Hermes said yesterday") 3914 3915 def test_strips_trailing_self_at_end_of_text(self): 3916 from gateway.platforms.feishu import _strip_edge_self_mentions 3917 3918 result = _strip_edge_self_mentions("look up docs @Hermes", self._make_refs()) 3919 self.assertEqual(result, "look up docs") 3920 3921 def test_strips_trailing_self_with_terminal_punct(self): 3922 from gateway.platforms.feishu import _strip_edge_self_mentions 3923 3924 # Terminal punct after the mention — strip the mention, keep the punct. 3925 result = _strip_edge_self_mentions("look up docs @Hermes.", self._make_refs()) 3926 self.assertEqual(result, "look up docs.") 3927 3928 def test_preserves_trailing_self_before_non_terminal_char(self): 3929 from gateway.platforms.feishu import _strip_edge_self_mentions 3930 3931 # Non-terminal char (here a Chinese particle) follows — preserve. 3932 result = _strip_edge_self_mentions( 3933 "please don't @Hermes anymore", self._make_refs() 3934 ) 3935 self.assertEqual(result, "please don't @Hermes anymore") 3936 3937 def test_returns_input_when_refs_empty(self): 3938 from gateway.platforms.feishu import _strip_edge_self_mentions 3939 3940 self.assertEqual(_strip_edge_self_mentions("@Hermes /help", []), "@Hermes /help") 3941 3942 def test_returns_input_when_no_self_refs(self): 3943 from gateway.platforms.feishu import _strip_edge_self_mentions, FeishuMentionRef 3944 3945 refs = [FeishuMentionRef(name="Alice", open_id="ou_alice")] 3946 self.assertEqual(_strip_edge_self_mentions("@Alice hi", refs), "@Alice hi") 3947 3948 def test_uses_open_id_fallback_when_name_missing(self): 3949 from gateway.platforms.feishu import _strip_edge_self_mentions, FeishuMentionRef 3950 3951 refs = [FeishuMentionRef(name="", open_id="ou_bot", is_self=True)] 3952 self.assertEqual(_strip_edge_self_mentions("@ou_bot hi", refs), "hi") 3953 3954 def test_word_boundary_prevents_prefix_collision(self): 3955 """A bot named 'Al' must not eat the leading '@Alice' of a different user.""" 3956 from gateway.platforms.feishu import _strip_edge_self_mentions, FeishuMentionRef 3957 3958 refs = [FeishuMentionRef(name="Al", open_id="ou_bot", is_self=True)] 3959 self.assertEqual(_strip_edge_self_mentions("@Alice hi", refs), "@Alice hi") 3960 3961 3962 class TestFeishuNormalizeText(unittest.TestCase): 3963 def test_renders_mention_with_display_name(self): 3964 from gateway.platforms.feishu import _normalize_feishu_text, FeishuMentionRef 3965 3966 refs = {"@_user_1": FeishuMentionRef(name="Alice", open_id="ou_alice")} 3967 self.assertEqual(_normalize_feishu_text("@_user_1 hello", refs), "@Alice hello") 3968 3969 def test_renders_self_mention_with_name(self): 3970 from gateway.platforms.feishu import _normalize_feishu_text, FeishuMentionRef 3971 3972 refs = {"@_user_1": FeishuMentionRef(name="Hermes", open_id="ou_bot", is_self=True)} 3973 self.assertEqual( 3974 _normalize_feishu_text("stop pinging @_user_1 please", refs), 3975 "stop pinging @Hermes please", 3976 ) 3977 3978 def test_at_all_rendered_as_english_literal(self): 3979 from gateway.platforms.feishu import _normalize_feishu_text 3980 3981 self.assertEqual(_normalize_feishu_text("@_all notice", None), "@all notice") 3982 3983 def test_unknown_placeholder_degrades_to_space(self): 3984 from gateway.platforms.feishu import _normalize_feishu_text 3985 3986 # No map: fall back to the old behavior (substitute with space, then collapse). 3987 self.assertEqual(_normalize_feishu_text("@_user_9 hello", None), "hello") 3988 3989 def test_backward_compatible_without_map(self): 3990 from gateway.platforms.feishu import _normalize_feishu_text 3991 3992 self.assertEqual(_normalize_feishu_text("hello world"), "hello world") 3993 3994 def test_mention_for_missing_map_entry_degrades_to_space(self): 3995 from gateway.platforms.feishu import _normalize_feishu_text, FeishuMentionRef 3996 3997 refs = {"@_user_1": FeishuMentionRef(name="Alice")} 3998 # @_user_2 has no entry — should degrade to a space (legacy behavior) 3999 self.assertEqual( 4000 _normalize_feishu_text("@_user_1 @_user_2 hi", refs), 4001 "@Alice hi", 4002 ) 4003 4004 4005 class TestFeishuPostMentionParsing(unittest.TestCase): 4006 def test_post_at_tag_renders_via_mentions_map(self): 4007 """Post <at>.user_id is a placeholder ('@_user_N'); the real display 4008 name comes from the mentions_map lookup. Confirmed via live 4009 im.v1.message.get payload.""" 4010 from gateway.platforms.feishu import parse_feishu_post_payload, FeishuMentionRef 4011 4012 payload = { 4013 "en_us": { 4014 "content": [[ 4015 {"tag": "at", "user_id": "@_user_1", "user_name": "ignored"}, 4016 {"tag": "text", "text": " hello"}, 4017 ]] 4018 } 4019 } 4020 mentions_map = { 4021 "@_user_1": FeishuMentionRef(name="Alice", open_id="ou_alice"), 4022 } 4023 result = parse_feishu_post_payload(payload, mentions_map=mentions_map) 4024 self.assertEqual(result.text_content, "@Alice hello") 4025 4026 def test_post_at_tag_falls_back_to_inline_user_name_when_map_misses(self): 4027 """When the mentions payload is missing a placeholder, fall back to the 4028 inline user_name in the <at> tag itself.""" 4029 from gateway.platforms.feishu import parse_feishu_post_payload 4030 4031 payload = { 4032 "en_us": { 4033 "content": [[ 4034 {"tag": "at", "user_id": "@_user_7", "user_name": "Unknown"}, 4035 {"tag": "text", "text": " hi"}, 4036 ]] 4037 } 4038 } 4039 result = parse_feishu_post_payload(payload, mentions_map={}) 4040 self.assertEqual(result.text_content, "@Unknown hi") 4041 4042 def test_post_at_all_tag_renders_as_at_all(self): 4043 """Post-format @everyone has user_id == '@_all' (confirmed via live 4044 im.v1.message.get). Rendered as literal '@all' regardless of map.""" 4045 from gateway.platforms.feishu import parse_feishu_post_payload 4046 4047 payload = { 4048 "en_us": { 4049 "content": [[ 4050 {"tag": "at", "user_id": "@_all", "user_name": "everyone"}, 4051 {"tag": "text", "text": " meeting"}, 4052 ]] 4053 } 4054 } 4055 result = parse_feishu_post_payload(payload) 4056 self.assertIn("@all", result.text_content) 4057 4058 4059 class TestFeishuNormalizeWithMentions(unittest.TestCase): 4060 def test_text_message_renders_mention_by_name(self): 4061 from gateway.platforms.feishu import normalize_feishu_message, _FeishuBotIdentity 4062 4063 mention = SimpleNamespace( 4064 key="@_user_1", 4065 id=SimpleNamespace(open_id="ou_alice", user_id=""), 4066 name="Alice", 4067 ) 4068 normalized = normalize_feishu_message( 4069 message_type="text", 4070 raw_content=json.dumps({"text": "@_user_1 hello"}), 4071 mentions=[mention], 4072 bot=_FeishuBotIdentity(open_id="ou_bot"), 4073 ) 4074 self.assertEqual(normalized.text_content, "@Alice hello") 4075 self.assertEqual(len(normalized.mentions), 1) 4076 self.assertEqual(normalized.mentions[0].open_id, "ou_alice") 4077 self.assertFalse(normalized.mentions[0].is_self) 4078 4079 def test_text_message_marks_bot_self_mention(self): 4080 from gateway.platforms.feishu import normalize_feishu_message, _FeishuBotIdentity 4081 4082 mention = SimpleNamespace( 4083 key="@_user_1", 4084 id=SimpleNamespace(open_id="ou_bot", user_id=""), 4085 name="Hermes", 4086 ) 4087 normalized = normalize_feishu_message( 4088 message_type="text", 4089 raw_content=json.dumps({"text": "@_user_1 /help"}), 4090 mentions=[mention], 4091 bot=_FeishuBotIdentity(open_id="ou_bot"), 4092 ) 4093 self.assertTrue(normalized.mentions[0].is_self) 4094 # self mention is still rendered — strip is a separate adapter-level pass 4095 self.assertEqual(normalized.text_content, "@Hermes /help") 4096 4097 def test_text_message_at_all_surfaces_ref(self): 4098 from gateway.platforms.feishu import normalize_feishu_message 4099 4100 mention = SimpleNamespace(key="@_all", id=None, name="") 4101 normalized = normalize_feishu_message( 4102 message_type="text", 4103 raw_content=json.dumps({"text": "@_all meeting"}), 4104 mentions=[mention], 4105 ) 4106 self.assertEqual(normalized.text_content, "@all meeting") 4107 self.assertEqual(len(normalized.mentions), 1) 4108 self.assertTrue(normalized.mentions[0].is_all) 4109 4110 def test_text_message_at_all_in_text_without_mentions_payload(self): 4111 """Feishu SDK sometimes omits @_all from the mentions payload (confirmed 4112 via im.v1.message.get). The fallback scan on raw text must still yield 4113 an is_all ref so [Mentioned: @all] gets injected.""" 4114 from gateway.platforms.feishu import normalize_feishu_message 4115 4116 normalized = normalize_feishu_message( 4117 message_type="text", 4118 raw_content=json.dumps({"text": "@_all hello"}), 4119 mentions=None, 4120 ) 4121 self.assertEqual(normalized.text_content, "@all hello") 4122 self.assertEqual(len(normalized.mentions), 1) 4123 self.assertTrue(normalized.mentions[0].is_all) 4124 4125 def test_text_message_at_all_not_synthesized_if_absent_from_text(self): 4126 """No @_all in text → no synthetic ref even if mentions_map is empty.""" 4127 from gateway.platforms.feishu import normalize_feishu_message 4128 4129 normalized = normalize_feishu_message( 4130 message_type="text", 4131 raw_content=json.dumps({"text": "plain hello"}), 4132 mentions=None, 4133 ) 4134 self.assertEqual(normalized.mentions, []) 4135 4136 def test_text_message_without_mentions_param_is_backward_compatible(self): 4137 from gateway.platforms.feishu import normalize_feishu_message 4138 4139 normalized = normalize_feishu_message( 4140 message_type="text", 4141 raw_content=json.dumps({"text": "hello world"}), 4142 ) 4143 self.assertEqual(normalized.text_content, "hello world") 4144 self.assertEqual(normalized.mentions, []) 4145 4146 def test_post_message_marks_self_via_mentions_map_lookup(self): 4147 """Real Feishu post: <at user_id="@_user_N"> + top-level mentions array 4148 resolves to open_id via placeholder lookup, not direct tag fields.""" 4149 from gateway.platforms.feishu import normalize_feishu_message, _FeishuBotIdentity 4150 4151 raw = json.dumps({ 4152 "en_us": { 4153 "content": [ 4154 [ 4155 {"tag": "at", "user_id": "@_user_1", "user_name": "Hermes"}, 4156 {"tag": "text", "text": " check this"}, 4157 ] 4158 ] 4159 } 4160 }) 4161 bot_mention = SimpleNamespace( 4162 key="@_user_1", 4163 id=SimpleNamespace(open_id="ou_bot", user_id=""), 4164 name="Hermes", 4165 ) 4166 normalized = normalize_feishu_message( 4167 message_type="post", 4168 raw_content=raw, 4169 mentions=[bot_mention], 4170 bot=_FeishuBotIdentity(open_id="ou_bot"), 4171 ) 4172 self.assertEqual(len(normalized.mentions), 1) 4173 self.assertTrue(normalized.mentions[0].is_self) 4174 self.assertEqual(normalized.mentions[0].open_id, "ou_bot") 4175 4176 4177 class TestFeishuPostMentionsBot(unittest.TestCase): 4178 def _build_adapter(self, bot_open_id="ou_bot", bot_user_id="", bot_name=""): 4179 from gateway.platforms.feishu import FeishuAdapter 4180 4181 adapter = FeishuAdapter.__new__(FeishuAdapter) 4182 adapter._bot_open_id = bot_open_id 4183 adapter._bot_user_id = bot_user_id 4184 adapter._bot_name = bot_name 4185 return adapter 4186 4187 def test_post_mentions_bot_uses_is_self_flag(self): 4188 from gateway.platforms.feishu import FeishuMentionRef 4189 4190 adapter = self._build_adapter() 4191 self.assertTrue( 4192 adapter._post_mentions_bot( 4193 [FeishuMentionRef(name="Hermes", open_id="ou_bot", is_self=True)] 4194 ) 4195 ) 4196 self.assertFalse( 4197 adapter._post_mentions_bot( 4198 [FeishuMentionRef(name="Alice", open_id="ou_alice")] 4199 ) 4200 ) 4201 4202 def test_post_mentions_bot_empty_returns_false(self): 4203 adapter = self._build_adapter() 4204 self.assertFalse(adapter._post_mentions_bot([])) 4205 4206 4207 class TestFeishuExtractMessageContent(unittest.TestCase): 4208 def _build_adapter(self): 4209 from gateway.platforms.feishu import FeishuAdapter 4210 4211 adapter = FeishuAdapter.__new__(FeishuAdapter) 4212 adapter._bot_open_id = "ou_bot" 4213 adapter._bot_user_id = "" 4214 adapter._bot_name = "Hermes" 4215 adapter._download_feishu_message_resources = AsyncMock(return_value=([], [])) 4216 return adapter 4217 4218 def test_returns_five_tuple_with_mentions(self): 4219 adapter = self._build_adapter() 4220 message = SimpleNamespace( 4221 content=json.dumps({"text": "@_user_1 hello"}), 4222 message_type="text", 4223 message_id="m1", 4224 mentions=[ 4225 SimpleNamespace( 4226 key="@_user_1", 4227 id=SimpleNamespace(open_id="ou_alice", user_id=""), 4228 name="Alice", 4229 ) 4230 ], 4231 ) 4232 4233 text, inbound_type, media_urls, media_types, mentions = asyncio.run( 4234 adapter._extract_message_content(message) 4235 ) 4236 self.assertEqual(text, "@Alice hello") 4237 self.assertEqual(len(mentions), 1) 4238 self.assertEqual(mentions[0].open_id, "ou_alice") 4239 4240 def test_returns_empty_mentions_when_missing(self): 4241 adapter = self._build_adapter() 4242 message = SimpleNamespace( 4243 content=json.dumps({"text": "plain hello"}), 4244 message_type="text", 4245 message_id="m2", 4246 mentions=None, 4247 ) 4248 4249 text, _, _, _, mentions = asyncio.run(adapter._extract_message_content(message)) 4250 self.assertEqual(text, "plain hello") 4251 self.assertEqual(mentions, []) 4252 4253 4254 class TestFeishuProcessInboundMessage(unittest.TestCase): 4255 def _build_adapter(self): 4256 from gateway.platforms.feishu import FeishuAdapter 4257 4258 adapter = FeishuAdapter.__new__(FeishuAdapter) 4259 adapter._bot_open_id = "ou_bot" 4260 adapter._bot_user_id = "" 4261 adapter._bot_name = "Hermes" 4262 adapter._download_feishu_message_resources = AsyncMock(return_value=([], [])) 4263 adapter._fetch_message_text = AsyncMock(return_value=None) 4264 adapter.get_chat_info = AsyncMock(return_value={"name": "Test Chat"}) 4265 adapter._resolve_sender_profile = AsyncMock( 4266 return_value={"user_id": "u1", "user_name": "Alice", "user_id_alt": None} 4267 ) 4268 adapter._resolve_source_chat_type = Mock(return_value="group") 4269 adapter.build_source = Mock(return_value=SimpleNamespace(thread_id=None)) 4270 adapter._dispatch_inbound_event = AsyncMock() 4271 return adapter 4272 4273 def test_leading_self_mention_stripped_for_command(self): 4274 from gateway.platforms.base import MessageType 4275 4276 adapter = self._build_adapter() 4277 bot_mention = SimpleNamespace( 4278 key="@_user_1", 4279 id=SimpleNamespace(open_id="ou_bot", user_id=""), 4280 name="Hermes", 4281 ) 4282 message = SimpleNamespace( 4283 content=json.dumps({"text": "@_user_1 /help"}), 4284 message_type="text", 4285 message_id="m1", 4286 mentions=[bot_mention], 4287 chat_id="oc_chat", 4288 parent_id=None, 4289 upper_message_id=None, 4290 thread_id=None, 4291 ) 4292 asyncio.run( 4293 adapter._process_inbound_message( 4294 data=message, 4295 message=message, 4296 sender_id=None, 4297 chat_type="group", 4298 message_id="m1", 4299 ) 4300 ) 4301 event = adapter._dispatch_inbound_event.call_args.args[0] 4302 self.assertEqual(event.text, "/help") 4303 self.assertEqual(event.message_type, MessageType.COMMAND) 4304 4305 def test_non_command_message_with_mentions_injects_hint(self): 4306 from gateway.platforms.base import MessageType 4307 4308 adapter = self._build_adapter() 4309 alice = SimpleNamespace( 4310 key="@_user_1", 4311 id=SimpleNamespace(open_id="ou_alice", user_id=""), 4312 name="Alice", 4313 ) 4314 bob = SimpleNamespace( 4315 key="@_user_2", 4316 id=SimpleNamespace(open_id="ou_bob", user_id=""), 4317 name="Bob", 4318 ) 4319 message = SimpleNamespace( 4320 content=json.dumps({"text": "@_user_1 @_user_2 make a group"}), 4321 message_type="text", 4322 message_id="m2", 4323 mentions=[alice, bob], 4324 chat_id="oc_chat", 4325 parent_id=None, 4326 upper_message_id=None, 4327 thread_id=None, 4328 ) 4329 asyncio.run( 4330 adapter._process_inbound_message( 4331 data=message, 4332 message=message, 4333 sender_id=None, 4334 chat_type="group", 4335 message_id="m2", 4336 ) 4337 ) 4338 event = adapter._dispatch_inbound_event.call_args.args[0] 4339 self.assertEqual(event.message_type, MessageType.TEXT) 4340 self.assertIn("[Mentioned: Alice (open_id=ou_alice), Bob (open_id=ou_bob)]", event.text) 4341 self.assertIn("@Alice @Bob make a group", event.text) 4342 4343 def test_command_message_never_injects_hint(self): 4344 adapter = self._build_adapter() 4345 bot_mention = SimpleNamespace( 4346 key="@_user_1", 4347 id=SimpleNamespace(open_id="ou_bot", user_id=""), 4348 name="Hermes", 4349 ) 4350 alice = SimpleNamespace( 4351 key="@_user_2", 4352 id=SimpleNamespace(open_id="ou_alice", user_id=""), 4353 name="Alice", 4354 ) 4355 message = SimpleNamespace( 4356 content=json.dumps({"text": "@_user_1 /model @_user_2"}), 4357 message_type="text", 4358 message_id="m3", 4359 mentions=[bot_mention, alice], 4360 chat_id="oc_chat", 4361 parent_id=None, 4362 upper_message_id=None, 4363 thread_id=None, 4364 ) 4365 asyncio.run( 4366 adapter._process_inbound_message( 4367 data=message, 4368 message=message, 4369 sender_id=None, 4370 chat_type="group", 4371 message_id="m3", 4372 ) 4373 ) 4374 event = adapter._dispatch_inbound_event.call_args.args[0] 4375 self.assertNotIn("[Mentioned:", event.text) 4376 self.assertTrue(event.text.startswith("/model")) 4377 4378 def test_mid_text_self_mention_preserved(self): 4379 adapter = self._build_adapter() 4380 bot_mention = SimpleNamespace( 4381 key="@_user_1", 4382 id=SimpleNamespace(open_id="ou_bot", user_id=""), 4383 name="Hermes", 4384 ) 4385 message = SimpleNamespace( 4386 content=json.dumps({"text": "stop pinging @_user_1 please"}), 4387 message_type="text", 4388 message_id="m4", 4389 mentions=[bot_mention], 4390 chat_id="oc_chat", 4391 parent_id=None, 4392 upper_message_id=None, 4393 thread_id=None, 4394 ) 4395 asyncio.run( 4396 adapter._process_inbound_message( 4397 data=message, 4398 message=message, 4399 sender_id=None, 4400 chat_type="group", 4401 message_id="m4", 4402 ) 4403 ) 4404 event = adapter._dispatch_inbound_event.call_args.args[0] 4405 self.assertEqual(event.text, "stop pinging @Hermes please") 4406 4407 def test_pure_self_mention_message_is_ignored(self): 4408 """A message containing only '@Bot' (no body, no media) must not dispatch. 4409 4410 Regression guard: the rendered '@Hermes' slips past the pre-strip empty 4411 guard; the post-strip guard must catch it. 4412 """ 4413 adapter = self._build_adapter() 4414 bot_mention = SimpleNamespace( 4415 key="@_user_1", 4416 id=SimpleNamespace(open_id="ou_bot", user_id=""), 4417 name="Hermes", 4418 ) 4419 message = SimpleNamespace( 4420 content=json.dumps({"text": "@_user_1"}), 4421 message_type="text", 4422 message_id="m5", 4423 mentions=[bot_mention], 4424 chat_id="oc_chat", 4425 parent_id=None, 4426 upper_message_id=None, 4427 thread_id=None, 4428 ) 4429 asyncio.run( 4430 adapter._process_inbound_message( 4431 data=message, message=message, sender_id=None, 4432 chat_type="group", message_id="m5", 4433 ) 4434 ) 4435 adapter._dispatch_inbound_event.assert_not_called() 4436 4437 4438 class TestFeishuFetchMessageText(unittest.TestCase): 4439 def _build_adapter(self): 4440 from gateway.platforms.feishu import FeishuAdapter 4441 4442 adapter = FeishuAdapter.__new__(FeishuAdapter) 4443 adapter._bot_open_id = "ou_bot" 4444 adapter._bot_user_id = "" 4445 adapter._bot_name = "Hermes" 4446 adapter._message_text_cache = {} 4447 adapter._client = Mock() 4448 adapter._build_get_message_request = Mock(return_value=object()) 4449 return adapter 4450 4451 def test_fetch_message_text_renders_mentions_without_hint_prefix(self): 4452 adapter = self._build_adapter() 4453 4454 alice_mention = SimpleNamespace( 4455 key="@_user_1", 4456 id="ou_alice", 4457 id_type="open_id", 4458 name="Alice", 4459 ) 4460 parent = SimpleNamespace( 4461 body=SimpleNamespace(content=json.dumps({"text": "@_user_1 hi"})), 4462 msg_type="text", 4463 mentions=[alice_mention], 4464 ) 4465 response = Mock() 4466 response.success = Mock(return_value=True) 4467 response.data = SimpleNamespace(items=[parent]) 4468 adapter._client.im.v1.message.get = Mock(return_value=response) 4469 4470 result = asyncio.run(adapter._fetch_message_text("m_parent")) 4471 self.assertEqual(result, "@Alice hi") 4472 # No [Mentioned:] wrapper — reply-context path intentionally skips the hint. 4473 self.assertNotIn("[Mentioned:", result) 4474 4475 def test_extract_text_from_raw_content_accepts_mentions_kwarg(self): 4476 from gateway.platforms.feishu import FeishuAdapter 4477 4478 adapter = FeishuAdapter.__new__(FeishuAdapter) 4479 adapter._bot_open_id = "" 4480 adapter._bot_user_id = "" 4481 adapter._bot_name = "" 4482 4483 alice_mention = SimpleNamespace( 4484 key="@_user_1", 4485 id=SimpleNamespace(open_id="ou_alice", user_id=""), 4486 name="Alice", 4487 ) 4488 self.assertEqual( 4489 adapter._extract_text_from_raw_content( 4490 msg_type="text", 4491 raw_content=json.dumps({"text": "@_user_1 hello"}), 4492 mentions=[alice_mention], 4493 ), 4494 "@Alice hello", 4495 ) 4496 4497 def test_fetch_message_text_marks_is_self_via_string_id_shape(self): 4498 """History-path Mention objects carry id as str + id_type; is_self must still work.""" 4499 adapter = self._build_adapter() 4500 # bot_name is empty — is_self must be detected via open_id alone 4501 adapter._bot_name = "" 4502 4503 bot_mention = SimpleNamespace( 4504 key="@_user_1", 4505 id="ou_bot", 4506 id_type="open_id", 4507 name="Hermes", 4508 ) 4509 parent = SimpleNamespace( 4510 body=SimpleNamespace(content=json.dumps({"text": "@_user_1 hi"})), 4511 msg_type="text", 4512 mentions=[bot_mention], 4513 ) 4514 response = Mock() 4515 response.success = Mock(return_value=True) 4516 response.data = SimpleNamespace(items=[parent]) 4517 adapter._client.im.v1.message.get = Mock(return_value=response) 4518 4519 # The rendered text should still have the bot name substituted. 4520 result = asyncio.run(adapter._fetch_message_text("m_parent")) 4521 self.assertEqual(result, "@Hermes hi") 4522 4523 def test_build_mentions_map_string_id_shape(self): 4524 """_build_mentions_map accepts the reply-history shape (id as str + 4525 id_type='open_id'). user_id id_type is not load-bearing for self 4526 detection — inbound mention payloads always include an open_id.""" 4527 from gateway.platforms.feishu import _build_mentions_map, _FeishuBotIdentity 4528 4529 # open_id discriminator, non-self 4530 alice = SimpleNamespace(key="@_user_1", id="ou_alice", id_type="open_id", name="Alice") 4531 ref = _build_mentions_map([alice], _FeishuBotIdentity(open_id="ou_bot"))["@_user_1"] 4532 self.assertEqual(ref.open_id, "ou_alice") 4533 self.assertFalse(ref.is_self) 4534 4535 # open_id discriminator, is_self matches via open_id 4536 bot_oid = SimpleNamespace(key="@_user_3", id="ou_bot", id_type="open_id", name="Hermes") 4537 self.assertTrue( 4538 _build_mentions_map([bot_oid], _FeishuBotIdentity(open_id="ou_bot"))["@_user_3"].is_self 4539 ) 4540 4541 4542 class TestFeishuMentionEndToEnd(unittest.TestCase): 4543 """High-level scenarios from the design spec — verify the full pipeline.""" 4544 4545 def _build_adapter(self): 4546 from gateway.platforms.feishu import FeishuAdapter 4547 4548 adapter = FeishuAdapter.__new__(FeishuAdapter) 4549 adapter._bot_open_id = "ou_bot" 4550 adapter._bot_user_id = "" 4551 adapter._bot_name = "Hermes" 4552 adapter._download_feishu_message_resources = AsyncMock(return_value=([], [])) 4553 adapter._fetch_message_text = AsyncMock(return_value=None) 4554 adapter.get_chat_info = AsyncMock(return_value={"name": "Test Chat"}) 4555 adapter._resolve_sender_profile = AsyncMock( 4556 return_value={"user_id": "u1", "user_name": "Alice", "user_id_alt": None} 4557 ) 4558 adapter._resolve_source_chat_type = Mock(return_value="group") 4559 adapter.build_source = Mock(return_value=SimpleNamespace(thread_id=None)) 4560 adapter._dispatch_inbound_event = AsyncMock() 4561 return adapter 4562 4563 def _run(self, adapter, text, mentions): 4564 raw_mentions = [ 4565 SimpleNamespace( 4566 key=m["key"], 4567 id=SimpleNamespace(open_id=m.get("open_id", ""), user_id=m.get("user_id", "")), 4568 name=m.get("name", ""), 4569 ) 4570 for m in mentions 4571 ] 4572 message = SimpleNamespace( 4573 content=json.dumps({"text": text}), 4574 message_type="text", 4575 message_id="m", 4576 mentions=raw_mentions, 4577 chat_id="oc_chat", 4578 parent_id=None, 4579 upper_message_id=None, 4580 thread_id=None, 4581 ) 4582 asyncio.run( 4583 adapter._process_inbound_message( 4584 data=message, message=message, sender_id=None, chat_type="group", message_id="m", 4585 ) 4586 ) 4587 return adapter._dispatch_inbound_event.call_args.args[0] 4588 4589 def test_scenario_bot_plus_alice_plus_bob_build_group(self): 4590 adapter = self._build_adapter() 4591 event = self._run( 4592 adapter, 4593 "@_user_1 @_user_2 @_user_3 build me a group", 4594 [ 4595 {"key": "@_user_1", "open_id": "ou_bot", "name": "Hermes"}, 4596 {"key": "@_user_2", "open_id": "ou_alice", "name": "Alice"}, 4597 {"key": "@_user_3", "open_id": "ou_bob", "name": "Bob"}, 4598 ], 4599 ) 4600 self.assertIn("[Mentioned: Alice (open_id=ou_alice), Bob (open_id=ou_bob)]", event.text) 4601 self.assertIn("@Alice @Bob build me a group", event.text) 4602 self.assertNotIn("@Hermes", event.text) 4603 4604 def test_scenario_at_all_announcement(self): 4605 adapter = self._build_adapter() 4606 event = self._run( 4607 adapter, 4608 "@_all meeting at 3pm", 4609 [{"key": "@_all"}], 4610 ) 4611 self.assertTrue(event.text.startswith("[Mentioned: @all]")) 4612 self.assertIn("@all meeting at 3pm", event.text) 4613 4614 def test_scenario_trailing_self_mention_stripped(self): 4615 """Trailing @bot at the end of a message is routing noise, not content — 4616 strip it so the agent sees a clean instruction body.""" 4617 adapter = self._build_adapter() 4618 event = self._run( 4619 adapter, 4620 "who are you @_user_1", 4621 [{"key": "@_user_1", "open_id": "ou_bot", "name": "Hermes"}], 4622 ) 4623 self.assertEqual(event.text, "who are you") 4624 4625 def test_scenario_mid_text_self_mention_preserved(self): 4626 """Self mention in the middle of a sentence (followed by a non-terminal 4627 character) is meaningful content — preserve it.""" 4628 adapter = self._build_adapter() 4629 event = self._run( 4630 adapter, 4631 "please don't @_user_1 anymore", 4632 [{"key": "@_user_1", "open_id": "ou_bot", "name": "Hermes"}], 4633 ) 4634 self.assertEqual(event.text, "please don't @Hermes anymore") 4635 4636 def test_scenario_no_mentions_zero_regression(self): 4637 adapter = self._build_adapter() 4638 event = self._run(adapter, "plain message", []) 4639 self.assertEqual(event.text, "plain message") 4640 self.assertNotIn("[Mentioned:", event.text) 4641 4642 def test_scenario_post_at_alice_exposes_open_id(self): 4643 """Post-type @mention: <at> placeholder resolves via top-level mentions, 4644 agent gets real open_id in the hint (mirrors text-type behavior).""" 4645 adapter = self._build_adapter() 4646 alice_mention = SimpleNamespace( 4647 key="@_user_1", 4648 id=SimpleNamespace(open_id="ou_alice", user_id=""), 4649 name="Alice", 4650 ) 4651 post_content = json.dumps({ 4652 "zh_cn": { 4653 "content": [[ 4654 {"tag": "at", "user_id": "@_user_1", "user_name": "Alice"}, 4655 {"tag": "text", "text": " lookup this doc"}, 4656 ]] 4657 } 4658 }) 4659 message = SimpleNamespace( 4660 content=post_content, 4661 message_type="post", 4662 message_id="m_post", 4663 mentions=[alice_mention], 4664 chat_id="oc_chat", 4665 parent_id=None, 4666 upper_message_id=None, 4667 thread_id=None, 4668 ) 4669 asyncio.run( 4670 adapter._process_inbound_message( 4671 data=message, message=message, sender_id=None, 4672 chat_type="group", message_id="m_post", 4673 ) 4674 ) 4675 event = adapter._dispatch_inbound_event.call_args.args[0] 4676 self.assertIn("[Mentioned: Alice (open_id=ou_alice)]", event.text) 4677 self.assertIn("@Alice lookup this doc", event.text) 4678 4679 def test_scenario_post_bot_plus_alice_filters_self_from_hint(self): 4680 """Post-type message @-ing both the bot and Alice: leading bot is 4681 stripped from the body, self is filtered from the [Mentioned: ...] 4682 hint, and Alice's real open_id is surfaced for the agent.""" 4683 adapter = self._build_adapter() 4684 bot_mention = SimpleNamespace( 4685 key="@_user_1", 4686 id=SimpleNamespace(open_id="ou_bot", user_id=""), 4687 name="Hermes", 4688 ) 4689 alice_mention = SimpleNamespace( 4690 key="@_user_2", 4691 id=SimpleNamespace(open_id="ou_alice", user_id=""), 4692 name="Alice", 4693 ) 4694 post_content = json.dumps({ 4695 "zh_cn": { 4696 "content": [[ 4697 {"tag": "at", "user_id": "@_user_1", "user_name": "Hermes"}, 4698 {"tag": "at", "user_id": "@_user_2", "user_name": "Alice"}, 4699 {"tag": "text", "text": " review the spec with Alice"}, 4700 ]] 4701 } 4702 }) 4703 message = SimpleNamespace( 4704 content=post_content, 4705 message_type="post", 4706 message_id="m_post_both", 4707 mentions=[bot_mention, alice_mention], 4708 chat_id="oc_chat", 4709 parent_id=None, 4710 upper_message_id=None, 4711 thread_id=None, 4712 ) 4713 asyncio.run( 4714 adapter._process_inbound_message( 4715 data=message, message=message, sender_id=None, 4716 chat_type="group", message_id="m_post_both", 4717 ) 4718 ) 4719 event = adapter._dispatch_inbound_event.call_args.args[0] 4720 # Hint surfaces Alice; bot excluded because is_self=True. 4721 self.assertIn("[Mentioned: Alice (open_id=ou_alice)]", event.text) 4722 self.assertNotIn("Hermes (open_id=", event.text) 4723 # Body: leading @Hermes stripped, Alice preserved, trailing text intact. 4724 self.assertIn("@Alice review the spec with Alice", event.text) 4725 self.assertNotIn("@Hermes @Alice", event.text)