/ tools / send_message_tool.py
send_message_tool.py
   1  """Send Message Tool -- cross-channel messaging via platform APIs.
   2  
Sends a message to a user or channel on any connected messaging platform
(Telegram, Discord, Slack, Signal, Matrix, Feishu, Weixin, and others,
including plugin platforms). Supports listing available targets and resolving
   5  human-friendly channel names to IDs. Works in both CLI and gateway contexts.
   6  """
   7  
   8  import asyncio
   9  import json
  10  import logging
  11  import os
  12  import re
  13  import ssl
  14  import time
  15  from email.utils import formatdate
  16  from typing import Dict, Optional
  17  
  18  from agent.redact import redact_sensitive_text
  19  
logger = logging.getLogger(__name__)

# Telegram target: a numeric chat ID (optionally negative) followed by an
# optional ":<digits>" suffix, captured as group 2 and used as the thread ID.
_TELEGRAM_TOPIC_TARGET_RE = re.compile(r"^\s*(-?\d+)(?::(\d+))?\s*$")
# Feishu target: an ID starting with oc_/ou_/on_/chat_/open_, followed by an
# optional ":<suffix>" that is captured as group 2 and used as the thread ID.
_FEISHU_TARGET_RE = re.compile(r"^\s*((?:oc|ou|on|chat|open)_[-A-Za-z0-9]+)(?::([-A-Za-z0-9_]+))?\s*$")
# Slack conversation IDs: C (public channel), G (private/group channel), D (DM).
# Must be uppercase alphanumeric, 9+ chars. User IDs (U...) and workspace IDs
# (W...) are NOT valid chat.postMessage channel values — posting to them fails
# because the API requires a conversation ID. To DM a user you must first call
# conversations.open to obtain a D... ID. Without this gate, Slack IDs fall
# through to channel-name resolution, which only matches by name and fails.
_SLACK_TARGET_RE = re.compile(r"^\s*([CGD][A-Z0-9]{8,})\s*$")
# Weixin target: a prefixed ID (wxid_/gh_/v<digits>_/wm_/wb_), a
# "<name>@chatroom" ID, or the literal "filehelper".
_WEIXIN_TARGET_RE = re.compile(r"^\s*((?:wxid|gh|v\d+|wm|wb)_[A-Za-z0-9_-]+|[A-Za-z0-9._-]+@chatroom|filehelper)\s*$")
# Yuanbao target: "group:<code>" or "direct:<id>" with no further colons.
_YUANBAO_TARGET_RE = re.compile(r"^\s*((?:group|direct):[^:]+)\s*$")
# Discord snowflake IDs are numeric, same regex pattern as Telegram topic targets.
_NUMERIC_TOPIC_RE = _TELEGRAM_TOPIC_TARGET_RE
# Platforms that address recipients by phone number and accept E.164 format
# (with a leading '+'). Without this, "+15551234567" fails the isdigit() check
# below and falls through to channel-name resolution, which has no way to
# resolve a raw phone number. Keeping the '+' preserves the E.164 form that
# downstream adapters (signal, etc.) expect.
_PHONE_PLATFORMS = frozenset({"signal", "sms", "whatsapp"})
_E164_TARGET_RE = re.compile(r"^\s*\+(\d{7,15})\s*$")
# File-extension groups (lowercase, with leading dot) used to classify media
# attachments, e.g. when building the mirror summary for media-only messages.
_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
_VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".3gp"}
_AUDIO_EXTS = {".ogg", ".opus", ".mp3", ".wav", ".m4a", ".flac"}
_VOICE_EXTS = {".ogg", ".opus"}
# Telegram's Bot API sendAudio only accepts MP3 / M4A. Other audio
# formats either route through sendVoice (Opus/OGG) or fall back to
# document delivery.
_TELEGRAM_SEND_AUDIO_EXTS = {".mp3", ".m4a"}
# Matches a secret-looking URL query parameter. Group 1 is the "?name=" or
# "&name=" prefix, group 2 is the value to be masked.
_URL_SECRET_QUERY_RE = re.compile(
    r"([?&](?:access_token|api[_-]?key|auth[_-]?token|token|signature|sig)=)([^&#\s]+)",
    re.IGNORECASE,
)
# Matches a free-standing "name = value" secret assignment. Group 1 is the
# name, group 2 is the value. Note that a bare "token" is not in this list.
_GENERIC_SECRET_ASSIGN_RE = re.compile(
    r"\b(access_token|api[_-]?key|auth[_-]?token|signature|sig)\s*=\s*([^\s,;]+)",
    re.IGNORECASE,
)
  58  
  59  
  60  def _sanitize_error_text(text) -> str:
  61      """Redact secrets from error text before surfacing it to users/models."""
  62      redacted = redact_sensitive_text(text)
  63      redacted = _URL_SECRET_QUERY_RE.sub(lambda m: f"{m.group(1)}***", redacted)
  64      redacted = _GENERIC_SECRET_ASSIGN_RE.sub(lambda m: f"{m.group(1)}=***", redacted)
  65      return redacted
  66  
  67  
  68  def _error(message: str) -> dict:
  69      """Build a standardized error payload with redacted content."""
  70      return {"error": _sanitize_error_text(message)}
  71  
  72  
  73  def _telegram_retry_delay(exc: Exception, attempt: int) -> float | None:
  74      retry_after = getattr(exc, "retry_after", None)
  75      if retry_after is not None:
  76          try:
  77              return max(float(retry_after), 0.0)
  78          except (TypeError, ValueError):
  79              return 1.0
  80  
  81      text = str(exc).lower()
  82      if "timed out" in text or "timeout" in text:
  83          return None
  84      if (
  85          "bad gateway" in text
  86          or "502" in text
  87          or "too many requests" in text
  88          or "429" in text
  89          or "service unavailable" in text
  90          or "503" in text
  91          or "gateway timeout" in text
  92          or "504" in text
  93      ):
  94          return float(2 ** attempt)
  95      return None
  96  
  97  
  98  async def _send_telegram_message_with_retry(bot, *, attempts: int = 3, **kwargs):
  99      for attempt in range(attempts):
 100          try:
 101              return await bot.send_message(**kwargs)
 102          except Exception as exc:
 103              delay = _telegram_retry_delay(exc, attempt)
 104              if delay is None or attempt >= attempts - 1:
 105                  raise
 106              logger.warning(
 107                  "Transient Telegram send failure (attempt %d/%d), retrying in %.1fs: %s",
 108                  attempt + 1,
 109                  attempts,
 110                  delay,
 111                  _sanitize_error_text(exc),
 112              )
 113              await asyncio.sleep(delay)
 114  
 115  
 116  SEND_MESSAGE_SCHEMA = {
 117      "name": "send_message",
 118      "description": (
 119          "Send a message to a connected messaging platform, or list available targets.\n\n"
 120          "IMPORTANT: When the user asks to send to a specific channel or person "
 121          "(not just a bare platform name), call send_message(action='list') FIRST to see "
 122          "available targets, then send to the correct one.\n"
 123          "If the user just says a platform name like 'send to telegram', send directly "
 124          "to the home channel without listing first."
 125      ),
 126      "parameters": {
 127          "type": "object",
 128          "properties": {
 129              "action": {
 130                  "type": "string",
 131                  "enum": ["send", "list"],
 132                  "description": "Action to perform. 'send' (default) sends a message. 'list' returns all available channels/contacts across connected platforms."
 133              },
 134              "target": {
 135                  "type": "string",
 136                  "description": "Delivery target. Format: 'platform' (uses home channel), 'platform:#channel-name', 'platform:chat_id', or 'platform:chat_id:thread_id' for Telegram topics and Discord threads. Examples: 'telegram', 'telegram:-1001234567890:17585', 'discord:999888777:555444333', 'discord:#bot-home', 'slack:#engineering', 'signal:+155****4567', 'matrix:!roomid:server.org', 'matrix:@user:server.org', 'yuanbao:direct:<account_id>' (DM), 'yuanbao:group:<group_code>' (group chat)"
 137              },
 138              "message": {
 139                  "type": "string",
 140                  "description": "The message text to send. To send an image or file, include MEDIA:<local_path> (e.g. 'MEDIA:/tmp/hermes/cache/img_xxx.jpg') in the message — the platform will deliver it as a native media attachment."
 141              }
 142          },
 143          "required": []
 144      }
 145  }
 146  
 147  
 148  def send_message_tool(args, **kw):
 149      """Handle cross-channel send_message tool calls."""
 150      action = args.get("action", "send")
 151  
 152      if action == "list":
 153          return _handle_list()
 154  
 155      return _handle_send(args)
 156  
 157  
 158  def _handle_list():
 159      """Return formatted list of available messaging targets."""
 160      try:
 161          from gateway.channel_directory import format_directory_for_display
 162          return json.dumps({"targets": format_directory_for_display()})
 163      except Exception as e:
 164          return json.dumps(_error(f"Failed to load channel directory: {e}"))
 165  
 166  
 167  def _handle_send(args):
 168      """Send a message to a platform target."""
 169      target = args.get("target", "")
 170      message = args.get("message", "")
 171      if not target or not message:
 172          return tool_error("Both 'target' and 'message' are required when action='send'")
 173  
 174      parts = target.split(":", 1)
 175      platform_name = parts[0].strip().lower()
 176      target_ref = parts[1].strip() if len(parts) > 1 else None
 177      chat_id = None
 178      thread_id = None
 179  
 180      if target_ref:
 181          chat_id, thread_id, is_explicit = _parse_target_ref(platform_name, target_ref)
 182      else:
 183          is_explicit = False
 184  
 185      # Resolve human-friendly channel names to numeric IDs
 186      if target_ref and not is_explicit:
 187          try:
 188              from gateway.channel_directory import resolve_channel_name
 189              resolved = resolve_channel_name(platform_name, target_ref)
 190              if resolved:
 191                  chat_id, thread_id, _ = _parse_target_ref(platform_name, resolved)
 192              else:
 193                  return json.dumps({
 194                      "error": f"Could not resolve '{target_ref}' on {platform_name}. "
 195                      f"Use send_message(action='list') to see available targets."
 196                  })
 197          except Exception:
 198              return json.dumps({
 199                  "error": f"Could not resolve '{target_ref}' on {platform_name}. "
 200                  f"Try using a numeric channel ID instead."
 201              })
 202  
 203      from tools.interrupt import is_interrupted
 204      if is_interrupted():
 205          return tool_error("Interrupted")
 206  
 207      try:
 208          from gateway.config import load_gateway_config, Platform
 209          config = load_gateway_config()
 210      except Exception as e:
 211          return json.dumps(_error(f"Failed to load gateway config: {e}"))
 212  
 213      # Accept any platform name — built-in names resolve to their enum
 214      # member, plugin platform names create dynamic members via _missing_().
 215      try:
 216          platform = Platform(platform_name)
 217      except (ValueError, KeyError):
 218          return tool_error(f"Unknown platform: {platform_name}")
 219  
 220      pconfig = config.platforms.get(platform)
 221      if not pconfig or not pconfig.enabled:
 222          # Weixin can be configured purely via .env; synthesize a pconfig so
 223          # send_message and cron delivery work without a gateway.yaml entry.
 224          if platform_name == "weixin":
 225              wx_token = os.getenv("WEIXIN_TOKEN", "").strip()
 226              wx_account = os.getenv("WEIXIN_ACCOUNT_ID", "").strip()
 227              if wx_token and wx_account:
 228                  from gateway.config import PlatformConfig
 229                  pconfig = PlatformConfig(
 230                      enabled=True,
 231                      token=wx_token,
 232                      extra={
 233                          "account_id": wx_account,
 234                          "base_url": os.getenv("WEIXIN_BASE_URL", "").strip(),
 235                          "cdn_base_url": os.getenv("WEIXIN_CDN_BASE_URL", "").strip(),
 236                      },
 237                  )
 238              else:
 239                  return tool_error(f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables.")
 240          else:
 241              return tool_error(f"Platform '{platform_name}' is not configured. Set up credentials in ~/.hermes/config.yaml or environment variables.")
 242  
 243      from gateway.platforms.base import BasePlatformAdapter
 244  
 245      media_files, cleaned_message = BasePlatformAdapter.extract_media(message)
 246      mirror_text = cleaned_message.strip() or _describe_media_for_mirror(media_files)
 247  
 248      used_home_channel = False
 249      if not chat_id:
 250          home = config.get_home_channel(platform)
 251          if not home and platform_name == "weixin":
 252              wx_home = os.getenv("WEIXIN_HOME_CHANNEL", "").strip()
 253              if wx_home:
 254                  from gateway.config import HomeChannel
 255                  home = HomeChannel(platform=platform, chat_id=wx_home, name="Weixin Home")
 256          if home:
 257              chat_id = home.chat_id
 258              used_home_channel = True
 259          else:
 260              return json.dumps({
 261                  "error": f"No home channel set for {platform_name} to determine where to send the message. "
 262                  f"Either specify a channel directly with '{platform_name}:CHANNEL_NAME', "
 263                  f"or set a home channel via: hermes config set {platform_name.upper()}_HOME_CHANNEL <channel_id>"
 264              })
 265  
 266      duplicate_skip = _maybe_skip_cron_duplicate_send(platform_name, chat_id, thread_id)
 267      if duplicate_skip:
 268          return json.dumps(duplicate_skip)
 269  
 270      try:
 271          from model_tools import _run_async
 272          result = _run_async(
 273              _send_to_platform(
 274                  platform,
 275                  pconfig,
 276                  chat_id,
 277                  cleaned_message,
 278                  thread_id=thread_id,
 279                  media_files=media_files,
 280              )
 281          )
 282          if used_home_channel and isinstance(result, dict) and result.get("success"):
 283              result["note"] = f"Sent to {platform_name} home channel (chat_id: {chat_id})"
 284  
 285          # Mirror the sent message into the target's gateway session
 286          if isinstance(result, dict) and result.get("success") and mirror_text:
 287              try:
 288                  from gateway.mirror import mirror_to_session
 289                  from gateway.session_context import get_session_env
 290                  source_label = get_session_env("HERMES_SESSION_PLATFORM", "cli")
 291                  user_id = get_session_env("HERMES_SESSION_USER_ID", "") or None
 292                  if mirror_to_session(
 293                      platform_name,
 294                      chat_id,
 295                      mirror_text,
 296                      source_label=source_label,
 297                      thread_id=thread_id,
 298                      user_id=user_id,
 299                  ):
 300                      result["mirrored"] = True
 301              except Exception:
 302                  pass
 303  
 304          if isinstance(result, dict) and "error" in result:
 305              result["error"] = _sanitize_error_text(result["error"])
 306          return json.dumps(result)
 307      except Exception as e:
 308          return json.dumps(_error(f"Send failed: {e}"))
 309  
 310  
 311  def _parse_target_ref(platform_name: str, target_ref: str):
 312      """Parse a tool target into chat_id/thread_id and whether it is explicit."""
 313      if platform_name == "telegram":
 314          match = _TELEGRAM_TOPIC_TARGET_RE.fullmatch(target_ref)
 315          if match:
 316              return match.group(1), match.group(2), True
 317      if platform_name == "feishu":
 318          match = _FEISHU_TARGET_RE.fullmatch(target_ref)
 319          if match:
 320              return match.group(1), match.group(2), True
 321      if platform_name == "discord":
 322          match = _NUMERIC_TOPIC_RE.fullmatch(target_ref)
 323          if match:
 324              return match.group(1), match.group(2), True
 325      if platform_name == "slack":
 326          match = _SLACK_TARGET_RE.fullmatch(target_ref)
 327          if match:
 328              return match.group(1), None, True
 329      if platform_name == "weixin":
 330          match = _WEIXIN_TARGET_RE.fullmatch(target_ref)
 331          if match:
 332              return match.group(1), None, True
 333      if platform_name == "yuanbao":
 334          match = _YUANBAO_TARGET_RE.fullmatch(target_ref)
 335          if match:
 336              return match.group(1), None, True
 337          if target_ref.strip().isdigit():
 338              return f"group:{target_ref.strip()}", None, True
 339          return None, None, False
 340      if platform_name in _PHONE_PLATFORMS:
 341          match = _E164_TARGET_RE.fullmatch(target_ref)
 342          if match:
 343              # Preserve the leading '+' — signal-cli and sms/whatsapp adapters
 344              # expect E.164 format for direct recipients.
 345              return target_ref.strip(), None, True
 346      if target_ref.lstrip("-").isdigit():
 347          return target_ref, None, True
 348      # Matrix room IDs (start with !) and user IDs (start with @) are explicit
 349      if platform_name == "matrix" and (target_ref.startswith("!") or target_ref.startswith("@")):
 350          return target_ref, None, True
 351      return None, None, False
 352  
 353  
 354  def _describe_media_for_mirror(media_files):
 355      """Return a human-readable mirror summary when a message only contains media."""
 356      if not media_files:
 357          return ""
 358      if len(media_files) == 1:
 359          media_path, is_voice = media_files[0]
 360          ext = os.path.splitext(media_path)[1].lower()
 361          if is_voice and ext in _VOICE_EXTS:
 362              return "[Sent voice message]"
 363          if ext in _IMAGE_EXTS:
 364              return "[Sent image attachment]"
 365          if ext in _VIDEO_EXTS:
 366              return "[Sent video attachment]"
 367          if ext in _AUDIO_EXTS:
 368              return "[Sent audio attachment]"
 369          return "[Sent document attachment]"
 370      return f"[Sent {len(media_files)} media attachments]"
 371  
 372  
 373  def _get_cron_auto_delivery_target():
 374      """Return the cron scheduler's auto-delivery target for the current run, if any."""
 375      from gateway.session_context import get_session_env
 376      platform = get_session_env("HERMES_CRON_AUTO_DELIVER_PLATFORM", "").strip().lower()
 377      chat_id = get_session_env("HERMES_CRON_AUTO_DELIVER_CHAT_ID", "").strip()
 378      if not platform or not chat_id:
 379          return None
 380      thread_id = get_session_env("HERMES_CRON_AUTO_DELIVER_THREAD_ID", "").strip() or None
 381      return {
 382          "platform": platform,
 383          "chat_id": chat_id,
 384          "thread_id": thread_id,
 385      }
 386  
 387  
 388  def _maybe_skip_cron_duplicate_send(platform_name: str, chat_id: str, thread_id: str | None):
 389      """Skip redundant cron send_message calls when the scheduler will auto-deliver there."""
 390      auto_target = _get_cron_auto_delivery_target()
 391      if not auto_target:
 392          return None
 393  
 394      same_target = (
 395          auto_target["platform"] == platform_name
 396          and str(auto_target["chat_id"]) == str(chat_id)
 397          and auto_target.get("thread_id") == thread_id
 398      )
 399      if not same_target:
 400          return None
 401  
 402      target_label = f"{platform_name}:{chat_id}"
 403      if thread_id is not None:
 404          target_label += f":{thread_id}"
 405  
 406      return {
 407          "success": True,
 408          "skipped": True,
 409          "reason": "cron_auto_delivery_duplicate_target",
 410          "target": target_label,
 411          "note": (
 412              f"Skipped send_message to {target_label}. This cron job will already auto-deliver "
 413              "its final response to that same target. Put the intended user-facing content in "
 414              "your final response instead, or use a different target if you want an additional message."
 415          ),
 416      }
 417  
 418  
 419  async def _send_via_adapter(platform, pconfig, chat_id, chunk):
 420      """Send a message via a live gateway adapter (for plugin platforms).
 421  
 422      Falls back to error if no adapter is connected for this platform.
 423      """
 424      try:
 425          from gateway.run import _gateway_runner_ref
 426          runner = _gateway_runner_ref()
 427          if runner:
 428              adapter = runner.adapters.get(platform)
 429              if adapter:
 430                  from gateway.platforms.base import SendResult
 431                  result = await adapter.send(chat_id=chat_id, content=chunk)
 432                  if result.success:
 433                      return {"success": True, "message_id": result.message_id}
 434                  return {"error": f"Adapter send failed: {result.error}"}
 435      except Exception as e:
 436          return {"error": f"Plugin platform send failed: {e}"}
 437      return {"error": f"No live adapter for platform '{platform.value}'. Is the gateway running with this platform connected?"}
 438  
 439  
 440  async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, media_files=None):
 441      """Route a message to the appropriate platform sender.
 442  
 443      Long messages are automatically chunked to fit within platform limits
 444      using the same smart-splitting algorithm as the gateway adapters
 445      (preserves code-block boundaries, adds part indicators).
 446      """
 447      from gateway.config import Platform
 448      from gateway.platforms.base import BasePlatformAdapter, utf16_len
 449      from gateway.platforms.discord import DiscordAdapter
 450      from gateway.platforms.slack import SlackAdapter
 451  
 452      # Telegram adapter import is optional (requires python-telegram-bot)
 453      try:
 454          from gateway.platforms.telegram import TelegramAdapter
 455          _telegram_available = True
 456      except ImportError:
 457          _telegram_available = False
 458  
 459      # Feishu adapter import is optional (requires lark-oapi)
 460      try:
 461          from gateway.platforms.feishu import FeishuAdapter
 462          _feishu_available = True
 463      except ImportError:
 464          _feishu_available = False
 465  
 466      media_files = media_files or []
 467  
 468      if platform == Platform.SLACK and message:
 469          try:
 470              slack_adapter = SlackAdapter.__new__(SlackAdapter)
 471              message = slack_adapter.format_message(message)
 472          except Exception:
 473              logger.debug("Failed to apply Slack mrkdwn formatting in _send_to_platform", exc_info=True)
 474  
 475      # Platform message length limits (from adapter class attributes)
 476      _MAX_LENGTHS = {
 477          Platform.TELEGRAM: TelegramAdapter.MAX_MESSAGE_LENGTH if _telegram_available else 4096,
 478          Platform.DISCORD: DiscordAdapter.MAX_MESSAGE_LENGTH,
 479          Platform.SLACK: SlackAdapter.MAX_MESSAGE_LENGTH,
 480      }
 481      if _feishu_available:
 482          _MAX_LENGTHS[Platform.FEISHU] = FeishuAdapter.MAX_MESSAGE_LENGTH
 483  
 484      # Check plugin registry for max_message_length
 485      if platform not in _MAX_LENGTHS:
 486          try:
 487              from gateway.platform_registry import platform_registry
 488              entry = platform_registry.get(platform.value)
 489              if entry and entry.max_message_length > 0:
 490                  _MAX_LENGTHS[platform] = entry.max_message_length
 491          except Exception:
 492              pass
 493  
 494      # Smart-chunk the message to fit within platform limits.
 495      # For short messages or platforms without a known limit this is a no-op.
 496      # Telegram measures length in UTF-16 code units, not Unicode codepoints.
 497      max_len = _MAX_LENGTHS.get(platform)
 498      if max_len:
 499          _len_fn = utf16_len if platform == Platform.TELEGRAM else None
 500          chunks = BasePlatformAdapter.truncate_message(message, max_len, len_fn=_len_fn)
 501      else:
 502          chunks = [message]
 503  
 504      # --- Telegram: special handling for media attachments ---
 505      if platform == Platform.TELEGRAM:
 506          last_result = None
 507          disable_link_previews = bool(getattr(pconfig, "extra", {}) and pconfig.extra.get("disable_link_previews"))
 508          for i, chunk in enumerate(chunks):
 509              is_last = (i == len(chunks) - 1)
 510              result = await _send_telegram(
 511                  pconfig.token,
 512                  chat_id,
 513                  chunk,
 514                  media_files=media_files if is_last else [],
 515                  thread_id=thread_id,
 516                  disable_link_previews=disable_link_previews,
 517              )
 518              if isinstance(result, dict) and result.get("error"):
 519                  return result
 520              last_result = result
 521          return last_result
 522  
 523      # --- Weixin: use the native one-shot adapter helper for text + media ---
 524      if platform == Platform.WEIXIN:
 525          return await _send_weixin(pconfig, chat_id, message, media_files=media_files)
 526  
 527      # --- Discord: special handling for media attachments ---
 528      if platform == Platform.DISCORD:
 529          last_result = None
 530          for i, chunk in enumerate(chunks):
 531              is_last = (i == len(chunks) - 1)
 532              result = await _send_discord(
 533                  pconfig.token,
 534                  chat_id,
 535                  chunk,
 536                  media_files=media_files if is_last else [],
 537                  thread_id=thread_id,
 538              )
 539              if isinstance(result, dict) and result.get("error"):
 540                  return result
 541              last_result = result
 542          return last_result
 543  
 544      # --- Matrix: use the native adapter helper when media is present ---
 545      if platform == Platform.MATRIX and media_files:
 546          last_result = None
 547          for i, chunk in enumerate(chunks):
 548              is_last = (i == len(chunks) - 1)
 549              result = await _send_matrix_via_adapter(
 550                  pconfig,
 551                  chat_id,
 552                  chunk,
 553                  media_files=media_files if is_last else [],
 554                  thread_id=thread_id,
 555              )
 556              if isinstance(result, dict) and result.get("error"):
 557                  return result
 558              last_result = result
 559          return last_result
 560  
 561      # --- Signal: native attachment support via JSON-RPC attachments param ---
 562      if platform == Platform.SIGNAL and media_files:
 563          last_result = None
 564          for i, chunk in enumerate(chunks):
 565              is_last = (i == len(chunks) - 1)
 566              result = await _send_signal(
 567                  pconfig.extra,
 568                  chat_id,
 569                  chunk,
 570                  media_files=media_files if is_last else [],
 571              )
 572              if isinstance(result, dict) and result.get("error"):
 573                  return result
 574              last_result = result
 575          return last_result
 576  
 577      # --- Yuanbao: native media attachment support via running gateway adapter ---
 578      if platform == Platform.YUANBAO and media_files:
 579          last_result = None
 580          for i, chunk in enumerate(chunks):
 581              is_last = (i == len(chunks) - 1)
 582              result = await _send_yuanbao(
 583                  chat_id,
 584                  chunk,
 585                  media_files=media_files if is_last else None,
 586              )
 587              if isinstance(result, dict) and result.get("error"):
 588                  return result
 589              last_result = result
 590          return last_result
 591  
 592      # --- Feishu: native media attachment support via adapter ---
 593      if platform == Platform.FEISHU and media_files:
 594          last_result = None
 595          for i, chunk in enumerate(chunks):
 596              is_last = (i == len(chunks) - 1)
 597              result = await _send_feishu(
 598                  pconfig,
 599                  chat_id,
 600                  chunk,
 601                  media_files=media_files if is_last else None,
 602                  thread_id=thread_id,
 603              )
 604              if isinstance(result, dict) and result.get("error"):
 605                  return result
 606              last_result = result
 607          return last_result
 608  
 609      # --- Non-media platforms ---
 610      if media_files and not message.strip():
 611          return {
 612              "error": (
 613                  f"send_message MEDIA delivery is currently only supported for telegram, discord, matrix, weixin, signal, yuanbao and feishu; "
 614                  f"target {platform.value} had only media attachments"
 615              )
 616          }
 617      warning = None
 618      if media_files:
 619          warning = (
 620              f"MEDIA attachments were omitted for {platform.value}; "
 621              "native send_message media delivery is currently only supported for telegram, discord, matrix, weixin, signal, yuanbao and feishu"
 622          )
 623  
 624      last_result = None
 625      for chunk in chunks:
 626          if platform == Platform.SLACK:
 627              result = await _send_slack(pconfig.token, chat_id, chunk)
 628          elif platform == Platform.WHATSAPP:
 629              result = await _send_whatsapp(pconfig.extra, chat_id, chunk)
 630          elif platform == Platform.SIGNAL:
 631              result = await _send_signal(pconfig.extra, chat_id, chunk)
 632          elif platform == Platform.EMAIL:
 633              result = await _send_email(pconfig.extra, chat_id, chunk)
 634          elif platform == Platform.SMS:
 635              result = await _send_sms(pconfig.api_key, chat_id, chunk)
 636          elif platform == Platform.MATTERMOST:
 637              result = await _send_mattermost(pconfig.token, pconfig.extra, chat_id, chunk)
 638          elif platform == Platform.MATRIX:
 639              result = await _send_matrix(pconfig.token, pconfig.extra, chat_id, chunk)
 640          elif platform == Platform.HOMEASSISTANT:
 641              result = await _send_homeassistant(pconfig.token, pconfig.extra, chat_id, chunk)
 642          elif platform == Platform.DINGTALK:
 643              result = await _send_dingtalk(pconfig.extra, chat_id, chunk)
 644          elif platform == Platform.FEISHU:
 645              result = await _send_feishu(pconfig, chat_id, chunk, thread_id=thread_id)
 646          elif platform == Platform.WECOM:
 647              result = await _send_wecom(pconfig.extra, chat_id, chunk)
 648          elif platform == Platform.BLUEBUBBLES:
 649              result = await _send_bluebubbles(pconfig.extra, chat_id, chunk)
 650          elif platform == Platform.QQBOT:
 651              result = await _send_qqbot(pconfig, chat_id, chunk)
 652          elif platform == Platform.YUANBAO:
 653              result = await _send_yuanbao(chat_id, chunk)
 654          else:
 655              # Plugin platform — route through the gateway's live adapter
 656              # if available, otherwise report the error.
 657              result = await _send_via_adapter(platform, pconfig, chat_id, chunk)
 658  
 659          if isinstance(result, dict) and result.get("error"):
 660              return result
 661          last_result = result
 662  
 663      if warning and isinstance(last_result, dict) and last_result.get("success"):
 664          warnings = list(last_result.get("warnings", []))
 665          warnings.append(warning)
 666          last_result["warnings"] = warnings
 667      return last_result
 668  
 669  
 670  async def _send_telegram(token, chat_id, message, media_files=None, thread_id=None, disable_link_previews=False):
 671      """Send via Telegram Bot API (one-shot, no polling needed).
 672  
 673      Applies markdown→MarkdownV2 formatting (same as the gateway adapter)
 674      so that bold, links, and headers render correctly.  If the message
 675      already contains HTML tags, it is sent with ``parse_mode='HTML'``
 676      instead, bypassing MarkdownV2 conversion.
 677      """
 678      try:
 679          from telegram import Bot
 680          from telegram.constants import ParseMode
 681  
 682          # Auto-detect HTML tags — if present, skip MarkdownV2 and send as HTML.
 683          # Inspired by github.com/ashaney — PR #1568.
 684          _has_html = bool(re.search(r'<[a-zA-Z/][^>]*>', message))
 685  
 686          if _has_html:
 687              formatted = message
 688              send_parse_mode = ParseMode.HTML
 689          else:
 690              # Reuse the gateway adapter's format_message for markdown→MarkdownV2
 691              try:
 692                  from gateway.platforms.telegram import TelegramAdapter
 693                  _adapter = TelegramAdapter.__new__(TelegramAdapter)
 694                  formatted = _adapter.format_message(message)
 695              except Exception:
 696                  # Fallback: send as-is if formatting unavailable
 697                  formatted = message
 698              send_parse_mode = ParseMode.MARKDOWN_V2
 699  
 700          bot = Bot(token=token)
 701          int_chat_id = int(chat_id)
 702          media_files = media_files or []
 703          thread_kwargs = {}
 704          if thread_id is not None:
 705              thread_kwargs["message_thread_id"] = int(thread_id)
 706          if disable_link_previews:
 707              thread_kwargs["disable_web_page_preview"] = True
 708  
 709          last_msg = None
 710          warnings = []
 711  
 712          if formatted.strip():
 713              try:
 714                  last_msg = await _send_telegram_message_with_retry(
 715                      bot,
 716                      chat_id=int_chat_id, text=formatted,
 717                      parse_mode=send_parse_mode, **thread_kwargs
 718                  )
 719              except Exception as md_error:
 720                  # Parse failed, fall back to plain text
 721                  if "parse" in str(md_error).lower() or "markdown" in str(md_error).lower() or "html" in str(md_error).lower():
 722                      logger.warning(
 723                          "Parse mode %s failed in _send_telegram, falling back to plain text: %s",
 724                          send_parse_mode,
 725                          _sanitize_error_text(md_error),
 726                      )
 727                      if not _has_html:
 728                          try:
 729                              from gateway.platforms.telegram import _strip_mdv2
 730                              plain = _strip_mdv2(formatted)
 731                          except Exception:
 732                              plain = message
 733                      else:
 734                          plain = message
 735                      last_msg = await _send_telegram_message_with_retry(
 736                          bot,
 737                          chat_id=int_chat_id, text=plain,
 738                          parse_mode=None, **thread_kwargs
 739                      )
 740                  else:
 741                      raise
 742  
 743          for media_path, is_voice in media_files:
 744              if not os.path.exists(media_path):
 745                  warning = f"Media file not found, skipping: {media_path}"
 746                  logger.warning(warning)
 747                  warnings.append(warning)
 748                  continue
 749  
 750              ext = os.path.splitext(media_path)[1].lower()
 751              try:
 752                  with open(media_path, "rb") as f:
 753                      if ext in _IMAGE_EXTS:
 754                          last_msg = await bot.send_photo(
 755                              chat_id=int_chat_id, photo=f, **thread_kwargs
 756                          )
 757                      elif ext in _VIDEO_EXTS:
 758                          last_msg = await bot.send_video(
 759                              chat_id=int_chat_id, video=f, **thread_kwargs
 760                          )
 761                      elif ext in _VOICE_EXTS and is_voice:
 762                          last_msg = await bot.send_voice(
 763                              chat_id=int_chat_id, voice=f, **thread_kwargs
 764                          )
 765                      elif ext in _TELEGRAM_SEND_AUDIO_EXTS:
 766                          last_msg = await bot.send_audio(
 767                              chat_id=int_chat_id, audio=f, **thread_kwargs
 768                          )
 769                      else:
 770                          last_msg = await bot.send_document(
 771                              chat_id=int_chat_id, document=f, **thread_kwargs
 772                          )
 773              except Exception as e:
 774                  warning = _sanitize_error_text(f"Failed to send media {media_path}: {e}")
 775                  logger.error(warning)
 776                  warnings.append(warning)
 777  
 778          if last_msg is None:
 779              error = "No deliverable text or media remained after processing MEDIA tags"
 780              if warnings:
 781                  return {"error": error, "warnings": warnings}
 782              return {"error": error}
 783  
 784          result = {
 785              "success": True,
 786              "platform": "telegram",
 787              "chat_id": chat_id,
 788              "message_id": str(last_msg.message_id),
 789          }
 790          if warnings:
 791              result["warnings"] = warnings
 792          return result
 793      except ImportError:
 794          return {"error": "python-telegram-bot not installed. Run: pip install python-telegram-bot"}
 795      except Exception as e:
 796          return _error(f"Telegram send failed: {e}")
 797  
 798  
 799  def _derive_forum_thread_name(message: str) -> str:
 800      """Derive a thread name from the first line of the message, capped at 100 chars."""
 801      first_line = message.strip().split("\n", 1)[0].strip()
 802      # Strip common markdown heading prefixes
 803      first_line = first_line.lstrip("#").strip()
 804      if not first_line:
 805          first_line = "New Post"
 806      return first_line[:100]
 807  
 808  
