# feishu_helpers.py
"""Shared fixtures for Feishu adapter tests (admission, group policy, dispatch)."""

from __future__ import annotations

import threading
from types import SimpleNamespace
from typing import Any, Optional


def make_sender(sender_type: str = "user", open_id: str = "ou_human",
                user_id: Optional[str] = None, union_id: Optional[str] = None) -> Any:
    """Return a stand-in sender object for tests.

    The result is a ``SimpleNamespace`` exposing ``sender_type`` and a nested
    ``sender_id`` namespace carrying ``open_id``, ``user_id`` and ``union_id``.
    """
    return SimpleNamespace(
        sender_type=sender_type,
        sender_id=SimpleNamespace(open_id=open_id, user_id=user_id, union_id=union_id),
    )


def make_message(message_id: str = "om_xxx", chat_type: str = "p2p",
                 chat_id: str = "oc_1", mentions: Optional[list] = None) -> Any:
    """Return a stand-in message object for tests.

    ``content`` is always the empty string and ``message_type`` is always
    ``"text"``; the remaining attributes mirror the arguments.
    """
    return SimpleNamespace(
        message_id=message_id,
        chat_type=chat_type,
        chat_id=chat_id,
        mentions=mentions,
        content="",
        message_type="text",
    )


def make_adapter_skeleton(
    *,
    bot_open_id: str = "ou_me",
    bot_user_id: str = "",
    allow_bots: str = "none",
    require_mention: bool = True,
    group_policy: str = "allowlist",
) -> Any:
    """Create a ``FeishuAdapter`` without running its ``__init__``.

    The instance is allocated with ``object.__new__`` and only the private
    attributes assigned below are populated. ``group_policy`` is stored as
    both ``_group_policy`` and ``_default_group_policy``.
    """
    # Imported inside the function, so importing this helper module does not
    # itself import the adapter module.
    from gateway.platforms.feishu import FeishuAdapter

    adapter = object.__new__(FeishuAdapter)
    adapter._bot_open_id = bot_open_id
    adapter._bot_user_id = bot_user_id
    adapter._bot_name = ""
    adapter._app_id = ""
    adapter._admins = set()
    adapter._group_rules = {}
    adapter._group_policy = group_policy
    adapter._default_group_policy = group_policy
    adapter._allowed_group_users = frozenset()
    adapter._allow_bots = allow_bots
    adapter._require_mention = require_mention
    return adapter


def install_dedup_state(adapter: Any, seen: Optional[dict] = None) -> None:
    """Attach message de-duplication attributes to *adapter*.

    ``seen`` is copied, so the caller's dict is never shared with the adapter;
    ``None`` or an empty mapping yields empty state. The order list holds the
    keys of ``seen`` in iteration order. Persistence is replaced by a no-op
    callable and the state path is set to ``None``.
    """
    initial = dict(seen) if seen else {}
    adapter._seen_message_ids = initial
    adapter._seen_message_order = list(initial)
    adapter._dedup_cache_size = 100
    adapter._dedup_lock = threading.Lock()
    adapter._dedup_state_path = None
    adapter._persist_seen_message_ids = lambda: None


def stub_mention(adapter: Any, mentions_self: bool) -> None:
    """Replace ``adapter._mentions_self`` with a stub returning *mentions_self*.

    The stub accepts one positional argument and ignores it.
    """
    adapter._mentions_self = lambda _message: mentions_self