send_message_tool.py
1 """Send Message Tool -- cross-channel messaging via platform APIs. 2 3 Sends a message to a user or channel on any connected messaging platform 4 (Telegram, Discord, Slack). Supports listing available targets and resolving 5 human-friendly channel names to IDs. Works in both CLI and gateway contexts. 6 """ 7 8 import asyncio 9 import json 10 import logging 11 import os 12 import re 13 import ssl 14 import time 15 from email.utils import formatdate 16 from typing import Dict, Optional 17 18 from agent.redact import redact_sensitive_text 19 20 logger = logging.getLogger(__name__) 21 22 _TELEGRAM_TOPIC_TARGET_RE = re.compile(r"^\s*(-?\d+)(?::(\d+))?\s*$") 23 _FEISHU_TARGET_RE = re.compile(r"^\s*((?:oc|ou|on|chat|open)_[-A-Za-z0-9]+)(?::([-A-Za-z0-9_]+))?\s*$") 24 # Slack conversation IDs: C (public channel), G (private/group channel), D (DM). 25 # Must be uppercase alphanumeric, 9+ chars. User IDs (U...) and workspace IDs 26 # (W...) are NOT valid chat.postMessage channel values — posting to them fails 27 # because the API requires a conversation ID. To DM a user you must first call 28 # conversations.open to obtain a D... ID. Without this gate, Slack IDs fall 29 # through to channel-name resolution, which only matches by name and fails. 30 _SLACK_TARGET_RE = re.compile(r"^\s*([CGD][A-Z0-9]{8,})\s*$") 31 _WEIXIN_TARGET_RE = re.compile(r"^\s*((?:wxid|gh|v\d+|wm|wb)_[A-Za-z0-9_-]+|[A-Za-z0-9._-]+@chatroom|filehelper)\s*$") 32 _YUANBAO_TARGET_RE = re.compile(r"^\s*((?:group|direct):[^:]+)\s*$") 33 # Discord snowflake IDs are numeric, same regex pattern as Telegram topic targets. 34 _NUMERIC_TOPIC_RE = _TELEGRAM_TOPIC_TARGET_RE 35 # Platforms that address recipients by phone number and accept E.164 format 36 # (with a leading '+'). Without this, "+15551234567" fails the isdigit() check 37 # below and falls through to channel-name resolution, which has no way to 38 # resolve a raw phone number. Keeping the '+' preserves the E.164 form that 39 # downstream adapters (signal, etc.) expect. 40 _PHONE_PLATFORMS = frozenset({"signal", "sms", "whatsapp"}) 41 _E164_TARGET_RE = re.compile(r"^\s*\+(\d{7,15})\s*$") 42 _IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".gif"} 43 _VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".3gp"} 44 _AUDIO_EXTS = {".ogg", ".opus", ".mp3", ".wav", ".m4a", ".flac"} 45 _VOICE_EXTS = {".ogg", ".opus"} 46 # Telegram's Bot API sendAudio only accepts MP3 / M4A. Other audio 47 # formats either route through sendVoice (Opus/OGG) or fall back to 48 # document delivery. 49 _TELEGRAM_SEND_AUDIO_EXTS = {".mp3", ".m4a"} 50 _URL_SECRET_QUERY_RE = re.compile( 51 r"([?&](?:access_token|api[_-]?key|auth[_-]?token|token|signature|sig)=)([^&#\s]+)", 52 re.IGNORECASE, 53 ) 54 _GENERIC_SECRET_ASSIGN_RE = re.compile( 55 r"\b(access_token|api[_-]?key|auth[_-]?token|signature|sig)\s*=\s*([^\s,;]+)", 56 re.IGNORECASE, 57 ) 58 59 60 def _sanitize_error_text(text) -> str: 61 """Redact secrets from error text before surfacing it to users/models.""" 62 redacted = redact_sensitive_text(text) 63 redacted = _URL_SECRET_QUERY_RE.sub(lambda m: f"{m.group(1)}***", redacted) 64 redacted = _GENERIC_SECRET_ASSIGN_RE.sub(lambda m: f"{m.group(1)}=***", redacted) 65 return redacted 66 67 68 def _error(message: str) -> dict: 69 """Build a standardized error payload with redacted content.""" 70 return {"error": _sanitize_error_text(message)} 71 72 73 def _telegram_retry_delay(exc: Exception, attempt: int) -> float | None: 74 retry_after = getattr(exc, "retry_after", None) 75 if retry_after is not None: 76 try: 77 return max(float(retry_after), 0.0) 78 except (TypeError, ValueError): 79 return 1.0 80 81 text = str(exc).lower() 82 if "timed out" in text or "timeout" in text: 83 return None 84 if ( 85 "bad gateway" in text 86 or "502" in text 87 or "too many requests" in text 88 or "429" in text 89 or "service unavailable" in text 90 or "503" in text 91 or "gateway timeout" in text 92 or "504" in text 93 ): 94 return float(2 ** attempt) 95 return None 96 97 98 async def _send_telegram_message_with_retry(bot, *, attempts: int = 3, **kwargs): 99 for attempt in range(attempts): 100 try: 101 return await bot.send_message(**kwargs) 102 except Exception as exc: 103 delay = _telegram_retry_delay(exc, attempt) 104 if delay is None or attempt >= attempts - 1: 105 raise 106 logger.warning( 107 "Transient Telegram send failure (attempt %d/%d), retrying in %.1fs: %s", 108 attempt + 1, 109 attempts, 110 delay, 111 _sanitize_error_text(exc), 112 ) 113 await asyncio.sleep(delay) 114 115 116 SEND_MESSAGE_SCHEMA = { 117 "name": "send_message", 118 "description": ( 119 "Send a message to a connected messaging platform, or list available targets.\n\n" 120 "IMPORTANT: When the user asks to send to a specific channel or person " 121 "(not just a bare platform name), call send_message(action='list') FIRST to see " 122 "available targets, then send to the correct one.\n" 123 "If the user just says a platform name like 'send to telegram', send directly " 124 "to the home channel without listing first." 125 ), 126 "parameters": { 127 "type": "object", 128 "properties": { 129 "action": { 130 "type": "string", 131 "enum": ["send", "list"], 132 "description": "Action to perform. 'send' (default) sends a message. 'list' returns all available channels/contacts across connected platforms." 133 }, 134 "target": { 135 "type": "string", 136 "description": "Delivery target. Format: 'platform' (uses home channel), 'platform:#channel-name', 'platform:chat_id', or 'platform:chat_id:thread_id' for Telegram topics and Discord threads. Examples: 'telegram', 'telegram:-1001234567890:17585', 'discord:999888777:555444333', 'discord:#bot-home', 'slack:#engineering', 'signal:+155****4567', 'matrix:!roomid:server.org', 'matrix:@user:server.org', 'yuanbao:direct:<account_id>' (DM), 'yuanbao:group:<group_code>' (group chat)" 137 }, 138 "message": { 139 "type": "string", 140 "description": "The message text to send. To send an image or file, include MEDIA:<local_path> (e.g. 'MEDIA:/tmp/hermes/cache/img_xxx.jpg') in the message — the platform will deliver it as a native media attachment." 141 } 142 }, 143 "required": [] 144 } 145 } 146 147 148 def send_message_tool(args, **kw): 149 """Handle cross-channel send_message tool calls.""" 150 action = args.get("action", "send") 151 152 if action == "list": 153 return _handle_list() 154 155 return _handle_send(args) 156 157 158 def _handle_list(): 159 """Return formatted list of available messaging targets.""" 160 try: 161 from gateway.channel_directory import format_directory_for_display 162 return json.dumps({"targets": format_directory_for_display()}) 163 except Exception as e: 164 return json.dumps(_error(f"Failed to load channel directory: {e}")) 165 166 167 def _handle_send(args): 168 """Send a message to a platform target.""" 169 target = args.get("target", "") 170 message = args.get("message", "") 171 if not target or not message: 172 return tool_error("Both 'target' and 'message' are required when action='send'") 173 174 parts = target.split(":", 1) 175 platform_name = parts[0].strip().lower() 176 target_ref = parts[1].strip() if len(parts) > 1 else None 177 chat_id = None 178 thread_id = None 179 180 if target_ref: 181 chat_id, thread_id, is_explicit = _parse_target_ref(platform_name, target_ref) 182 else: 183 is_explicit = False 184 185 # Resolve human-friendly channel names to numeric IDs 186 if target_ref and not is_explicit: 187 try: 188 from gateway.channel_directory import resolve_channel_name 189 resolved = resolve_channel_name(platform_name, target_ref) 190 if resolved: 191 chat_id, thread_id, _ = _parse_target_ref(platform_name, resolved) 192 else: 193 return json.dumps({ 194 "error": f"Could not resolve '{target_ref}' on {platform_name}. " 195 f"Use send_message(action='list') to see available targets." 196 }) 197 except Exception: 198 return json.dumps({ 199 "error": f"Could not resolve '{target_ref}' on {platform_name}. " 200 f"Try using a numeric channel ID instead." 201 }) 202 203 from tools.interrupt import is_interrupted 204 if is_interrupted(): 205 return tool_error("Interrupted") 206 207 try: 208 from gateway.config import load_gateway_config, Platform 209 config = load_gateway_config() 210 except Exception as e: 211 return json.dumps(_error(f"Failed to load gateway config: {e}")) 212 213 # Accept any platform name — built-in names resolve to their enum 214 # member, plugin platform names create dynamic members via _missing_(). 215 try: 216 platform = Platform(platform_name) 217 except (ValueError, KeyError): 218 return tool_error(f"Unknown platform: {platform_name}") 219 220 pconfig = config.platforms.get(platform) 221 if not pconfig or not pconfig.enabled: 222 # Weixin can be configured purely via .env; synthesize a pconfig so 223 # send_message and cron delivery work without a gateway.yaml entry. 224 if platform_name == "weixin": 225 wx_token = os.getenv("WEIXIN_TOKEN", "").strip() 226 wx_account = os.getenv("WEIXIN_ACCOUNT_ID", "").strip() 227 if wx_token and wx_account: 228 from gateway.config import PlatformConfig 229 pconfig = PlatformConfig( 230 enabled=True, 231 token=wx_token, 232 extra={ 233 "account_id": wx_account, 234 "base_url": os.getenv("WEIXIN_BASE_URL", "").strip(), 235 "cdn_base_url": os.getenv("WEIXIN_CDN_BASE_URL", "").strip(), 236 }, 237 ) 238 else: 239 return tool_error(f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables.") 240 else: 241 return tool_error(f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables.") 242 243 from gateway.platforms.base import BasePlatformAdapter 244 245 media_files, cleaned_message = BasePlatformAdapter.extract_media(message) 246 mirror_text = cleaned_message.strip() or _describe_media_for_mirror(media_files) 247 248 used_home_channel = False 249 if not chat_id: 250 home = config.get_home_channel(platform) 251 if not home and platform_name == "weixin": 252 wx_home = os.getenv("WEIXIN_HOME_CHANNEL", "").strip() 253 if wx_home: 254 from gateway.config import HomeChannel 255 home = HomeChannel(platform=platform, chat_id=wx_home, name="Weixin Home") 256 if home: 257 chat_id = home.chat_id 258 used_home_channel = True 259 else: 260 return json.dumps({ 261 "error": f"No home channel set for {platform_name} to determine where to send the message. " 262 f"Either specify a channel directly with '{platform_name}:CHANNEL_NAME', " 263 f"or set a home channel via: hermes config set {platform_name.upper()}_HOME_CHANNEL <channel_id>" 264 }) 265 266 duplicate_skip = _maybe_skip_cron_duplicate_send(platform_name, chat_id, thread_id) 267 if duplicate_skip: 268 return json.dumps(duplicate_skip) 269 270 try: 271 from model_tools import _run_async 272 result = _run_async( 273 _send_to_platform( 274 platform, 275 pconfig, 276 chat_id, 277 cleaned_message, 278 thread_id=thread_id, 279 media_files=media_files, 280 ) 281 ) 282 if used_home_channel and isinstance(result, dict) and result.get("success"): 283 result["note"] = f"Sent to {platform_name} home channel (chat_id: {chat_id})" 284 285 # Mirror the sent message into the target's gateway session 286 if isinstance(result, dict) and result.get("success") and mirror_text: 287 try: 288 from gateway.mirror import mirror_to_session 289 from gateway.session_context import get_session_env 290 source_label = get_session_env("HERMES_SESSION_PLATFORM", "cli") 291 user_id = get_session_env("HERMES_SESSION_USER_ID", "") or None 292 if mirror_to_session( 293 platform_name, 294 chat_id, 295 mirror_text, 296 source_label=source_label, 297 thread_id=thread_id, 298 user_id=user_id, 299 ): 300 result["mirrored"] = True 301 except Exception: 302 pass 303 304 if isinstance(result, dict) and "error" in result: 305 result["error"] = _sanitize_error_text(result["error"]) 306 return json.dumps(result) 307 except Exception as e: 308 return json.dumps(_error(f"Send failed: {e}")) 309 310 311 def _parse_target_ref(platform_name: str, target_ref: str): 312 """Parse a tool target into chat_id/thread_id and whether it is explicit.""" 313 if platform_name == "telegram": 314 match = _TELEGRAM_TOPIC_TARGET_RE.fullmatch(target_ref) 315 if match: 316 return match.group(1), match.group(2), True 317 if platform_name == "feishu": 318 match = _FEISHU_TARGET_RE.fullmatch(target_ref) 319 if match: 320 return match.group(1), match.group(2), True 321 if platform_name == "discord": 322 match = _NUMERIC_TOPIC_RE.fullmatch(target_ref) 323 if match: 324 return match.group(1), match.group(2), True 325 if platform_name == "slack": 326 match = _SLACK_TARGET_RE.fullmatch(target_ref) 327 if match: 328 return match.group(1), None, True 329 if platform_name == "weixin": 330 match = _WEIXIN_TARGET_RE.fullmatch(target_ref) 331 if match: 332 return match.group(1), None, True 333 if platform_name == "yuanbao": 334 match = _YUANBAO_TARGET_RE.fullmatch(target_ref) 335 if match: 336 return match.group(1), None, True 337 if target_ref.strip().isdigit(): 338 return f"group:{target_ref.strip()}", None, True 339 return None, None, False 340 if platform_name in _PHONE_PLATFORMS: 341 match = _E164_TARGET_RE.fullmatch(target_ref) 342 if match: 343 # Preserve the leading '+' — signal-cli and sms/whatsapp adapters 344 # expect E.164 format for direct recipients. 345 return target_ref.strip(), None, True 346 if target_ref.lstrip("-").isdigit(): 347 return target_ref, None, True 348 # Matrix room IDs (start with !) and user IDs (start with @) are explicit 349 if platform_name == "matrix" and (target_ref.startswith("!") or target_ref.startswith("@")): 350 return target_ref, None, True 351 return None, None, False 352 353 354 def _describe_media_for_mirror(media_files): 355 """Return a human-readable mirror summary when a message only contains media.""" 356 if not media_files: 357 return "" 358 if len(media_files) == 1: 359 media_path, is_voice = media_files[0] 360 ext = os.path.splitext(media_path)[1].lower() 361 if is_voice and ext in _VOICE_EXTS: 362 return "[Sent voice message]" 363 if ext in _IMAGE_EXTS: 364 return "[Sent image attachment]" 365 if ext in _VIDEO_EXTS: 366 return "[Sent video attachment]" 367 if ext in _AUDIO_EXTS: 368 return "[Sent audio attachment]" 369 return "[Sent document attachment]" 370 return f"[Sent {len(media_files)} media attachments]" 371 372 373 def _get_cron_auto_delivery_target(): 374 """Return the cron scheduler's auto-delivery target for the current run, if any.""" 375 from gateway.session_context import get_session_env 376 platform = get_session_env("HERMES_CRON_AUTO_DELIVER_PLATFORM", "").strip().lower() 377 chat_id = get_session_env("HERMES_CRON_AUTO_DELIVER_CHAT_ID", "").strip() 378 if not platform or not chat_id: 379 return None 380 thread_id = get_session_env("HERMES_CRON_AUTO_DELIVER_THREAD_ID", "").strip() or None 381 return { 382 "platform": platform, 383 "chat_id": chat_id, 384 "thread_id": thread_id, 385 } 386 387 388 def _maybe_skip_cron_duplicate_send(platform_name: str, chat_id: str, thread_id: str | None): 389 """Skip redundant cron send_message calls when the scheduler will auto-deliver there.""" 390 auto_target = _get_cron_auto_delivery_target() 391 if not auto_target: 392 return None 393 394 same_target = ( 395 auto_target["platform"] == platform_name 396 and str(auto_target["chat_id"]) == str(chat_id) 397 and auto_target.get("thread_id") == thread_id 398 ) 399 if not same_target: 400 return None 401 402 target_label = f"{platform_name}:{chat_id}" 403 if thread_id is not None: 404 target_label += f":{thread_id}" 405 406 return { 407 "success": True, 408 "skipped": True, 409 "reason": "cron_auto_delivery_duplicate_target", 410 "target": target_label, 411 "note": ( 412 f"Skipped send_message to {target_label}. This cron job will already auto-deliver " 413 "its final response to that same target. Put the intended user-facing content in " 414 "your final response instead, or use a different target if you want an additional message." 415 ), 416 } 417 418 419 async def _send_via_adapter(platform, pconfig, chat_id, chunk): 420 """Send a message via a live gateway adapter (for plugin platforms). 421 422 Falls back to error if no adapter is connected for this platform. 423 """ 424 try: 425 from gateway.run import _gateway_runner_ref 426 runner = _gateway_runner_ref() 427 if runner: 428 adapter = runner.adapters.get(platform) 429 if adapter: 430 from gateway.platforms.base import SendResult 431 result = await adapter.send(chat_id=chat_id, content=chunk) 432 if result.success: 433 return {"success": True, "message_id": result.message_id} 434 return {"error": f"Adapter send failed: {result.error}"} 435 except Exception as e: 436 return {"error": f"Plugin platform send failed: {e}"} 437 return {"error": f"No live adapter for platform '{platform.value}'. Is the gateway running with this platform connected?"} 438 439 440 async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, media_files=None): 441 """Route a message to the appropriate platform sender. 442 443 Long messages are automatically chunked to fit within platform limits 444 using the same smart-splitting algorithm as the gateway adapters 445 (preserves code-block boundaries, adds part indicators). 446 """ 447 from gateway.config import Platform 448 from gateway.platforms.base import BasePlatformAdapter, utf16_len 449 from gateway.platforms.discord import DiscordAdapter 450 from gateway.platforms.slack import SlackAdapter 451 452 # Telegram adapter import is optional (requires python-telegram-bot) 453 try: 454 from gateway.platforms.telegram import TelegramAdapter 455 _telegram_available = True 456 except ImportError: 457 _telegram_available = False 458 459 # Feishu adapter import is optional (requires lark-oapi) 460 try: 461 from gateway.platforms.feishu import FeishuAdapter 462 _feishu_available = True 463 except ImportError: 464 _feishu_available = False 465 466 media_files = media_files or [] 467 468 if platform == Platform.SLACK and message: 469 try: 470 slack_adapter = SlackAdapter.__new__(SlackAdapter) 471 message = slack_adapter.format_message(message) 472 except Exception: 473 logger.debug("Failed to apply Slack mrkdwn formatting in _send_to_platform", exc_info=True) 474 475 # Platform message length limits (from adapter class attributes) 476 _MAX_LENGTHS = { 477 Platform.TELEGRAM: TelegramAdapter.MAX_MESSAGE_LENGTH if _telegram_available else 4096, 478 Platform.DISCORD: DiscordAdapter.MAX_MESSAGE_LENGTH, 479 Platform.SLACK: SlackAdapter.MAX_MESSAGE_LENGTH, 480 } 481 if _feishu_available: 482 _MAX_LENGTHS[Platform.FEISHU] = FeishuAdapter.MAX_MESSAGE_LENGTH 483 484 # Check plugin registry for max_message_length 485 if platform not in _MAX_LENGTHS: 486 try: 487 from gateway.platform_registry import platform_registry 488 entry = platform_registry.get(platform.value) 489 if entry and entry.max_message_length > 0: 490 _MAX_LENGTHS[platform] = entry.max_message_length 491 except Exception: 492 pass 493 494 # Smart-chunk the message to fit within platform limits. 495 # For short messages or platforms without a known limit this is a no-op. 496 # Telegram measures length in UTF-16 code units, not Unicode codepoints. 497 max_len = _MAX_LENGTHS.get(platform) 498 if max_len: 499 _len_fn = utf16_len if platform == Platform.TELEGRAM else None 500 chunks = BasePlatformAdapter.truncate_message(message, max_len, len_fn=_len_fn) 501 else: 502 chunks = [message] 503 504 # --- Telegram: special handling for media attachments --- 505 if platform == Platform.TELEGRAM: 506 last_result = None 507 disable_link_previews = bool(getattr(pconfig, "extra", {}) and pconfig.extra.get("disable_link_previews")) 508 for i, chunk in enumerate(chunks): 509 is_last = (i == len(chunks) - 1) 510 result = await _send_telegram( 511 pconfig.token, 512 chat_id, 513 chunk, 514 media_files=media_files if is_last else [], 515 thread_id=thread_id, 516 disable_link_previews=disable_link_previews, 517 ) 518 if isinstance(result, dict) and result.get("error"): 519 return result 520 last_result = result 521 return last_result 522 523 # --- Weixin: use the native one-shot adapter helper for text + media --- 524 if platform == Platform.WEIXIN: 525 return await _send_weixin(pconfig, chat_id, message, media_files=media_files) 526 527 # --- Discord: special handling for media attachments --- 528 if platform == Platform.DISCORD: 529 last_result = None 530 for i, chunk in enumerate(chunks): 531 is_last = (i == len(chunks) - 1) 532 result = await _send_discord( 533 pconfig.token, 534 chat_id, 535 chunk, 536 media_files=media_files if is_last else [], 537 thread_id=thread_id, 538 ) 539 if isinstance(result, dict) and result.get("error"): 540 return result 541 last_result = result 542 return last_result 543 544 # --- Matrix: use the native adapter helper when media is present --- 545 if platform == Platform.MATRIX and media_files: 546 last_result = None 547 for i, chunk in enumerate(chunks): 548 is_last = (i == len(chunks) - 1) 549 result = await _send_matrix_via_adapter( 550 pconfig, 551 chat_id, 552 chunk, 553 media_files=media_files if is_last else [], 554 thread_id=thread_id, 555 ) 556 if isinstance(result, dict) and result.get("error"): 557 return result 558 last_result = result 559 return last_result 560 561 # --- Signal: native attachment support via JSON-RPC attachments param --- 562 if platform == Platform.SIGNAL and media_files: 563 last_result = None 564 for i, chunk in enumerate(chunks): 565 is_last = (i == len(chunks) - 1) 566 result = await _send_signal( 567 pconfig.extra, 568 chat_id, 569 chunk, 570 media_files=media_files if is_last else [], 571 ) 572 if isinstance(result, dict) and result.get("error"): 573 return result 574 last_result = result 575 return last_result 576 577 # --- Yuanbao: native media attachment support via running gateway adapter --- 578 if platform == Platform.YUANBAO and media_files: 579 last_result = None 580 for i, chunk in enumerate(chunks): 581 is_last = (i == len(chunks) - 1) 582 result = await _send_yuanbao( 583 chat_id, 584 chunk, 585 media_files=media_files if is_last else None, 586 ) 587 if isinstance(result, dict) and result.get("error"): 588 return result 589 last_result = result 590 return last_result 591 592 # --- Feishu: native media attachment support via adapter --- 593 if platform == Platform.FEISHU and media_files: 594 last_result = None 595 for i, chunk in enumerate(chunks): 596 is_last = (i == len(chunks) - 1) 597 result = await _send_feishu( 598 pconfig, 599 chat_id, 600 chunk, 601 media_files=media_files if is_last else None, 602 thread_id=thread_id, 603 ) 604 if isinstance(result, dict) and result.get("error"): 605 return result 606 last_result = result 607 return last_result 608 609 # --- Non-media platforms --- 610 if media_files and not message.strip(): 611 return { 612 "error": ( 613 f"send_message MEDIA delivery is currently only supported for telegram, discord, matrix, weixin, signal, yuanbao and feishu; " 614 f"target {platform.value} had only media attachments" 615 ) 616 } 617 warning = None 618 if media_files: 619 warning = ( 620 f"MEDIA attachments were omitted for {platform.value}; " 621 "native send_message media delivery is currently only supported for telegram, discord, matrix, weixin, signal, yuanbao and feishu" 622 ) 623 624 last_result = None 625 for chunk in chunks: 626 if platform == Platform.SLACK: 627 result = await _send_slack(pconfig.token, chat_id, chunk) 628 elif platform == Platform.WHATSAPP: 629 result = await _send_whatsapp(pconfig.extra, chat_id, chunk) 630 elif platform == Platform.SIGNAL: 631 result = await _send_signal(pconfig.extra, chat_id, chunk) 632 elif platform == Platform.EMAIL: 633 result = await _send_email(pconfig.extra, chat_id, chunk) 634 elif platform == Platform.SMS: 635 result = await _send_sms(pconfig.api_key, chat_id, chunk) 636 elif platform == Platform.MATTERMOST: 637 result = await _send_mattermost(pconfig.token, pconfig.extra, chat_id, chunk) 638 elif platform == Platform.MATRIX: 639 result = await _send_matrix(pconfig.token, pconfig.extra, chat_id, chunk) 640 elif platform == Platform.HOMEASSISTANT: 641 result = await _send_homeassistant(pconfig.token, pconfig.extra, chat_id, chunk) 642 elif platform == Platform.DINGTALK: 643 result = await _send_dingtalk(pconfig.extra, chat_id, chunk) 644 elif platform == Platform.FEISHU: 645 result = await _send_feishu(pconfig, chat_id, chunk, thread_id=thread_id) 646 elif platform == Platform.WECOM: 647 result = await _send_wecom(pconfig.extra, chat_id, chunk) 648 elif platform == Platform.BLUEBUBBLES: 649 result = await _send_bluebubbles(pconfig.extra, chat_id, chunk) 650 elif platform == Platform.QQBOT: 651 result = await _send_qqbot(pconfig, chat_id, chunk) 652 elif platform == Platform.YUANBAO: 653 result = await _send_yuanbao(chat_id, chunk) 654 else: 655 # Plugin platform — route through the gateway's live adapter 656 # if available, otherwise report the error. 657 result = await _send_via_adapter(platform, pconfig, chat_id, chunk) 658 659 if isinstance(result, dict) and result.get("error"): 660 return result 661 last_result = result 662 663 if warning and isinstance(last_result, dict) and last_result.get("success"): 664 warnings = list(last_result.get("warnings", [])) 665 warnings.append(warning) 666 last_result["warnings"] = warnings 667 return last_result 668 669 670 async def _send_telegram(token, chat_id, message, media_files=None, thread_id=None, disable_link_previews=False): 671 """Send via Telegram Bot API (one-shot, no polling needed). 672 673 Applies markdown→MarkdownV2 formatting (same as the gateway adapter) 674 so that bold, links, and headers render correctly. If the message 675 already contains HTML tags, it is sent with ``parse_mode='HTML'`` 676 instead, bypassing MarkdownV2 conversion. 677 """ 678 try: 679 from telegram import Bot 680 from telegram.constants import ParseMode 681 682 # Auto-detect HTML tags — if present, skip MarkdownV2 and send as HTML. 683 # Inspired by github.com/ashaney — PR #1568. 684 _has_html = bool(re.search(r'<[a-zA-Z/][^>]*>', message)) 685 686 if _has_html: 687 formatted = message 688 send_parse_mode = ParseMode.HTML 689 else: 690 # Reuse the gateway adapter's format_message for markdown→MarkdownV2 691 try: 692 from gateway.platforms.telegram import TelegramAdapter 693 _adapter = TelegramAdapter.__new__(TelegramAdapter) 694 formatted = _adapter.format_message(message) 695 except Exception: 696 # Fallback: send as-is if formatting unavailable 697 formatted = message 698 send_parse_mode = ParseMode.MARKDOWN_V2 699 700 bot = Bot(token=token) 701 int_chat_id = int(chat_id) 702 media_files = media_files or [] 703 thread_kwargs = {} 704 if thread_id is not None: 705 thread_kwargs["message_thread_id"] = int(thread_id) 706 if disable_link_previews: 707 thread_kwargs["disable_web_page_preview"] = True 708 709 last_msg = None 710 warnings = [] 711 712 if formatted.strip(): 713 try: 714 last_msg = await _send_telegram_message_with_retry( 715 bot, 716 chat_id=int_chat_id, text=formatted, 717 parse_mode=send_parse_mode, **thread_kwargs 718 ) 719 except Exception as md_error: 720 # Parse failed, fall back to plain text 721 if "parse" in str(md_error).lower() or "markdown" in str(md_error).lower() or "html" in str(md_error).lower(): 722 logger.warning( 723 "Parse mode %s failed in _send_telegram, falling back to plain text: %s", 724 send_parse_mode, 725 _sanitize_error_text(md_error), 726 ) 727 if not _has_html: 728 try: 729 from gateway.platforms.telegram import _strip_mdv2 730 plain = _strip_mdv2(formatted) 731 except Exception: 732 plain = message 733 else: 734 plain = message 735 last_msg = await _send_telegram_message_with_retry( 736 bot, 737 chat_id=int_chat_id, text=plain, 738 parse_mode=None, **thread_kwargs 739 ) 740 else: 741 raise 742 743 for media_path, is_voice in media_files: 744 if not os.path.exists(media_path): 745 warning = f"Media file not found, skipping: {media_path}" 746 logger.warning(warning) 747 warnings.append(warning) 748 continue 749 750 ext = os.path.splitext(media_path)[1].lower() 751 try: 752 with open(media_path, "rb") as f: 753 if ext in _IMAGE_EXTS: 754 last_msg = await bot.send_photo( 755 chat_id=int_chat_id, photo=f, **thread_kwargs 756 ) 757 elif ext in _VIDEO_EXTS: 758 last_msg = await bot.send_video( 759 chat_id=int_chat_id, video=f, **thread_kwargs 760 ) 761 elif ext in _VOICE_EXTS and is_voice: 762 last_msg = await bot.send_voice( 763 chat_id=int_chat_id, voice=f, **thread_kwargs 764 ) 765 elif ext in _TELEGRAM_SEND_AUDIO_EXTS: 766 last_msg = await bot.send_audio( 767 chat_id=int_chat_id, audio=f, **thread_kwargs 768 ) 769 else: 770 last_msg = await bot.send_document( 771 chat_id=int_chat_id, document=f, **thread_kwargs 772 ) 773 except Exception as e: 774 warning = _sanitize_error_text(f"Failed to send media {media_path}: {e}") 775 logger.error(warning) 776 warnings.append(warning) 777 778 if last_msg is None: 779 error = "No deliverable text or media remained after processing MEDIA tags" 780 if warnings: 781 return {"error": error, "warnings": warnings} 782 return {"error": error} 783 784 result = { 785 "success": True, 786 "platform": "telegram", 787 "chat_id": chat_id, 788 "message_id": str(last_msg.message_id), 789 } 790 if warnings: 791 result["warnings"] = warnings 792 return result 793 except ImportError: 794 return {"error": "python-telegram-bot not installed. Run: pip install python-telegram-bot"} 795 except Exception as e: 796 return _error(f"Telegram send failed: {e}") 797 798 799 def _derive_forum_thread_name(message: str) -> str: 800 """Derive a thread name from the first line of the message, capped at 100 chars.""" 801 first_line = message.strip().split("\n", 1)[0].strip() 802 # Strip common markdown heading prefixes 803 first_line = first_line.lstrip("#").strip() 804 if not first_line: 805 first_line = "New Post" 806 return first_line[:100] 807 808 809 # Process-local cache for Discord channel-type probes. Avoids re-probing the 810 # same channel on every send when the directory cache has no entry (e.g. fresh 811 # install, or channel created after the last directory build). 812 _DISCORD_CHANNEL_TYPE_PROBE_CACHE: Dict[str, bool] = {} 813 814 815 def _remember_channel_is_forum(chat_id: str, is_forum: bool) -> None: 816 _DISCORD_CHANNEL_TYPE_PROBE_CACHE[str(chat_id)] = bool(is_forum) 817 818 819 def _probe_is_forum_cached(chat_id: str) -> Optional[bool]: 820 return _DISCORD_CHANNEL_TYPE_PROBE_CACHE.get(str(chat_id)) 821 822 823 async def _send_discord(token, chat_id, message, thread_id=None, media_files=None): 824 """Send a single message via Discord REST API (no websocket client needed). 825 826 Chunking is handled by _send_to_platform() before this is called. 827 828 When thread_id is provided, the message is sent directly to that thread 829 via the /channels/{thread_id}/messages endpoint. 830 831 Media files are uploaded one-by-one via multipart/form-data after the 832 text message is sent (same pattern as Telegram). 833 834 Forum channels (type 15) reject POST /messages — a thread post is created 835 automatically via POST /channels/{id}/threads. Media files are uploaded 836 as multipart attachments on the starter message of the new thread. 837 838 Channel type is resolved from the channel directory first, then a 839 process-local probe cache, and only as a last resort with a live 840 GET /channels/{id} probe (whose result is memoized). 841 """ 842 try: 843 import aiohttp 844 except ImportError: 845 return {"error": "aiohttp not installed. Run: pip install aiohttp"} 846 try: 847 from gateway.platforms.base import resolve_proxy_url, proxy_kwargs_for_aiohttp 848 _proxy = resolve_proxy_url(platform_env_var="DISCORD_PROXY") 849 _sess_kw, _req_kw = proxy_kwargs_for_aiohttp(_proxy) 850 auth_headers = {"Authorization": f"Bot {token}"} 851 json_headers = {**auth_headers, "Content-Type": "application/json"} 852 media_files = media_files or [] 853 last_data = None 854 warnings = [] 855 856 # Thread endpoint: Discord threads are channels; send directly to the thread ID. 857 if thread_id: 858 url = f"https://discord.com/api/v10/channels/{thread_id}/messages" 859 else: 860 # Check if the target channel is a forum channel (type 15). 861 # Forum channels reject POST /messages — create a thread post instead. 862 # Three-layer detection: directory cache → process-local probe 863 # cache → GET /channels/{id} probe (with result memoized). 864 _channel_type = None 865 try: 866 from gateway.channel_directory import lookup_channel_type 867 _channel_type = lookup_channel_type("discord", chat_id) 868 except Exception: 869 pass 870 871 if _channel_type == "forum": 872 is_forum = True 873 elif _channel_type is not None: 874 is_forum = False 875 else: 876 cached = _probe_is_forum_cached(chat_id) 877 if cached is not None: 878 is_forum = cached 879 else: 880 is_forum = False 881 try: 882 info_url = f"https://discord.com/api/v10/channels/{chat_id}" 883 async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=15), **_sess_kw) as info_sess: 884 async with info_sess.get(info_url, headers=json_headers, **_req_kw) as info_resp: 885 if info_resp.status == 200: 886 info = await info_resp.json() 887 is_forum = info.get("type") == 15 888 _remember_channel_is_forum(chat_id, is_forum) 889 except Exception: 890 logger.debug("Failed to probe channel type for %s", chat_id, exc_info=True) 891 892 if is_forum: 893 thread_name = _derive_forum_thread_name(message) 894 thread_url = f"https://discord.com/api/v10/channels/{chat_id}/threads" 895 896 # Filter to readable media files up front so we can pick the 897 # right code path (JSON vs multipart) before opening a session. 898 valid_media = [] 899 for media_path, _is_voice in media_files: 900 if not os.path.exists(media_path): 901 warning = f"Media file not found, skipping: {media_path}" 902 logger.warning(warning) 903 warnings.append(warning) 904 continue 905 valid_media.append(media_path) 906 907 async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=60), **_sess_kw) as session: 908 if valid_media: 909 # Multipart: payload_json + files[N] creates a forum 910 # thread with the starter message plus attachments in 911 # a single API call. 912 attachments_meta = [ 913 {"id": str(idx), "filename": os.path.basename(path)} 914 for idx, path in enumerate(valid_media) 915 ] 916 starter_message = {"content": message, "attachments": attachments_meta} 917 payload_json = json.dumps({"name": thread_name, "message": starter_message}) 918 919 form = aiohttp.FormData() 920 form.add_field("payload_json", payload_json, content_type="application/json") 921 922 # Buffer file bytes up front — aiohttp's FormData can 923 # read lazily and we don't want handles closing under 924 # it on retry. 925 try: 926 for idx, media_path in enumerate(valid_media): 927 with open(media_path, "rb") as fh: 928 form.add_field( 929 f"files[{idx}]", 930 fh.read(), 931 filename=os.path.basename(media_path), 932 ) 933 async with session.post(thread_url, headers=auth_headers, data=form, **_req_kw) as resp: 934 if resp.status not in (200, 201): 935 body = await resp.text() 936 return _error(f"Discord forum thread creation error ({resp.status}): {body}") 937 data = await resp.json() 938 except Exception as e: 939 return _error(_sanitize_error_text(f"Discord forum thread upload failed: {e}")) 940 else: 941 # No media — simple JSON POST creates the thread with 942 # just the text starter. 943 async with session.post( 944 thread_url, 945 headers=json_headers, 946 json={ 947 "name": thread_name, 948 "message": {"content": message}, 949 }, 950 **_req_kw, 951 ) as resp: 952 if resp.status not in (200, 201): 953 body = await resp.text() 954 return _error(f"Discord forum thread creation error ({resp.status}): {body}") 955 data = await resp.json() 956 957 thread_id_created = data.get("id") 958 starter_msg_id = (data.get("message") or {}).get("id", thread_id_created) 959 result = { 960 "success": True, 961 "platform": "discord", 962 "chat_id": chat_id, 963 "thread_id": thread_id_created, 964 "message_id": starter_msg_id, 965 } 966 if warnings: 967 result["warnings"] = warnings 968 return result 969 970 url = f"https://discord.com/api/v10/channels/{chat_id}/messages" 971 972 async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30), **_sess_kw) as session: 973 # Send text message (skip if empty and media is present) 974 if message.strip() or not media_files: 975 async with session.post(url, headers=json_headers, json={"content": message}, **_req_kw) as resp: 976 if resp.status not in (200, 201): 977 body = await resp.text() 978 return _error(f"Discord API error ({resp.status}): {body}") 979 last_data = await resp.json() 980 981 # Send each media file as a separate multipart upload 982 for media_path, _is_voice in media_files: 983 if not os.path.exists(media_path): 984 warning = f"Media file not found, skipping: {media_path}" 985 logger.warning(warning) 986 warnings.append(warning) 987 continue 988 try: 989 form = aiohttp.FormData() 990 filename = os.path.basename(media_path) 991 with open(media_path, "rb") as f: 992 form.add_field("files[0]", f, filename=filename) 993 async with session.post(url, headers=auth_headers, data=form, **_req_kw) as resp: 994 if resp.status not in (200, 201): 995 body = await resp.text() 996 warning = _sanitize_error_text(f"Failed to send media {media_path}: Discord API error ({resp.status}): {body}") 997 logger.error(warning) 998 warnings.append(warning) 999 continue 1000 last_data = await resp.json() 1001 except Exception as e: 1002 warning = _sanitize_error_text(f"Failed to send media {media_path}: {e}") 1003 logger.error(warning) 1004 warnings.append(warning) 1005 1006 if last_data is None: 1007 error = "No deliverable text or media remained after processing" 1008 if warnings: 1009 return {"error": error, "warnings": warnings} 1010 return {"error": error} 1011 1012 result = {"success": True, "platform": "discord", "chat_id": chat_id, "message_id": last_data.get("id")} 1013 if warnings: 1014 result["warnings"] = warnings 1015 return result 1016 except Exception as e: 1017 return _error(f"Discord send failed: {e}") 1018 1019 1020 async def _send_slack(token, chat_id, message): 1021 """Send via Slack Web API.""" 1022 try: 1023 import aiohttp 1024 except ImportError: 1025 return {"error": "aiohttp not installed. Run: pip install aiohttp"} 1026 try: 1027 from gateway.platforms.base import resolve_proxy_url, proxy_kwargs_for_aiohttp 1028 _proxy = resolve_proxy_url() 1029 _sess_kw, _req_kw = proxy_kwargs_for_aiohttp(_proxy) 1030 url = "https://slack.com/api/chat.postMessage" 1031 headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} 1032 async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30), **_sess_kw) as session: 1033 payload = {"channel": chat_id, "text": message, "mrkdwn": True} 1034 async with session.post(url, headers=headers, json=payload, **_req_kw) as resp: 1035 data = await resp.json() 1036 if data.get("ok"): 1037 return {"success": True, "platform": "slack", "chat_id": chat_id, "message_id": data.get("ts")} 1038 return _error(f"Slack API error: {data.get('error', 'unknown')}") 1039 except Exception as e: 1040 return _error(f"Slack send failed: {e}") 1041 1042 1043 async def _send_whatsapp(extra, chat_id, message): 1044 """Send via the local WhatsApp bridge HTTP API.""" 1045 try: 1046 import aiohttp 1047 except ImportError: 1048 return {"error": "aiohttp not installed. Run: pip install aiohttp"} 1049 try: 1050 bridge_port = extra.get("bridge_port", 3000) 1051 async with aiohttp.ClientSession() as session: 1052 async with session.post( 1053 f"http://localhost:{bridge_port}/send", 1054 json={"chatId": chat_id, "message": message}, 1055 timeout=aiohttp.ClientTimeout(total=30), 1056 ) as resp: 1057 if resp.status == 200: 1058 data = await resp.json() 1059 return { 1060 "success": True, 1061 "platform": "whatsapp", 1062 "chat_id": chat_id, 1063 "message_id": data.get("messageId"), 1064 } 1065 body = await resp.text() 1066 return _error(f"WhatsApp bridge error ({resp.status}): {body}") 1067 except Exception as e: 1068 return _error(f"WhatsApp send failed: {e}") 1069 1070 1071 async def _send_signal(extra, chat_id, message, media_files=None): 1072 """Send via signal-cli JSON-RPC API. 1073 1074 Supports both text-only and text-with-attachments (images/audio/documents). 1075 Multi-attachment sends are chunked into batches of 1076 SIGNAL_MAX_ATTACHMENTS_PER_MSG and metered by the process-wide 1077 SignalAttachmentScheduler — same bucket the gateway adapter uses, so 1078 sends from this tool and inbound-driven replies share rate-limit state. 1079 """ 1080 try: 1081 import httpx 1082 except ImportError: 1083 return {"error": "httpx not installed"} 1084 1085 from gateway.platforms.signal_rate_limit import ( 1086 SIGNAL_BATCH_PACING_NOTICE_THRESHOLD, 1087 SIGNAL_MAX_ATTACHMENTS_PER_MSG, 1088 SIGNAL_RATE_LIMIT_MAX_ATTEMPTS, 1089 _extract_retry_after_seconds, 1090 _format_wait, 1091 _is_signal_rate_limit_error, 1092 _signal_send_timeout, 1093 get_scheduler, 1094 ) 1095 1096 try: 1097 http_url = extra.get("http_url", "http://127.0.0.1:8080").rstrip("/") 1098 account = extra.get("account", "") 1099 if not account: 1100 return {"error": "Signal account not configured"} 1101 1102 valid_media = media_files or [] 1103 attachment_paths = [] 1104 for media_path, _is_voice in valid_media: 1105 if os.path.exists(media_path): 1106 attachment_paths.append(media_path) 1107 else: 1108 logger.warning("Signal media file not found, skipping: %s", media_path) 1109 1110 # Chunk attachments. With no attachments we still emit one batch 1111 # (text only). With attachments, the text rides on batch #0 so the 1112 # caption isn't repeated across every chunk. 1113 if attachment_paths: 1114 att_batches = [ 1115 attachment_paths[i:i + SIGNAL_MAX_ATTACHMENTS_PER_MSG] 1116 for i in range(0, len(attachment_paths), SIGNAL_MAX_ATTACHMENTS_PER_MSG) 1117 ] 1118 else: 1119 att_batches = [[]] 1120 1121 async def _post(batch_attachments, batch_message): 1122 params = {"account": account, "message": batch_message} 1123 if chat_id.startswith("group:"): 1124 params["groupId"] = chat_id[6:] 1125 else: 1126 params["recipient"] = [chat_id] 1127 if batch_attachments: 1128 params["attachments"] = batch_attachments 1129 1130 payload = { 1131 "jsonrpc": "2.0", 1132 "method": "send", 1133 "params": params, 1134 "id": f"send_{int(time.time() * 1000)}", 1135 } 1136 timeout = _signal_send_timeout(len(batch_attachments) if batch_attachments else 0) 1137 async with httpx.AsyncClient(timeout=timeout) as client: 1138 resp = await client.post(f"{http_url}/api/v1/rpc", json=payload) 1139 resp.raise_for_status() 1140 return resp.json() 1141 1142 async def _send_inline_notice(text: str) -> None: 1143 """Best-effort one-shot RPC for a user-facing pacing notice.""" 1144 notice_params = {"account": account, "message": text} 1145 if chat_id.startswith("group:"): 1146 notice_params["groupId"] = chat_id[6:] 1147 else: 1148 notice_params["recipient"] = [chat_id] 1149 try: 1150 async with httpx.AsyncClient(timeout=30.0) as _client: 1151 await _client.post( 1152 f"{http_url}/api/v1/rpc", 1153 json={ 1154 "jsonrpc": "2.0", 1155 "method": "send", 1156 "params": notice_params, 1157 "id": f"notice_{int(time.time() * 1000)}", 1158 }, 1159 ) 1160 except Exception as _e: 1161 logger.warning("Signal: inline notice failed: %s", _e) 1162 1163 scheduler = get_scheduler() 1164 logger.info( 1165 "send_message Signal: scheduler state=%s, %d attachment(s) in %d batch(es)", 1166 scheduler.state(), len(attachment_paths), len(att_batches), 1167 ) 1168 failed_batches: list[int] = [] 1169 for idx, att_batch in enumerate(att_batches): 1170 n = len(att_batch) 1171 if n > 0: 1172 estimated = scheduler.estimate_wait(n) 1173 if estimated >= SIGNAL_BATCH_PACING_NOTICE_THRESHOLD: 1174 await _send_inline_notice( 1175 f"(More images coming — pausing ~{_format_wait(estimated)} " 1176 f"for Signal rate limit, batch {idx + 1}/{len(att_batches)}.)" 1177 ) 1178 1179 batch_message = message if idx == 0 else "" 1180 1181 for attempt in range(1, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS + 1): 1182 try: 1183 await scheduler.acquire(n) 1184 _rpc_t0 = time.monotonic() 1185 data = await _post(att_batch, batch_message) 1186 _rpc_duration = time.monotonic() - _rpc_t0 1187 if "error" not in data: 1188 await scheduler.report_rpc_duration(_rpc_duration, n) 1189 break 1190 1191 err = data["error"] 1192 1193 if not _is_signal_rate_limit_error(err): 1194 return _error(f"Signal RPC error on batch {idx + 1}/{len(att_batches)}: {err}") 1195 1196 server_retry_after = _extract_retry_after_seconds(err) 1197 scheduler.feedback(server_retry_after, n) 1198 1199 if attempt >= SIGNAL_RATE_LIMIT_MAX_ATTEMPTS: 1200 failed_batches.append(idx + 1) 1201 logger.error( 1202 "Signal: rate-limit retries exhausted on batch %d/%d " 1203 "(%d attachments lost, server retry_after=%s)", 1204 idx + 1, len(att_batches), n, 1205 f"{server_retry_after:.0f}s" if server_retry_after else "unknown", 1206 ) 1207 break 1208 logger.warning( 1209 "Signal: rate-limited on batch %d/%d " 1210 "(attempt %d/%d, server retry_after=%s); " 1211 "scheduler will pace the retry", 1212 idx + 1, len(att_batches), 1213 attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS, 1214 f"{server_retry_after:.0f}s" if server_retry_after else "unknown", 1215 ) 1216 except Exception as e: 1217 if attempt >= SIGNAL_RATE_LIMIT_MAX_ATTEMPTS: 1218 failed_batches.append(idx + 1) 1219 logger.error( 1220 "Signal: send error on batch %d/%d after %d attempts: %s", 1221 idx + 1, len(att_batches), attempt, str(e) 1222 ) 1223 break 1224 logger.warning( 1225 "Signal: transient error on batch %d/%d (attempt %d/%d): %s; will retry", 1226 idx + 1, len(att_batches), attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS, str(e) 1227 ) 1228 1229 warnings = [] 1230 if len(attachment_paths) < len(valid_media): 1231 warnings.append("Some media files were skipped (not found on disk)") 1232 if failed_batches: 1233 warnings.append( 1234 f"Signal rate-limited {len(failed_batches)} batch(es) " 1235 f"(#{', #'.join(str(b) for b in failed_batches)})" 1236 ) 1237 1238 if failed_batches and len(failed_batches) == len(att_batches): 1239 return _error( 1240 f"Signal: every batch ({len(att_batches)}) hit rate limit; " 1241 f"no attachments delivered" 1242 ) 1243 1244 result = {"success": True, "platform": "signal", "chat_id": chat_id} 1245 if warnings: 1246 result["warnings"] = warnings 1247 return result 1248 except Exception as e: 1249 return _error(f"Signal send failed: {e}") 1250 1251 1252 async def _send_email(extra, chat_id, message): 1253 """Send via SMTP (one-shot, no persistent connection needed).""" 1254 import smtplib 1255 from email.mime.text import MIMEText 1256 from email.utils import formatdate 1257 1258 address = extra.get("address") or os.getenv("EMAIL_ADDRESS", "") 1259 password = os.getenv("EMAIL_PASSWORD", "") 1260 smtp_host = extra.get("smtp_host") or os.getenv("EMAIL_SMTP_HOST", "") 1261 try: 1262 smtp_port = int(os.getenv("EMAIL_SMTP_PORT", "587")) 1263 except (ValueError, TypeError): 1264 smtp_port = 587 1265 1266 if not all([address, password, smtp_host]): 1267 return {"error": "Email not configured (EMAIL_ADDRESS, EMAIL_PASSWORD, EMAIL_SMTP_HOST required)"} 1268 1269 try: 1270 msg = MIMEText(message, "plain", "utf-8") 1271 msg["From"] = address 1272 msg["To"] = chat_id 1273 msg["Subject"] = "Hermes Agent" 1274 msg["Date"] = formatdate(localtime=True) 1275 1276 server = smtplib.SMTP(smtp_host, smtp_port) 1277 server.starttls(context=ssl.create_default_context()) 1278 server.login(address, password) 1279 server.send_message(msg) 1280 server.quit() 1281 return {"success": True, "platform": "email", "chat_id": chat_id} 1282 except Exception as e: 1283 return _error(f"Email send failed: {e}") 1284 1285 1286 async def _send_sms(auth_token, chat_id, message): 1287 """Send a single SMS via Twilio REST API. 1288 1289 Uses HTTP Basic auth (Account SID : Auth Token) and form-encoded POST. 1290 Chunking is handled by _send_to_platform() before this is called. 1291 """ 1292 try: 1293 import aiohttp 1294 except ImportError: 1295 return {"error": "aiohttp not installed. Run: pip install aiohttp"} 1296 1297 import base64 1298 1299 account_sid = os.getenv("TWILIO_ACCOUNT_SID", "") 1300 from_number = os.getenv("TWILIO_PHONE_NUMBER", "") 1301 if not account_sid or not auth_token or not from_number: 1302 return {"error": "SMS not configured (TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, TWILIO_PHONE_NUMBER required)"} 1303 1304 # Strip markdown — SMS renders it as literal characters 1305 message = re.sub(r"\*\*(.+?)\*\*", r"\1", message, flags=re.DOTALL) 1306 message = re.sub(r"\*(.+?)\*", r"\1", message, flags=re.DOTALL) 1307 message = re.sub(r"__(.+?)__", r"\1", message, flags=re.DOTALL) 1308 message = re.sub(r"_(.+?)_", r"\1", message, flags=re.DOTALL) 1309 message = re.sub(r"```[a-z]*\n?", "", message) 1310 message = re.sub(r"`(.+?)`", r"\1", message) 1311 message = re.sub(r"^#{1,6}\s+", "", message, flags=re.MULTILINE) 1312 message = re.sub(r"\[([^\]]+)\]\([^\)]+\)", r"\1", message) 1313 message = re.sub(r"\n{3,}", "\n\n", message) 1314 message = message.strip() 1315 1316 try: 1317 from gateway.platforms.base import resolve_proxy_url, proxy_kwargs_for_aiohttp 1318 _proxy = resolve_proxy_url() 1319 _sess_kw, _req_kw = proxy_kwargs_for_aiohttp(_proxy) 1320 creds = f"{account_sid}:{auth_token}" 1321 encoded = base64.b64encode(creds.encode("ascii")).decode("ascii") 1322 url = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json" 1323 headers = {"Authorization": f"Basic {encoded}"} 1324 1325 async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30), **_sess_kw) as session: 1326 form_data = aiohttp.FormData() 1327 form_data.add_field("From", from_number) 1328 form_data.add_field("To", chat_id) 1329 form_data.add_field("Body", message) 1330 1331 async with session.post(url, data=form_data, headers=headers, **_req_kw) as resp: 1332 body = await resp.json() 1333 if resp.status >= 400: 1334 error_msg = body.get("message", str(body)) 1335 return _error(f"Twilio API error ({resp.status}): {error_msg}") 1336 msg_sid = body.get("sid", "") 1337 return {"success": True, "platform": "sms", "chat_id": chat_id, "message_id": msg_sid} 1338 except Exception as e: 1339 return _error(f"SMS send failed: {e}") 1340 1341 1342 async def _send_mattermost(token, extra, chat_id, message): 1343 """Send via Mattermost REST API.""" 1344 try: 1345 import aiohttp 1346 except ImportError: 1347 return {"error": "aiohttp not installed. Run: pip install aiohttp"} 1348 try: 1349 base_url = (extra.get("url") or os.getenv("MATTERMOST_URL", "")).rstrip("/") 1350 token = token or os.getenv("MATTERMOST_TOKEN", "") 1351 if not base_url or not token: 1352 return {"error": "Mattermost not configured (MATTERMOST_URL, MATTERMOST_TOKEN required)"} 1353 url = f"{base_url}/api/v4/posts" 1354 headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} 1355 async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session: 1356 async with session.post(url, headers=headers, json={"channel_id": chat_id, "message": message}) as resp: 1357 if resp.status not in (200, 201): 1358 body = await resp.text() 1359 return _error(f"Mattermost API error ({resp.status}): {body}") 1360 data = await resp.json() 1361 return {"success": True, "platform": "mattermost", "chat_id": chat_id, "message_id": data.get("id")} 1362 except Exception as e: 1363 return _error(f"Mattermost send failed: {e}") 1364 1365 1366 async def _send_matrix(token, extra, chat_id, message): 1367 """Send via Matrix Client-Server API. 1368 1369 Converts markdown to HTML for rich rendering in Matrix clients. 1370 Falls back to plain text if the ``markdown`` library is not installed. 1371 """ 1372 try: 1373 import aiohttp 1374 except ImportError: 1375 return {"error": "aiohttp not installed. Run: pip install aiohttp"} 1376 try: 1377 homeserver = (extra.get("homeserver") or os.getenv("MATRIX_HOMESERVER", "")).rstrip("/") 1378 token = token or os.getenv("MATRIX_ACCESS_TOKEN", "") 1379 if not homeserver or not token: 1380 return {"error": "Matrix not configured (MATRIX_HOMESERVER, MATRIX_ACCESS_TOKEN required)"} 1381 txn_id = f"hermes_{int(time.time() * 1000)}_{os.urandom(4).hex()}" 1382 from urllib.parse import quote 1383 encoded_room = quote(chat_id, safe="") 1384 url = f"{homeserver}/_matrix/client/v3/rooms/{encoded_room}/send/m.room.message/{txn_id}" 1385 headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} 1386 1387 # Build message payload with optional HTML formatted_body. 1388 payload = {"msgtype": "m.text", "body": message} 1389 try: 1390 import markdown as _md 1391 html = _md.markdown(message, extensions=["fenced_code", "tables"]) 1392 # Convert h1-h6 to bold for Element X compatibility. 1393 html = re.sub(r"<h[1-6]>(.*?)</h[1-6]>", r"<strong>\1</strong>", html) 1394 payload["format"] = "org.matrix.custom.html" 1395 payload["formatted_body"] = html 1396 except ImportError: 1397 pass 1398 1399 async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session: 1400 async with session.put(url, headers=headers, json=payload) as resp: 1401 if resp.status not in (200, 201): 1402 body = await resp.text() 1403 return _error(f"Matrix API error ({resp.status}): {body}") 1404 data = await resp.json() 1405 return {"success": True, "platform": "matrix", "chat_id": chat_id, "message_id": data.get("event_id")} 1406 except Exception as e: 1407 return _error(f"Matrix send failed: {e}") 1408 1409 1410 async def _send_matrix_via_adapter(pconfig, chat_id, message, media_files=None, thread_id=None): 1411 """Send via the Matrix adapter so native Matrix media uploads are preserved.""" 1412 try: 1413 from gateway.platforms.matrix import MatrixAdapter 1414 except ImportError: 1415 return {"error": "Matrix dependencies not installed. Run: pip install 'mautrix[encryption]'"} 1416 1417 media_files = media_files or [] 1418 1419 try: 1420 adapter = MatrixAdapter(pconfig) 1421 connected = await adapter.connect() 1422 if not connected: 1423 return _error("Matrix connect failed") 1424 1425 metadata = {"thread_id": thread_id} if thread_id else None 1426 last_result = None 1427 1428 if message.strip(): 1429 last_result = await adapter.send(chat_id, message, metadata=metadata) 1430 if not last_result.success: 1431 return _error(f"Matrix send failed: {last_result.error}") 1432 1433 for media_path, is_voice in media_files: 1434 if not os.path.exists(media_path): 1435 return _error(f"Media file not found: {media_path}") 1436 1437 ext = os.path.splitext(media_path)[1].lower() 1438 if ext in _IMAGE_EXTS: 1439 last_result = await adapter.send_image_file(chat_id, media_path, metadata=metadata) 1440 elif ext in _VIDEO_EXTS: 1441 last_result = await adapter.send_video(chat_id, media_path, metadata=metadata) 1442 elif ext in _VOICE_EXTS and is_voice: 1443 last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata) 1444 elif ext in _AUDIO_EXTS: 1445 last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata) 1446 else: 1447 last_result = await adapter.send_document(chat_id, media_path, metadata=metadata) 1448 1449 if not last_result.success: 1450 return _error(f"Matrix media send failed: {last_result.error}") 1451 1452 if last_result is None: 1453 return {"error": "No deliverable text or media remained after processing MEDIA tags"} 1454 1455 return { 1456 "success": True, 1457 "platform": "matrix", 1458 "chat_id": chat_id, 1459 "message_id": last_result.message_id, 1460 } 1461 except Exception as e: 1462 return _error(f"Matrix send failed: {e}") 1463 finally: 1464 try: 1465 await adapter.disconnect() 1466 except Exception: 1467 pass 1468 1469 1470 async def _send_homeassistant(token, extra, chat_id, message): 1471 """Send via Home Assistant notify service.""" 1472 try: 1473 import aiohttp 1474 except ImportError: 1475 return {"error": "aiohttp not installed. Run: pip install aiohttp"} 1476 try: 1477 hass_url = (extra.get("url") or os.getenv("HASS_URL", "")).rstrip("/") 1478 token = token or os.getenv("HASS_TOKEN", "") 1479 if not hass_url or not token: 1480 return {"error": "Home Assistant not configured (HASS_URL, HASS_TOKEN required)"} 1481 url = f"{hass_url}/api/services/notify/notify" 1482 headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} 1483 async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session: 1484 async with session.post(url, headers=headers, json={"message": message, "target": chat_id}) as resp: 1485 if resp.status not in (200, 201): 1486 body = await resp.text() 1487 return _error(f"Home Assistant API error ({resp.status}): {body}") 1488 return {"success": True, "platform": "homeassistant", "chat_id": chat_id} 1489 except Exception as e: 1490 return _error(f"Home Assistant send failed: {e}") 1491 1492 1493 async def _send_dingtalk(extra, chat_id, message): 1494 """Send via DingTalk robot webhook. 1495 1496 Note: The gateway's DingTalk adapter uses per-session webhook URLs from 1497 incoming messages (dingtalk-stream SDK). For cross-platform send_message 1498 delivery we use a static robot webhook URL instead, which must be 1499 configured via ``DINGTALK_WEBHOOK_URL`` env var or ``webhook_url`` in the 1500 platform's extra config. 1501 """ 1502 try: 1503 import httpx 1504 except ImportError: 1505 return {"error": "httpx not installed"} 1506 try: 1507 webhook_url = extra.get("webhook_url") or os.getenv("DINGTALK_WEBHOOK_URL", "") 1508 if not webhook_url: 1509 return {"error": "DingTalk not configured. Set DINGTALK_WEBHOOK_URL env var or webhook_url in dingtalk platform extra config."} 1510 async with httpx.AsyncClient(timeout=30.0) as client: 1511 resp = await client.post( 1512 webhook_url, 1513 json={"msgtype": "text", "text": {"content": message}}, 1514 ) 1515 resp.raise_for_status() 1516 data = resp.json() 1517 if data.get("errcode", 0) != 0: 1518 return _error(f"DingTalk API error: {data.get('errmsg', 'unknown')}") 1519 return {"success": True, "platform": "dingtalk", "chat_id": chat_id} 1520 except Exception as e: 1521 return _error(f"DingTalk send failed: {e}") 1522 1523 1524 async def _send_wecom(extra, chat_id, message): 1525 """Send via WeCom using the adapter's WebSocket send pipeline.""" 1526 try: 1527 from gateway.platforms.wecom import WeComAdapter, check_wecom_requirements 1528 if not check_wecom_requirements(): 1529 return {"error": "WeCom requirements not met. Need aiohttp + WECOM_BOT_ID/SECRET."} 1530 except ImportError: 1531 return {"error": "WeCom adapter not available."} 1532 1533 try: 1534 from gateway.config import PlatformConfig 1535 pconfig = PlatformConfig(extra=extra) 1536 adapter = WeComAdapter(pconfig) 1537 connected = await adapter.connect() 1538 if not connected: 1539 return _error(f"WeCom: failed to connect - {adapter.fatal_error_message or 'unknown error'}") 1540 try: 1541 result = await adapter.send(chat_id, message) 1542 if not result.success: 1543 return _error(f"WeCom send failed: {result.error}") 1544 return {"success": True, "platform": "wecom", "chat_id": chat_id, "message_id": result.message_id} 1545 finally: 1546 await adapter.disconnect() 1547 except Exception as e: 1548 return _error(f"WeCom send failed: {e}") 1549 1550 1551 async def _send_weixin(pconfig, chat_id, message, media_files=None): 1552 """Send via Weixin iLink using the native adapter helper.""" 1553 try: 1554 from gateway.platforms.weixin import check_weixin_requirements, send_weixin_direct 1555 if not check_weixin_requirements(): 1556 return {"error": "Weixin requirements not met. Need aiohttp + cryptography."} 1557 except ImportError: 1558 return {"error": "Weixin adapter not available."} 1559 1560 try: 1561 return await send_weixin_direct( 1562 extra=pconfig.extra, 1563 token=pconfig.token, 1564 chat_id=chat_id, 1565 message=message, 1566 media_files=media_files, 1567 ) 1568 except Exception as e: 1569 return _error(f"Weixin send failed: {e}") 1570 1571 1572 async def _send_bluebubbles(extra, chat_id, message): 1573 """Send via BlueBubbles iMessage server using the adapter's REST API.""" 1574 try: 1575 from gateway.platforms.bluebubbles import BlueBubblesAdapter, check_bluebubbles_requirements 1576 if not check_bluebubbles_requirements(): 1577 return {"error": "BlueBubbles requirements not met (need aiohttp + httpx)."} 1578 except ImportError: 1579 return {"error": "BlueBubbles adapter not available."} 1580 1581 try: 1582 from gateway.config import PlatformConfig 1583 pconfig = PlatformConfig(extra=extra) 1584 adapter = BlueBubblesAdapter(pconfig) 1585 connected = await adapter.connect() 1586 if not connected: 1587 return _error("BlueBubbles: failed to connect to server") 1588 try: 1589 result = await adapter.send(chat_id, message) 1590 if not result.success: 1591 return _error(f"BlueBubbles send failed: {result.error}") 1592 return {"success": True, "platform": "bluebubbles", "chat_id": chat_id, "message_id": result.message_id} 1593 finally: 1594 await adapter.disconnect() 1595 except Exception as e: 1596 return _error(f"BlueBubbles send failed: {e}") 1597 1598 1599 async def _send_feishu(pconfig, chat_id, message, media_files=None, thread_id=None): 1600 """Send via Feishu/Lark using the adapter's send pipeline.""" 1601 try: 1602 from gateway.platforms.feishu import FeishuAdapter, FEISHU_AVAILABLE 1603 if not FEISHU_AVAILABLE: 1604 return {"error": "Feishu dependencies not installed. Run: pip install 'hermes-agent[feishu]'"} 1605 from gateway.platforms.feishu import FEISHU_DOMAIN, LARK_DOMAIN 1606 except ImportError: 1607 return {"error": "Feishu dependencies not installed. Run: pip install 'hermes-agent[feishu]'"} 1608 1609 media_files = media_files or [] 1610 1611 try: 1612 adapter = FeishuAdapter(pconfig) 1613 domain_name = getattr(adapter, "_domain_name", "feishu") 1614 domain = FEISHU_DOMAIN if domain_name != "lark" else LARK_DOMAIN 1615 adapter._client = adapter._build_lark_client(domain) 1616 metadata = {"thread_id": thread_id} if thread_id else None 1617 1618 last_result = None 1619 if message.strip(): 1620 last_result = await adapter.send(chat_id, message, metadata=metadata) 1621 if not last_result.success: 1622 return _error(f"Feishu send failed: {last_result.error}") 1623 1624 for media_path, is_voice in media_files: 1625 if not os.path.exists(media_path): 1626 return _error(f"Media file not found: {media_path}") 1627 1628 ext = os.path.splitext(media_path)[1].lower() 1629 if ext in _IMAGE_EXTS: 1630 last_result = await adapter.send_image_file(chat_id, media_path, metadata=metadata) 1631 elif ext in _VIDEO_EXTS: 1632 last_result = await adapter.send_video(chat_id, media_path, metadata=metadata) 1633 elif ext in _VOICE_EXTS and is_voice: 1634 last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata) 1635 elif ext in _AUDIO_EXTS: 1636 last_result = await adapter.send_voice(chat_id, media_path, metadata=metadata) 1637 else: 1638 last_result = await adapter.send_document(chat_id, media_path, metadata=metadata) 1639 1640 if not last_result.success: 1641 return _error(f"Feishu media send failed: {last_result.error}") 1642 1643 if last_result is None: 1644 return {"error": "No deliverable text or media remained after processing MEDIA tags"} 1645 1646 return { 1647 "success": True, 1648 "platform": "feishu", 1649 "chat_id": chat_id, 1650 "message_id": last_result.message_id, 1651 } 1652 except Exception as e: 1653 return _error(f"Feishu send failed: {e}") 1654 1655 1656 def _check_send_message(): 1657 """Gate send_message on gateway running (always available on messaging platforms).""" 1658 from gateway.session_context import get_session_env 1659 platform = get_session_env("HERMES_SESSION_PLATFORM", "") 1660 if platform and platform != "local": 1661 return True 1662 try: 1663 from gateway.status import is_gateway_running 1664 return is_gateway_running() 1665 except Exception: 1666 return False 1667 1668 1669 async def _send_qqbot(pconfig, chat_id, message): 1670 """Send via QQBot using the REST API directly (no WebSocket needed). 1671 1672 Uses the QQ Bot Open Platform REST endpoints to get an access token 1673 and post a message. Supports guild channels, C2C (private) chats, 1674 and group chats by trying the appropriate endpoints. 1675 """ 1676 try: 1677 import httpx 1678 except ImportError: 1679 return _error("QQBot direct send requires httpx. Run: pip install httpx") 1680 1681 extra = pconfig.extra or {} 1682 appid = extra.get("app_id") or os.getenv("QQ_APP_ID", "") 1683 secret = (pconfig.token or extra.get("client_secret") 1684 or os.getenv("QQ_CLIENT_SECRET", "")) 1685 if not appid or not secret: 1686 return _error("QQBot: QQ_APP_ID / QQ_CLIENT_SECRET not configured.") 1687 1688 try: 1689 async with httpx.AsyncClient(timeout=15) as client: 1690 # Step 1: Get access token 1691 token_resp = await client.post( 1692 "https://bots.qq.com/app/getAppAccessToken", 1693 json={"appId": str(appid), "clientSecret": str(secret)}, 1694 ) 1695 if token_resp.status_code != 200: 1696 return _error(f"QQBot token request failed: {token_resp.status_code}") 1697 token_data = token_resp.json() 1698 access_token = token_data.get("access_token") 1699 if not access_token: 1700 return _error(f"QQBot: no access_token in response") 1701 1702 # Step 2: Send message via REST 1703 # QQ Bot API has separate endpoints for channels, C2C, and groups. 1704 # We try them in order: channel first, then fallback to C2C. 1705 headers = { 1706 "Authorization": f"QQBot {access_token}", 1707 "Content-Type": "application/json", 1708 } 1709 payload = {"content": message[:4000], "msg_type": 0} 1710 1711 # Try channel endpoint first (works for guild channels) 1712 url = f"https://api.sgroup.qq.com/channels/{chat_id}/messages" 1713 resp = await client.post(url, json=payload, headers=headers) 1714 if resp.status_code in (200, 201): 1715 data = resp.json() 1716 return {"success": True, "platform": "qqbot", "chat_id": chat_id, 1717 "message_id": data.get("id")} 1718 1719 # If channel endpoint failed (likely "频道不存在"), try C2C endpoint 1720 url_c2c = f"https://api.sgroup.qq.com/v2/users/{chat_id}/messages" 1721 resp_c2c = await client.post(url_c2c, json=payload, headers=headers) 1722 if resp_c2c.status_code in (200, 201): 1723 data = resp_c2c.json() 1724 return {"success": True, "platform": "qqbot", "chat_id": chat_id, 1725 "message_id": data.get("id")} 1726 1727 # If C2C also failed, try group endpoint 1728 url_group = f"https://api.sgroup.qq.com/v2/groups/{chat_id}/messages" 1729 resp_group = await client.post(url_group, json=payload, headers=headers) 1730 if resp_group.status_code in (200, 201): 1731 data = resp_group.json() 1732 return {"success": True, "platform": "qqbot", "chat_id": chat_id, 1733 "message_id": data.get("id")} 1734 1735 # All endpoints failed — return the most informative error 1736 return _error(f"QQBot send failed: channel={resp.status_code} c2c={resp_c2c.status_code} group={resp_group.status_code}") 1737 except Exception as e: 1738 return _error(f"QQBot send failed: {e}") 1739 1740 1741 async def _send_yuanbao(chat_id, message, media_files=None): 1742 """Send via Yuanbao using the running gateway adapter's WebSocket connection. 1743 1744 Yuanbao uses a persistent WebSocket — unlike HTTP-based platforms, we 1745 cannot create a throwaway client. We obtain the running singleton from 1746 the adapter module itself (``get_active_adapter``). 1747 1748 chat_id format: 1749 - Group: "group:<group_code>" 1750 - DM: "direct:<account_id>" or just "<account_id>" 1751 """ 1752 try: 1753 from gateway.platforms.yuanbao import get_active_adapter, send_yuanbao_direct 1754 except ImportError: 1755 return _error("Yuanbao adapter module not available.") 1756 1757 adapter = get_active_adapter() 1758 if adapter is None: 1759 return _error( 1760 "Yuanbao adapter is not running. " 1761 "Start the gateway with yuanbao platform enabled first." 1762 ) 1763 1764 try: 1765 return await send_yuanbao_direct(adapter, chat_id, message, media_files=media_files) 1766 except Exception as e: 1767 return _error(f"Yuanbao send failed: {e}") 1768 1769 1770 # --- Registry --- 1771 from tools.registry import registry, tool_error 1772 1773 registry.register( 1774 name="send_message", 1775 toolset="messaging", 1776 schema=SEND_MESSAGE_SCHEMA, 1777 handler=send_message_tool, 1778 check_fn=_check_send_message, 1779 emoji="📨", 1780 )