# Process-local cache for Discord channel-type probes.  Avoids re-probing the
# same channel on every send when the directory cache has no entry (e.g. fresh
# install, or channel created after the last directory build).
# Keys are stringified channel IDs; values are True when the channel was
# identified as a forum channel.  Only written via _remember_channel_is_forum()
# and read via _probe_is_forum_cached().
_DISCORD_CHANNEL_TYPE_PROBE_CACHE: Dict[str, bool] = {}
 813  
 814  
 815  def _remember_channel_is_forum(chat_id: str, is_forum: bool) -> None:
 816      _DISCORD_CHANNEL_TYPE_PROBE_CACHE[str(chat_id)] = bool(is_forum)
 817  
 818  
 819  def _probe_is_forum_cached(chat_id: str) -> Optional[bool]:
 820      return _DISCORD_CHANNEL_TYPE_PROBE_CACHE.get(str(chat_id))
 821  
 822  
async def _send_discord(token, chat_id, message, thread_id=None, media_files=None):
    """Send a single message via Discord REST API (no websocket client needed).

    Chunking is handled by _send_to_platform() before this is called.

    When thread_id is provided, the message is sent directly to that thread
    via the /channels/{thread_id}/messages endpoint.

    Media files are uploaded one-by-one via multipart/form-data after the
    text message is sent (same pattern as Telegram).

    Forum channels (type 15) reject POST /messages — a thread post is created
    automatically via POST /channels/{id}/threads.  Media files are uploaded
    as multipart attachments on the starter message of the new thread.

    Channel type is resolved from the channel directory first, then a
    process-local probe cache, and only as a last resort with a live
    GET /channels/{id} probe (whose result is memoized).

    Args:
        token: Bot token, sent as ``Authorization: Bot <token>``.
        chat_id: Target channel id (also used for forum detection).
        message: Text content of the message / forum starter post.
        thread_id: Optional existing thread id; when truthy, forum detection
            is skipped and the message goes straight to that thread.
        media_files: Optional iterable of ``(path, is_voice)`` pairs.  The
            ``is_voice`` flag is not used on this platform.

    Returns:
        On success, a dict with ``success``, ``platform``, ``chat_id`` and
        ``message_id`` (forum posts also carry ``thread_id``), plus
        ``warnings`` when media files were skipped or failed.  Otherwise an
        error dict.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}
    try:
        from gateway.platforms.base import resolve_proxy_url, proxy_kwargs_for_aiohttp
        _proxy = resolve_proxy_url(platform_env_var="DISCORD_PROXY")
        _sess_kw, _req_kw = proxy_kwargs_for_aiohttp(_proxy)
        # auth_headers (no Content-Type) is used for multipart uploads;
        # json_headers is used for JSON bodies and the channel probe.
        auth_headers = {"Authorization": f"Bot {token}"}
        json_headers = {**auth_headers, "Content-Type": "application/json"}
        media_files = media_files or []
        # JSON body of the most recent successful POST; None means nothing was delivered.
        last_data = None
        warnings = []

        # Thread endpoint: Discord threads are channels; send directly to the thread ID.
        if thread_id:
            url = f"https://discord.com/api/v10/channels/{thread_id}/messages"
        else:
            # Check if the target channel is a forum channel (type 15).
            # Forum channels reject POST /messages — create a thread post instead.
            # Three-layer detection: directory cache → process-local probe
            # cache → GET /channels/{id} probe (with result memoized).
            _channel_type = None
            try:
                from gateway.channel_directory import lookup_channel_type
                _channel_type = lookup_channel_type("discord", chat_id)
            except Exception:
                # Directory lookup is best-effort; fall through to the probe layers.
                pass

            if _channel_type == "forum":
                is_forum = True
            elif _channel_type is not None:
                is_forum = False
            else:
                cached = _probe_is_forum_cached(chat_id)
                if cached is not None:
                    is_forum = cached
                else:
                    # Default to a regular channel.  Only a 200 response is
                    # memoized, so a failed probe is retried on the next send.
                    is_forum = False
                    try:
                        info_url = f"https://discord.com/api/v10/channels/{chat_id}"
                        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=15), **_sess_kw) as info_sess:
                            async with info_sess.get(info_url, headers=json_headers, **_req_kw) as info_resp:
                                if info_resp.status == 200:
                                    info = await info_resp.json()
                                    is_forum = info.get("type") == 15
                                    _remember_channel_is_forum(chat_id, is_forum)
                    except Exception:
                        logger.debug("Failed to probe channel type for %s", chat_id, exc_info=True)

            if is_forum:
                thread_name = _derive_forum_thread_name(message)
                thread_url = f"https://discord.com/api/v10/channels/{chat_id}/threads"

                # Filter to readable media files up front so we can pick the
                # right code path (JSON vs multipart) before opening a session.
                valid_media = []
                for media_path, _is_voice in media_files:
                    if not os.path.exists(media_path):
                        warning = f"Media file not found, skipping: {media_path}"
                        logger.warning(warning)
                        warnings.append(warning)
                        continue
                    valid_media.append(media_path)

                async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=60), **_sess_kw) as session:
                    if valid_media:
                        # Multipart: payload_json + files[N] creates a forum
                        # thread with the starter message plus attachments in
                        # a single API call.
                        attachments_meta = [
                            {"id": str(idx), "filename": os.path.basename(path)}
                            for idx, path in enumerate(valid_media)
                        ]
                        starter_message = {"content": message, "attachments": attachments_meta}
                        payload_json = json.dumps({"name": thread_name, "message": starter_message})

                        form = aiohttp.FormData()
                        form.add_field("payload_json", payload_json, content_type="application/json")

                        # Buffer file bytes up front — aiohttp's FormData can
                        # read lazily and we don't want handles closing under
                        # it on retry.
                        # Unlike the regular-channel path below, a failure to
                        # read or upload any file here fails the whole send.
                        try:
                            for idx, media_path in enumerate(valid_media):
                                with open(media_path, "rb") as fh:
                                    form.add_field(
                                        f"files[{idx}]",
                                        fh.read(),
                                        filename=os.path.basename(media_path),
                                    )
                            async with session.post(thread_url, headers=auth_headers, data=form, **_req_kw) as resp:
                                if resp.status not in (200, 201):
                                    body = await resp.text()
                                    return _error(f"Discord forum thread creation error ({resp.status}): {body}")
                                data = await resp.json()
                        except Exception as e:
                            return _error(_sanitize_error_text(f"Discord forum thread upload failed: {e}"))
                    else:
                        # No media — simple JSON POST creates the thread with
                        # just the text starter.
                        async with session.post(
                            thread_url,
                            headers=json_headers,
                            json={
                                "name": thread_name,
                                "message": {"content": message},
                            },
                            **_req_kw,
                        ) as resp:
                            if resp.status not in (200, 201):
                                body = await resp.text()
                                return _error(f"Discord forum thread creation error ({resp.status}): {body}")
                            data = await resp.json()

                thread_id_created = data.get("id")
                # Fall back to the thread id when the response carries no
                # nested starter-message id.
                starter_msg_id = (data.get("message") or {}).get("id", thread_id_created)
                result = {
                    "success": True,
                    "platform": "discord",
                    "chat_id": chat_id,
                    "thread_id": thread_id_created,
                    "message_id": starter_msg_id,
                }
                if warnings:
                    result["warnings"] = warnings
                return result

            url = f"https://discord.com/api/v10/channels/{chat_id}/messages"

        # Regular channel or existing thread: text first, then one POST per file.
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30), **_sess_kw) as session:
            # Send text message (skip if empty and media is present)
            # With no media the POST is made even for blank text, so the
            # API's own error is what gets reported.
            if message.strip() or not media_files:
                async with session.post(url, headers=json_headers, json={"content": message}, **_req_kw) as resp:
                    if resp.status not in (200, 201):
                        body = await resp.text()
                        return _error(f"Discord API error ({resp.status}): {body}")
                    last_data = await resp.json()

            # Send each media file as a separate multipart upload
            for media_path, _is_voice in media_files:
                if not os.path.exists(media_path):
                    warning = f"Media file not found, skipping: {media_path}"
                    logger.warning(warning)
                    warnings.append(warning)
                    continue
                try:
                    form = aiohttp.FormData()
                    filename = os.path.basename(media_path)
                    # The POST happens inside the ``with`` so the handle stays
                    # open while the form is being sent.
                    with open(media_path, "rb") as f:
                        form.add_field("files[0]", f, filename=filename)
                        async with session.post(url, headers=auth_headers, data=form, **_req_kw) as resp:
                            if resp.status not in (200, 201):
                                body = await resp.text()
                                warning = _sanitize_error_text(f"Failed to send media {media_path}: Discord API error ({resp.status}): {body}")
                                logger.error(warning)
                                warnings.append(warning)
                                continue
                            last_data = await resp.json()
                except Exception as e:
                    # Per-file failures are downgraded to warnings so the
                    # remaining files are still attempted.
                    warning = _sanitize_error_text(f"Failed to send media {media_path}: {e}")
                    logger.error(warning)
                    warnings.append(warning)

        if last_data is None:
            error = "No deliverable text or media remained after processing"
            if warnings:
                return {"error": error, "warnings": warnings}
            return {"error": error}

        result = {"success": True, "platform": "discord", "chat_id": chat_id, "message_id": last_data.get("id")}
        if warnings:
            result["warnings"] = warnings
        return result
    except Exception as e:
        return _error(f"Discord send failed: {e}")
1018  
1019  
async def _send_slack(token, chat_id, message):
    """Post ``message`` to a Slack conversation through ``chat.postMessage``.

    Returns a success dict whose ``message_id`` is the ``ts`` value from the
    API reply, or an error result when the reply is not ``ok`` or the
    request raises.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}

    try:
        from gateway.platforms.base import resolve_proxy_url, proxy_kwargs_for_aiohttp

        session_opts, request_opts = proxy_kwargs_for_aiohttp(resolve_proxy_url())
        endpoint = "https://slack.com/api/chat.postMessage"
        request_headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
        body = {"channel": chat_id, "text": message, "mrkdwn": True}
        http_timeout = aiohttp.ClientTimeout(total=30)

        async with aiohttp.ClientSession(timeout=http_timeout, **session_opts) as client:
            async with client.post(endpoint, headers=request_headers, json=body, **request_opts) as response:
                reply = await response.json()
                # The outcome is read from the "ok" field of the JSON reply,
                # not from the HTTP status.
                if not reply.get("ok"):
                    return _error(f"Slack API error: {reply.get('error', 'unknown')}")
                return {
                    "success": True,
                    "platform": "slack",
                    "chat_id": chat_id,
                    "message_id": reply.get("ts"),
                }
    except Exception as exc:
        return _error(f"Slack send failed: {exc}")
1041  
1042  
async def _send_whatsapp(extra, chat_id, message):
    """Deliver ``message`` through the local WhatsApp bridge's HTTP endpoint.

    The bridge is reached on ``localhost`` at ``extra["bridge_port"]``
    (default 3000).  An HTTP 200 reply yields a success dict carrying the
    bridge's ``messageId``; any other status or an exception yields an
    error result.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}

    try:
        port = extra.get("bridge_port", 3000)
        endpoint = f"http://localhost:{port}/send"
        body = {"chatId": chat_id, "message": message}

        async with aiohttp.ClientSession() as client:
            async with client.post(
                endpoint,
                json=body,
                timeout=aiohttp.ClientTimeout(total=30),
            ) as response:
                if response.status != 200:
                    detail = await response.text()
                    return _error(f"WhatsApp bridge error ({response.status}): {detail}")

                reply = await response.json()
                return {
                    "success": True,
                    "platform": "whatsapp",
                    "chat_id": chat_id,
                    "message_id": reply.get("messageId"),
                }
    except Exception as exc:
        return _error(f"WhatsApp send failed: {exc}")
1069  
1070  
async def _send_signal(extra, chat_id, message, media_files=None):
    """Send via signal-cli JSON-RPC API.

    Supports both text-only and text-with-attachments (images/audio/documents).
    Multi-attachment sends are chunked into batches of
    SIGNAL_MAX_ATTACHMENTS_PER_MSG and metered by the process-wide
    SignalAttachmentScheduler — same bucket the gateway adapter uses, so
    sends from this tool and inbound-driven replies share rate-limit state.

    Args:
        extra: Platform config mapping; reads ``http_url`` (default
            ``http://127.0.0.1:8080``) and ``account`` (required).
        chat_id: Recipient identifier.  A ``group:`` prefix selects a group
            send (the remainder is passed as ``groupId``); anything else is
            sent as a single-element ``recipient`` list.
        message: Text to send; it accompanies the first batch only.
        media_files: Optional iterable of ``(path, is_voice)`` pairs.  Paths
            that do not exist are skipped; ``is_voice`` is not used here.

    Returns:
        ``{"success": True, "platform": "signal", "chat_id": ...}`` with
        optional ``warnings``, or an error dict when the account is missing,
        a non-rate-limit RPC error occurs, or every batch fails.

    Note:
        The ``gateway.platforms.signal_rate_limit`` import sits outside the
        ``try`` blocks, so an ImportError from it propagates to the caller.
    """
    try:
        import httpx
    except ImportError:
        return {"error": "httpx not installed"}

    from gateway.platforms.signal_rate_limit import (
        SIGNAL_BATCH_PACING_NOTICE_THRESHOLD,
        SIGNAL_MAX_ATTACHMENTS_PER_MSG,
        SIGNAL_RATE_LIMIT_MAX_ATTEMPTS,
        _extract_retry_after_seconds,
        _format_wait,
        _is_signal_rate_limit_error,
        _signal_send_timeout,
        get_scheduler,
    )

    try:
        http_url = extra.get("http_url", "http://127.0.0.1:8080").rstrip("/")
        account = extra.get("account", "")
        if not account:
            return {"error": "Signal account not configured"}

        # Keep only attachments that exist on disk; the gap between
        # valid_media and attachment_paths is reported as a warning later.
        valid_media = media_files or []
        attachment_paths = []
        for media_path, _is_voice in valid_media:
            if os.path.exists(media_path):
                attachment_paths.append(media_path)
            else:
                logger.warning("Signal media file not found, skipping: %s", media_path)

        # Chunk attachments. With no attachments we still emit one batch
        # (text only). With attachments, the text rides on batch #0 so the
        # caption isn't repeated across every chunk.
        if attachment_paths:
            att_batches = [
                attachment_paths[i:i + SIGNAL_MAX_ATTACHMENTS_PER_MSG]
                for i in range(0, len(attachment_paths), SIGNAL_MAX_ATTACHMENTS_PER_MSG)
            ]
        else:
            att_batches = [[]]

        async def _post(batch_attachments, batch_message):
            # Issue one JSON-RPC "send" call and return the decoded response.
            # HTTP-level failures raise (raise_for_status); RPC-level failures
            # come back as an "error" key in the returned dict.
            params = {"account": account, "message": batch_message}
            if chat_id.startswith("group:"):
                params["groupId"] = chat_id[6:]
            else:
                params["recipient"] = [chat_id]
            if batch_attachments:
                params["attachments"] = batch_attachments

            payload = {
                "jsonrpc": "2.0",
                "method": "send",
                "params": params,
                "id": f"send_{int(time.time() * 1000)}",
            }
            # Timeout is derived from the number of attachments in this batch.
            timeout = _signal_send_timeout(len(batch_attachments) if batch_attachments else 0)
            async with httpx.AsyncClient(timeout=timeout) as client:
                resp = await client.post(f"{http_url}/api/v1/rpc", json=payload)
                resp.raise_for_status()
                return resp.json()

        async def _send_inline_notice(text: str) -> None:
            """Best-effort one-shot RPC for a user-facing pacing notice."""
            notice_params = {"account": account, "message": text}
            if chat_id.startswith("group:"):
                notice_params["groupId"] = chat_id[6:]
            else:
                notice_params["recipient"] = [chat_id]
            try:
                async with httpx.AsyncClient(timeout=30.0) as _client:
                    await _client.post(
                        f"{http_url}/api/v1/rpc",
                        json={
                            "jsonrpc": "2.0",
                            "method": "send",
                            "params": notice_params,
                            "id": f"notice_{int(time.time() * 1000)}",
                        },
                    )
            except Exception as _e:
                # The notice is informational only; never fail the send over it.
                logger.warning("Signal: inline notice failed: %s", _e)

        scheduler = get_scheduler()
        logger.info(
            "send_message Signal: scheduler state=%s, %d attachment(s) in %d batch(es)",
            scheduler.state(), len(attachment_paths), len(att_batches),
        )
        # 1-based indices of batches that could not be delivered.
        failed_batches: list[int] = []
        for idx, att_batch in enumerate(att_batches):
            n = len(att_batch)
            if n > 0:
                # Tell the recipient about a long pause before it happens.
                estimated = scheduler.estimate_wait(n)
                if estimated >= SIGNAL_BATCH_PACING_NOTICE_THRESHOLD:
                    await _send_inline_notice(
                        f"(More images coming — pausing ~{_format_wait(estimated)} "
                        f"for Signal rate limit, batch {idx + 1}/{len(att_batches)}.)"
                    )

            batch_message = message if idx == 0 else ""

            # Retry loop for this batch: success breaks out, a non-rate-limit
            # RPC error aborts the whole send, and rate limits or exceptions
            # are retried up to SIGNAL_RATE_LIMIT_MAX_ATTEMPTS times.
            for attempt in range(1, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS + 1):
                try:
                    await scheduler.acquire(n)
                    _rpc_t0 = time.monotonic()
                    data = await _post(att_batch, batch_message)
                    _rpc_duration = time.monotonic() - _rpc_t0
                    if "error" not in data:
                        await scheduler.report_rpc_duration(_rpc_duration, n)
                        break

                    err = data["error"]

                    if not _is_signal_rate_limit_error(err):
                        return _error(f"Signal RPC error on batch {idx + 1}/{len(att_batches)}: {err}")

                    # Feed the server's retry hint back to the shared scheduler
                    # before the next acquire().
                    server_retry_after = _extract_retry_after_seconds(err)
                    scheduler.feedback(server_retry_after, n)

                    if attempt >= SIGNAL_RATE_LIMIT_MAX_ATTEMPTS:
                        failed_batches.append(idx + 1)
                        logger.error(
                            "Signal: rate-limit retries exhausted on batch %d/%d "
                            "(%d attachments lost, server retry_after=%s)",
                            idx + 1, len(att_batches), n,
                            f"{server_retry_after:.0f}s" if server_retry_after else "unknown",
                        )
                        break
                    logger.warning(
                        "Signal: rate-limited on batch %d/%d "
                        "(attempt %d/%d, server retry_after=%s); "
                        "scheduler will pace the retry",
                        idx + 1, len(att_batches),
                        attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS,
                        f"{server_retry_after:.0f}s" if server_retry_after else "unknown",
                    )
                except Exception as e:
                    # Exceptions (HTTP errors, timeouts, ...) are retried the
                    # same way; the batch is marked failed on the last attempt.
                    if attempt >= SIGNAL_RATE_LIMIT_MAX_ATTEMPTS:
                        failed_batches.append(idx + 1)
                        logger.error(
                            "Signal: send error on batch %d/%d after %d attempts: %s",
                            idx + 1, len(att_batches), attempt, str(e)
                        )
                        break
                    logger.warning(
                        "Signal: transient error on batch %d/%d (attempt %d/%d): %s; will retry",
                        idx + 1, len(att_batches), attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS, str(e)
                    )

        warnings = []
        if len(attachment_paths) < len(valid_media):
            warnings.append("Some media files were skipped (not found on disk)")
        # NOTE(review): failed_batches also collects batches that failed with
        # an exception, yet the messages below always say "rate limit".
        if failed_batches:
            warnings.append(
                f"Signal rate-limited {len(failed_batches)} batch(es) "
                f"(#{', #'.join(str(b) for b in failed_batches)})"
            )

        # Partial delivery is a success with warnings; total failure is an error.
        if failed_batches and len(failed_batches) == len(att_batches):
            return _error(
                f"Signal: every batch ({len(att_batches)}) hit rate limit; "
                f"no attachments delivered"
            )

        result = {"success": True, "platform": "signal", "chat_id": chat_id}
        if warnings:
            result["warnings"] = warnings
        return result
    except Exception as e:
        return _error(f"Signal send failed: {e}")
1250  
1251  
1252  async def _send_email(extra, chat_id, message):
1253      """Send via SMTP (one-shot, no persistent connection needed)."""
1254      import smtplib
1255      from email.mime.text import MIMEText
1256      from email.utils import formatdate
1257  
1258      address = extra.get("address") or os.getenv("EMAIL_ADDRESS", "")
1259      password = os.getenv("EMAIL_PASSWORD", "")
1260      smtp_host = extra.get("smtp_host") or os.getenv("EMAIL_SMTP_HOST", "")
1261      try:
1262          smtp_port = int(os.getenv("EMAIL_SMTP_PORT", "587"))
1263      except (ValueError, TypeError):
1264          smtp_port = 587
1265  
1266      if not all([address, password, smtp_host]):
1267          return {"error": "Email not configured (EMAIL_ADDRESS, EMAIL_PASSWORD, EMAIL_SMTP_HOST required)"}
1268  
1269      try:
1270          msg = MIMEText(message, "plain", "utf-8")
1271          msg["From"] = address
1272          msg["To"] = chat_id
1273          msg["Subject"] = "Hermes Agent"
1274          msg["Date"] = formatdate(localtime=True)
1275  
1276          server = smtplib.SMTP(smtp_host, smtp_port)
1277          server.starttls(context=ssl.create_default_context())
1278          server.login(address, password)
1279          server.send_message(msg)
1280          server.quit()
1281          return {"success": True, "platform": "email", "chat_id": chat_id}
1282      except Exception as e:
1283          return _error(f"Email send failed: {e}")
1284  
1285  
async def _send_sms(auth_token, chat_id, message):
    """Deliver one SMS through the Twilio REST API.

    Authenticates with HTTP Basic (Account SID : Auth Token) and submits a
    form-encoded POST to the account's ``Messages.json`` endpoint. Markdown
    markers are stripped first because SMS shows them as literal characters.
    Callers are expected to have chunked long messages already (the original
    note names ``_send_to_platform()`` as the place where that happens).

    Returns a success dict carrying the Twilio message SID, or an error dict.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}

    import base64

    account_sid = os.getenv("TWILIO_ACCOUNT_SID", "")
    from_number = os.getenv("TWILIO_PHONE_NUMBER", "")
    if not (account_sid and auth_token and from_number):
        return {"error": "SMS not configured (TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, TWILIO_PHONE_NUMBER required)"}

    # (pattern, replacement, flags) applied in order; order matters because
    # the double-marker forms must be removed before the single-marker ones.
    markdown_rules = (
        (r"\*\*(.+?)\*\*", r"\1", re.DOTALL),
        (r"\*(.+?)\*", r"\1", re.DOTALL),
        (r"__(.+?)__", r"\1", re.DOTALL),
        (r"_(.+?)_", r"\1", re.DOTALL),
        (r"```[a-z]*\n?", "", 0),
        (r"`(.+?)`", r"\1", 0),
        (r"^#{1,6}\s+", "", re.MULTILINE),
        (r"\[([^\]]+)\]\([^\)]+\)", r"\1", 0),
        (r"\n{3,}", "\n\n", 0),
    )
    for pattern, replacement, flags in markdown_rules:
        message = re.sub(pattern, replacement, message, flags=flags)
    message = message.strip()

    try:
        from gateway.platforms.base import resolve_proxy_url, proxy_kwargs_for_aiohttp

        session_kwargs, request_kwargs = proxy_kwargs_for_aiohttp(resolve_proxy_url())
        basic_token = base64.b64encode(f"{account_sid}:{auth_token}".encode("ascii")).decode("ascii")
        endpoint = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"
        auth_headers = {"Authorization": f"Basic {basic_token}"}
        request_timeout = aiohttp.ClientTimeout(total=30)

        async with aiohttp.ClientSession(timeout=request_timeout, **session_kwargs) as session:
            form = aiohttp.FormData()
            for field_name, field_value in (("From", from_number), ("To", chat_id), ("Body", message)):
                form.add_field(field_name, field_value)

            async with session.post(endpoint, data=form, headers=auth_headers, **request_kwargs) as resp:
                reply = await resp.json()
                if resp.status >= 400:
                    error_msg = reply.get("message", str(reply))
                    return _error(f"Twilio API error ({resp.status}): {error_msg}")
                return {
                    "success": True,
                    "platform": "sms",
                    "chat_id": chat_id,
                    "message_id": reply.get("sid", ""),
                }
    except Exception as e:
        return _error(f"SMS send failed: {e}")
1340  
1341  
async def _send_mattermost(token, extra, chat_id, message):
    """Create a post in a Mattermost channel through the v4 REST API.

    The server URL comes from ``extra["url"]`` or ``MATTERMOST_URL``; the
    bearer token from ``token`` or ``MATTERMOST_TOKEN``. ``chat_id`` is sent
    as the ``channel_id`` of the new post.

    Returns a success dict with the created post's ``id``, or an error dict.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}
    try:
        configured_url = extra.get("url") or os.getenv("MATTERMOST_URL", "")
        base_url = configured_url.rstrip("/")
        token = token or os.getenv("MATTERMOST_TOKEN", "")
        if not (base_url and token):
            return {"error": "Mattermost not configured (MATTERMOST_URL, MATTERMOST_TOKEN required)"}

        endpoint = f"{base_url}/api/v4/posts"
        auth_headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
        post_body = {"channel_id": chat_id, "message": message}
        request_timeout = aiohttp.ClientTimeout(total=30)

        async with aiohttp.ClientSession(timeout=request_timeout) as session:
            async with session.post(endpoint, headers=auth_headers, json=post_body) as resp:
                if resp.status in (200, 201):
                    data = await resp.json()
                else:
                    body = await resp.text()
                    return _error(f"Mattermost API error ({resp.status}): {body}")
        return {"success": True, "platform": "mattermost", "chat_id": chat_id, "message_id": data.get("id")}
    except Exception as e:
        return _error(f"Mattermost send failed: {e}")
1364  
1365  
async def _send_matrix(token, extra, chat_id, message):
    """Send an ``m.room.message`` event through the Matrix Client-Server API.

    When the optional ``markdown`` package is importable the message is also
    rendered to HTML and attached as ``formatted_body``; otherwise only the
    plain-text ``body`` is sent.

    Returns a success dict with the resulting ``event_id``, or an error dict.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}
    try:
        configured_homeserver = extra.get("homeserver") or os.getenv("MATRIX_HOMESERVER", "")
        homeserver = configured_homeserver.rstrip("/")
        token = token or os.getenv("MATRIX_ACCESS_TOKEN", "")
        if not (homeserver and token):
            return {"error": "Matrix not configured (MATRIX_HOMESERVER, MATRIX_ACCESS_TOKEN required)"}

        # Transaction ID: millisecond timestamp plus random bytes.
        millis = int(time.time() * 1000)
        nonce = os.urandom(4).hex()
        txn_id = f"hermes_{millis}_{nonce}"

        from urllib.parse import quote

        # Room IDs contain characters such as '!' and ':' — escape all of them.
        encoded_room = quote(chat_id, safe="")
        endpoint = f"{homeserver}/_matrix/client/v3/rooms/{encoded_room}/send/m.room.message/{txn_id}"
        auth_headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

        # Plain-text payload first; enrich with HTML only if markdown is available.
        payload = {"msgtype": "m.text", "body": message}
        try:
            import markdown as _md

            rendered = _md.markdown(message, extensions=["fenced_code", "tables"])
            # Headings become bold text for Element X compatibility.
            rendered = re.sub(r"<h[1-6]>(.*?)</h[1-6]>", r"<strong>\1</strong>", rendered)
            payload["format"] = "org.matrix.custom.html"
            payload["formatted_body"] = rendered
        except ImportError:
            pass

        request_timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(timeout=request_timeout) as session:
            async with session.put(endpoint, headers=auth_headers, json=payload) as resp:
                if resp.status in (200, 201):
                    data = await resp.json()
                else:
                    body = await resp.text()
                    return _error(f"Matrix API error ({resp.status}): {body}")
        return {"success": True, "platform": "matrix", "chat_id": chat_id, "message_id": data.get("event_id")}
    except Exception as e:
        return _error(f"Matrix send failed: {e}")
1408  
1409  
async def _send_matrix_via_adapter(pconfig, chat_id, message, media_files=None, thread_id=None):
    """Send via the Matrix adapter so native Matrix media uploads are preserved.

    Args:
        pconfig: Platform config handed to ``MatrixAdapter``.
        chat_id: Target room identifier passed through to the adapter.
        message: Text to send; skipped when blank after stripping.
        media_files: Optional iterable of ``(path, is_voice)`` pairs. Each
            file is dispatched by extension to the image, video, voice or
            document sender of the adapter.
        thread_id: Optional thread identifier forwarded as
            ``metadata={"thread_id": ...}``.

    Returns:
        A success dict carrying the ``message_id`` of the last item sent, or
        an error dict. Sending stops at the first failure.
    """
    try:
        from gateway.platforms.matrix import MatrixAdapter
    except ImportError:
        return {"error": "Matrix dependencies not installed. Run: pip install 'mautrix[encryption]'"}

    media_files = media_files or []
    # Bound before the try so the finally clause can tell whether the
    # constructor ever succeeded, instead of tripping over an unbound name.
    adapter = None

    try:
        adapter = MatrixAdapter(pconfig)
        connected = await adapter.connect()
        if not connected:
            return _error("Matrix connect failed")

        metadata = {"thread_id": thread_id} if thread_id else None
        last_result = None

        if message.strip():
            last_result = await adapter.send(chat_id, message, metadata=metadata)
            if not last_result.success:
                return _error(f"Matrix send failed: {last_result.error}")

        for media_path, is_voice in media_files:
            if not os.path.exists(media_path):
                return _error(f"Media file not found: {media_path}")

            ext = os.path.splitext(media_path)[1].lower()
            if ext in _IMAGE_EXTS:
                send_media = adapter.send_image_file
            elif ext in _VIDEO_EXTS:
                send_media = adapter.send_video
            elif (ext in _VOICE_EXTS and is_voice) or ext in _AUDIO_EXTS:
                # Flagged voice notes and plain audio files share one sender.
                send_media = adapter.send_voice
            else:
                send_media = adapter.send_document
            last_result = await send_media(chat_id, media_path, metadata=metadata)

            if not last_result.success:
                return _error(f"Matrix media send failed: {last_result.error}")

        if last_result is None:
            return {"error": "No deliverable text or media remained after processing MEDIA tags"}

        return {
            "success": True,
            "platform": "matrix",
            "chat_id": chat_id,
            "message_id": last_result.message_id,
        }
    except Exception as e:
        return _error(f"Matrix send failed: {e}")
    finally:
        if adapter is not None:
            try:
                await adapter.disconnect()
            except Exception:
                # Best-effort cleanup: never mask the send result, but leave
                # a trace for debugging instead of discarding the failure.
                logger.debug("Matrix adapter disconnect failed", exc_info=True)
1468  
1469  
async def _send_homeassistant(token, extra, chat_id, message):
    """Call Home Assistant's ``notify/notify`` service with ``message``.

    The base URL comes from ``extra["url"]`` or ``HASS_URL``; the bearer
    token from ``token`` or ``HASS_TOKEN``. ``chat_id`` is forwarded as the
    ``target`` field of the service call.

    Returns a success dict (no message ID is reported), or an error dict.
    """
    try:
        import aiohttp
    except ImportError:
        return {"error": "aiohttp not installed. Run: pip install aiohttp"}
    try:
        configured_url = extra.get("url") or os.getenv("HASS_URL", "")
        hass_url = configured_url.rstrip("/")
        token = token or os.getenv("HASS_TOKEN", "")
        if not (hass_url and token):
            return {"error": "Home Assistant not configured (HASS_URL, HASS_TOKEN required)"}

        endpoint = f"{hass_url}/api/services/notify/notify"
        auth_headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
        notification = {"message": message, "target": chat_id}
        request_timeout = aiohttp.ClientTimeout(total=30)

        async with aiohttp.ClientSession(timeout=request_timeout) as session:
            async with session.post(endpoint, headers=auth_headers, json=notification) as resp:
                if resp.status not in (200, 201):
                    failure_text = await resp.text()
                    return _error(f"Home Assistant API error ({resp.status}): {failure_text}")
        return {"success": True, "platform": "homeassistant", "chat_id": chat_id}
    except Exception as e:
        return _error(f"Home Assistant send failed: {e}")
1491  
1492  
async def _send_dingtalk(extra, chat_id, message):
    """Post a text message to a DingTalk robot webhook.

    Note: per the original comment, the gateway's DingTalk adapter works
    with per-session webhook URLs taken from incoming messages
    (dingtalk-stream SDK). Cross-platform ``send_message`` delivery instead
    relies on a static robot webhook, configured through ``webhook_url`` in
    the platform's extra config or the ``DINGTALK_WEBHOOK_URL`` env var.

    ``chat_id`` is only echoed back in the result; the webhook URL alone
    determines where the message goes.
    """
    try:
        import httpx
    except ImportError:
        return {"error": "httpx not installed"}
    try:
        webhook_url = extra.get("webhook_url") or os.getenv("DINGTALK_WEBHOOK_URL", "")
        if not webhook_url:
            return {"error": "DingTalk not configured. Set DINGTALK_WEBHOOK_URL env var or webhook_url in dingtalk platform extra config."}

        text_payload = {"msgtype": "text", "text": {"content": message}}
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(webhook_url, json=text_payload)
            resp.raise_for_status()
            reply = resp.json()
            # A missing errcode is treated as success; any other non-zero value is a failure.
            if reply.get("errcode", 0) != 0:
                return _error(f"DingTalk API error: {reply.get('errmsg', 'unknown')}")
        return {"success": True, "platform": "dingtalk", "chat_id": chat_id}
    except Exception as e:
        return _error(f"DingTalk send failed: {e}")
1522  
1523  
async def _send_wecom(extra, chat_id, message):
    """Send through a short-lived WeCom adapter (connect, send, disconnect).

    Returns a success dict with the adapter-reported ``message_id``, or an
    error dict when the adapter is unavailable, cannot connect, or the send
    fails.
    """
    try:
        from gateway.platforms.wecom import WeComAdapter, check_wecom_requirements

        requirements_met = check_wecom_requirements()
    except ImportError:
        return {"error": "WeCom adapter not available."}
    if not requirements_met:
        return {"error": "WeCom requirements not met. Need aiohttp + WECOM_BOT_ID/SECRET."}

    try:
        from gateway.config import PlatformConfig

        adapter = WeComAdapter(PlatformConfig(extra=extra))
        if not await adapter.connect():
            return _error(f"WeCom: failed to connect - {adapter.fatal_error_message or 'unknown error'}")
        try:
            outcome = await adapter.send(chat_id, message)
            if outcome.success:
                return {"success": True, "platform": "wecom", "chat_id": chat_id, "message_id": outcome.message_id}
            return _error(f"WeCom send failed: {outcome.error}")
        finally:
            # Only reached after a successful connect, so there is a live
            # connection to tear down.
            await adapter.disconnect()
    except Exception as e:
        return _error(f"WeCom send failed: {e}")
1549  
1550  
async def _send_weixin(pconfig, chat_id, message, media_files=None):
    """Delegate a Weixin send to the adapter module's ``send_weixin_direct``.

    ``pconfig.extra`` and ``pconfig.token`` are forwarded together with the
    target, text and optional media list; the helper's return value is
    passed back unchanged.
    """
    try:
        from gateway.platforms.weixin import check_weixin_requirements, send_weixin_direct

        requirements_met = check_weixin_requirements()
    except ImportError:
        return {"error": "Weixin adapter not available."}
    if not requirements_met:
        return {"error": "Weixin requirements not met. Need aiohttp + cryptography."}

    try:
        outcome = await send_weixin_direct(
            extra=pconfig.extra,
            token=pconfig.token,
            chat_id=chat_id,
            message=message,
            media_files=media_files,
        )
    except Exception as e:
        return _error(f"Weixin send failed: {e}")
    return outcome
1570  
1571  
async def _send_bluebubbles(extra, chat_id, message):
    """Send through a short-lived BlueBubbles adapter (connect, send, disconnect).

    Returns a success dict with the adapter-reported ``message_id``, or an
    error dict when the adapter is unavailable, cannot connect, or the send
    fails.
    """
    try:
        from gateway.platforms.bluebubbles import BlueBubblesAdapter, check_bluebubbles_requirements

        requirements_met = check_bluebubbles_requirements()
    except ImportError:
        return {"error": "BlueBubbles adapter not available."}
    if not requirements_met:
        return {"error": "BlueBubbles requirements not met (need aiohttp + httpx)."}

    try:
        from gateway.config import PlatformConfig

        adapter = BlueBubblesAdapter(PlatformConfig(extra=extra))
        if not await adapter.connect():
            return _error("BlueBubbles: failed to connect to server")
        try:
            outcome = await adapter.send(chat_id, message)
            if outcome.success:
                return {"success": True, "platform": "bluebubbles", "chat_id": chat_id, "message_id": outcome.message_id}
            return _error(f"BlueBubbles send failed: {outcome.error}")
        finally:
            # Only reached after a successful connect.
            await adapter.disconnect()
    except Exception as e:
        return _error(f"BlueBubbles send failed: {e}")
1597  
1598  
async def _send_feishu(pconfig, chat_id, message, media_files=None, thread_id=None):
    """Send text and/or media to Feishu/Lark through a ``FeishuAdapter``.

    The adapter's client is built directly for the Feishu or Lark domain
    (chosen from the adapter's ``_domain_name``); no ``connect()`` call is
    made. Text goes first when non-blank, then each ``(path, is_voice)``
    entry of ``media_files`` is dispatched by file extension. Sending stops
    at the first failure.

    Returns a success dict with the ``message_id`` of the last item sent, or
    an error dict.
    """
    missing_deps = {"error": "Feishu dependencies not installed. Run: pip install 'hermes-agent[feishu]'"}
    try:
        from gateway.platforms.feishu import FeishuAdapter, FEISHU_AVAILABLE

        if not FEISHU_AVAILABLE:
            return missing_deps
        from gateway.platforms.feishu import FEISHU_DOMAIN, LARK_DOMAIN
    except ImportError:
        return missing_deps

    media_files = media_files or []

    try:
        adapter = FeishuAdapter(pconfig)
        uses_lark = getattr(adapter, "_domain_name", "feishu") == "lark"
        adapter._client = adapter._build_lark_client(LARK_DOMAIN if uses_lark else FEISHU_DOMAIN)

        metadata = None
        if thread_id:
            metadata = {"thread_id": thread_id}

        last_result = None
        if message.strip():
            last_result = await adapter.send(chat_id, message, metadata=metadata)
            if not last_result.success:
                return _error(f"Feishu send failed: {last_result.error}")

        for media_path, is_voice in media_files:
            if not os.path.exists(media_path):
                return _error(f"Media file not found: {media_path}")

            _, raw_ext = os.path.splitext(media_path)
            ext = raw_ext.lower()
            if ext in _IMAGE_EXTS:
                send_media = adapter.send_image_file
            elif ext in _VIDEO_EXTS:
                send_media = adapter.send_video
            elif (ext in _VOICE_EXTS and is_voice) or ext in _AUDIO_EXTS:
                # Flagged voice notes and plain audio files share one sender.
                send_media = adapter.send_voice
            else:
                send_media = adapter.send_document
            last_result = await send_media(chat_id, media_path, metadata=metadata)

            if not last_result.success:
                return _error(f"Feishu media send failed: {last_result.error}")

        if last_result is None:
            return {"error": "No deliverable text or media remained after processing MEDIA tags"}

        return {
            "success": True,
            "platform": "feishu",
            "chat_id": chat_id,
            "message_id": last_result.message_id,
        }
    except Exception as e:
        return _error(f"Feishu send failed: {e}")
1654  
1655  
def _check_send_message():
    """Decide whether the ``send_message`` tool should be offered.

    Returns ``True`` outright when the session platform is set to anything
    other than ``"local"``. Otherwise returns ``is_gateway_running()``, or
    ``False`` if that check cannot be imported or raises.
    """
    from gateway.session_context import get_session_env

    session_platform = get_session_env("HERMES_SESSION_PLATFORM", "")
    if not session_platform or session_platform == "local":
        # CLI / local session: the tool is only useful with a live gateway.
        try:
            from gateway.status import is_gateway_running

            return is_gateway_running()
        except Exception:
            return False
    return True
1667  
1668  
async def _send_qqbot(pconfig, chat_id, message):
    """Send via QQBot using the REST API directly (no WebSocket needed).

    Fetches an app access token, then posts the message to the guild
    channel, C2C (private) and group endpoints in that order, returning on
    the first one that answers 200/201. The content is truncated to 4000
    characters. If every endpoint rejects the message, the error lists the
    status code each one returned.
    """
    try:
        import httpx
    except ImportError:
        return _error("QQBot direct send requires httpx. Run: pip install httpx")

    extra = pconfig.extra or {}
    appid = extra.get("app_id") or os.getenv("QQ_APP_ID", "")
    secret = pconfig.token or extra.get("client_secret") or os.getenv("QQ_CLIENT_SECRET", "")
    if not (appid and secret):
        return _error("QQBot: QQ_APP_ID / QQ_CLIENT_SECRET not configured.")

    try:
        async with httpx.AsyncClient(timeout=15) as client:
            # Step 1: exchange the app credentials for an access token.
            credentials = {"appId": str(appid), "clientSecret": str(secret)}
            token_resp = await client.post("https://bots.qq.com/app/getAppAccessToken", json=credentials)
            if token_resp.status_code != 200:
                return _error(f"QQBot token request failed: {token_resp.status_code}")
            access_token = token_resp.json().get("access_token")
            if not access_token:
                return _error("QQBot: no access_token in response")

            # Step 2: send the message via REST.
            # The QQ Bot API has separate endpoints for channels, C2C and
            # groups, and the target kind is not known here, so each one is
            # tried in turn. A failed channel attempt most likely means the
            # channel does not exist, hence the fallbacks.
            headers = {
                "Authorization": f"QQBot {access_token}",
                "Content-Type": "application/json",
            }
            payload = {"content": message[:4000], "msg_type": 0}
            attempts = (
                ("channel", f"https://api.sgroup.qq.com/channels/{chat_id}/messages"),
                ("c2c", f"https://api.sgroup.qq.com/v2/users/{chat_id}/messages"),
                ("group", f"https://api.sgroup.qq.com/v2/groups/{chat_id}/messages"),
            )

            status_by_kind = {}
            for kind, endpoint in attempts:
                attempt_resp = await client.post(endpoint, json=payload, headers=headers)
                if attempt_resp.status_code in (200, 201):
                    data = attempt_resp.json()
                    return {"success": True, "platform": "qqbot", "chat_id": chat_id,
                            "message_id": data.get("id")}
                status_by_kind[kind] = attempt_resp.status_code

            # All endpoints failed — report every status code.
            return _error(f"QQBot send failed: channel={status_by_kind['channel']} c2c={status_by_kind['c2c']} group={status_by_kind['group']}")
    except Exception as e:
        return _error(f"QQBot send failed: {e}")
1739  
1740  
async def _send_yuanbao(chat_id, message, media_files=None):
    """Send through the Yuanbao adapter that the running gateway already holds.

    Per the original note, Yuanbao talks over a persistent WebSocket, so a
    throwaway client cannot be created the way the HTTP-based platforms do.
    The live adapter is obtained from the adapter module's
    ``get_active_adapter()`` and handed to ``send_yuanbao_direct``.

    chat_id format:
      - Group: "group:<group_code>"
      - DM:    "direct:<account_id>" or just "<account_id>"
    """
    try:
        from gateway.platforms.yuanbao import get_active_adapter, send_yuanbao_direct
    except ImportError:
        return _error("Yuanbao adapter module not available.")

    active_adapter = get_active_adapter()
    if active_adapter is None:
        return _error("Yuanbao adapter is not running. Start the gateway with yuanbao platform enabled first.")

    try:
        outcome = await send_yuanbao_direct(active_adapter, chat_id, message, media_files=media_files)
    except Exception as e:
        return _error(f"Yuanbao send failed: {e}")
    return outcome
1768  
1769  
# --- Registry ---
# Imported here, at the bottom of the module, after the schema and handler
# being registered have been defined.
# NOTE(review): presumably this placement also avoids a circular import
# with tools.registry — confirm before moving it to the top of the file.
from tools.registry import registry, tool_error

# Runs at import time: registers the ``send_message`` tool in the
# "messaging" toolset with its schema and handler. ``check_fn`` is the
# availability gate (_check_send_message).
registry.register(
    name="send_message",
    toolset="messaging",
    schema=SEND_MESSAGE_SCHEMA,
    handler=send_message_tool,
    check_fn=_check_send_message,
    emoji="📨",
)