/ tools / tts_tool.py
tts_tool.py
   1  #!/usr/bin/env python3
   2  """
   3  Text-to-Speech Tool Module
   4  
   5  Built-in TTS providers:
   6  - Edge TTS (default, free, no API key): Microsoft Edge neural voices
   7  - ElevenLabs (premium): High-quality voices, needs ELEVENLABS_API_KEY
   8  - OpenAI TTS: Good quality, needs OPENAI_API_KEY
   9  - MiniMax TTS: High-quality with voice cloning, needs MINIMAX_API_KEY
  10  - Mistral (Voxtral TTS): Multilingual, native Opus, needs MISTRAL_API_KEY
  11  - Google Gemini TTS: Controllable, 30 prebuilt voices, needs GEMINI_API_KEY
  12  - xAI TTS: Grok voices, needs XAI_API_KEY
  13  - NeuTTS (local, free, no API key): On-device TTS via neutts
  14  - KittenTTS (local, free, no API key): On-device 25MB model
  15  - Piper (local, free, no API key): OHF-Voice/piper1-gpl neural VITS, 44 languages
  16  
  17  Custom command providers:
  18  - Users can declare any number of named providers with ``type: command``
  19    under ``tts.providers.<name>`` in ``~/.hermes/config.yaml``. Hermes
  20    writes the input text to a temp file and runs the configured shell
  21    command, which must produce the audio file at the expected path.
  22    See the Local Command section of ``website/docs/user-guide/features/tts.md``.
  23  
  24  Output formats:
  25  - Opus (.ogg) for Telegram voice bubbles (requires ffmpeg for Edge TTS)
  26  - MP3 (.mp3) for everything else (CLI, Discord, WhatsApp)
  27  
  28  Configuration is loaded from ~/.hermes/config.yaml under the 'tts:' key.
  29  The user chooses the provider and voice; the model just sends text.
  30  
  31  Usage:
  32      from tools.tts_tool import text_to_speech_tool, check_tts_requirements
  33  
  34      result = text_to_speech_tool(text="Hello world")
  35  """
  36  
  37  import asyncio
  38  import base64
  39  import datetime
  40  import json
  41  import logging
  42  import os
  43  import queue
  44  import re
  45  import shlex
  46  import shutil
  47  import signal
  48  import subprocess
  49  import tempfile
  50  import threading
  51  import uuid
  52  from pathlib import Path
  53  from typing import Callable, Dict, Any, Optional
  54  from urllib.parse import urljoin
  55  
  56  from hermes_constants import display_hermes_home
  57  
  58  logger = logging.getLogger(__name__)
  59  def get_env_value(name, default=None):
  60      """Read env values through the live config module.
  61  
  62      Tests may monkeypatch and later restore ``hermes_cli.config.get_env_value``
  63      before this module is imported. Resolve the helper at call time so TTS does
  64      not keep a stale imported function for the rest of the test process.
  65      """
  66      try:
  67          from hermes_cli.config import get_env_value as _get_env_value
  68      except ImportError:
  69          return os.getenv(name, default)
  70      value = _get_env_value(name)
  71      return default if value is None else value
  72  from tools.managed_tool_gateway import resolve_managed_tool_gateway
  73  from tools.tool_backend_helpers import managed_nous_tools_enabled, prefers_gateway, resolve_openai_audio_api_key
  74  from tools.xai_http import hermes_xai_user_agent
  75  
  76  # ---------------------------------------------------------------------------
  77  # Lazy imports -- providers are imported only when actually used to avoid
  78  # crashing in headless environments (SSH, Docker, WSL, no PortAudio).
  79  # ---------------------------------------------------------------------------
  80  
  81  def _import_edge_tts():
  82      """Lazy import edge_tts. Returns the module or raises ImportError."""
  83      import edge_tts
  84      return edge_tts
  85  
  86  def _import_elevenlabs():
  87      """Lazy import ElevenLabs client. Returns the class or raises ImportError."""
  88      from elevenlabs.client import ElevenLabs
  89      return ElevenLabs
  90  
  91  def _import_openai_client():
  92      """Lazy import OpenAI client. Returns the class or raises ImportError."""
  93      from openai import OpenAI as OpenAIClient
  94      return OpenAIClient
  95  
  96  def _import_mistral_client():
  97      """Lazy import Mistral client. Returns the class or raises ImportError."""
  98      from mistralai.client import Mistral
  99      return Mistral
 100  
 101  def _import_sounddevice():
 102      """Lazy import sounddevice. Returns the module or raises ImportError/OSError."""
 103      import sounddevice as sd
 104      return sd
 105  
 106  
 107  def _import_kittentts():
 108      """Lazy import KittenTTS. Returns the class or raises ImportError."""
 109      from kittentts import KittenTTS
 110      return KittenTTS
 111  
 112  
 113  def _import_piper():
 114      """Lazy import Piper. Returns the PiperVoice class or raises ImportError.
 115  
 116      Piper is an optional, fully-local neural TTS engine (Home Assistant /
 117      Open Home Foundation). ``pip install piper-tts`` provides cross-platform
 118      wheels (Linux / macOS / Windows, x86_64 + ARM64) with embedded espeak-ng.
 119      Voice models (.onnx + .onnx.json) are downloaded on first use.
 120      """
 121      from piper import PiperVoice
 122      return PiperVoice
 123  
 124  
# ===========================================================================
# Defaults
# ===========================================================================
DEFAULT_PROVIDER = "edge"  # free Microsoft Edge voices; needs no API key
DEFAULT_EDGE_VOICE = "en-US-AriaNeural"
DEFAULT_ELEVENLABS_VOICE_ID = "pNInz6obpgDQGcFmaJgB"  # Adam
DEFAULT_ELEVENLABS_MODEL_ID = "eleven_multilingual_v2"
DEFAULT_ELEVENLABS_STREAMING_MODEL_ID = "eleven_flash_v2_5"
DEFAULT_OPENAI_MODEL = "gpt-4o-mini-tts"
DEFAULT_KITTENTTS_MODEL = "KittenML/kitten-tts-nano-0.8-int8"  # 25MB
DEFAULT_KITTENTTS_VOICE = "Jasper"
DEFAULT_PIPER_VOICE = "en_US-lessac-medium"  # balanced size/quality
DEFAULT_OPENAI_VOICE = "alloy"
DEFAULT_OPENAI_BASE_URL = "https://api.openai.com/v1"
DEFAULT_MINIMAX_MODEL = "speech-2.8-hd"
DEFAULT_MINIMAX_VOICE_ID = "English_Graceful_Lady"
DEFAULT_MINIMAX_BASE_URL = "https://api.minimax.io/v1/t2a_v2"
DEFAULT_MISTRAL_TTS_MODEL = "voxtral-mini-tts-2603"
DEFAULT_MISTRAL_TTS_VOICE_ID = "c69964a6-ab8b-4f8a-9465-ec0925096ec8"  # Paul - Neutral
DEFAULT_XAI_VOICE_ID = "eve"
DEFAULT_XAI_LANGUAGE = "en"
DEFAULT_XAI_SAMPLE_RATE = 24000  # Hz
DEFAULT_XAI_BIT_RATE = 128000
DEFAULT_XAI_BASE_URL = "https://api.x.ai/v1"
DEFAULT_GEMINI_TTS_MODEL = "gemini-2.5-flash-preview-tts"
DEFAULT_GEMINI_TTS_VOICE = "Kore"
DEFAULT_GEMINI_TTS_BASE_URL = "https://generativelanguage.googleapis.com/v1beta"
# PCM output specs for Gemini TTS (fixed by the API)
GEMINI_TTS_SAMPLE_RATE = 24000  # Hz
GEMINI_TTS_CHANNELS = 1  # mono
GEMINI_TTS_SAMPLE_WIDTH = 2  # 16-bit PCM (L16)
 156  
 157  
 158  def _rewrite_requested_output_path_for_truthful_generation(
 159      provider: str,
 160      file_path: Path,
 161  ) -> Path:
 162      """Return a safe generation path that won't mislabel the container.
 163  
 164      Some providers emit a fixed native container (for example MP3 or WAV).
 165      Writing those bytes straight into a path with a different suffix creates a
 166      misleading file that downstream platforms may reject or mis-handle.
 167  
 168      For those providers, generate into a truthful sibling path first, then let
 169      the existing post-processing/conversion path promote it to ``.ogg`` when
 170      conversion succeeds. If conversion is unavailable, the tool will still
 171      return the truthful MP3/WAV path instead of a mislabeled file.
 172      """
 173      provider = (provider or "").lower().strip()
 174      requested_suffix = file_path.suffix.lower()
 175      supported_suffixes_by_provider = {
 176          "edge": {".mp3"},
 177          "xai": {".mp3", ".wav"},
 178          "minimax": {".mp3", ".wav", ".flac"},
 179          "neutts": {".wav"},
 180          "kittentts": {".wav"},
 181          "piper": {".wav"},
 182      }
 183      preferred_suffix_by_provider = {
 184          "edge": ".mp3",
 185          "xai": ".mp3",
 186          "minimax": ".mp3",
 187          "neutts": ".wav",
 188          "kittentts": ".wav",
 189          "piper": ".wav",
 190      }
 191  
 192      supported_suffixes = supported_suffixes_by_provider.get(provider)
 193      if not supported_suffixes or requested_suffix in supported_suffixes:
 194          return file_path
 195  
 196      return file_path.with_suffix(preferred_suffix_by_provider[provider])
 197  
 198  
 199  def _get_default_output_dir() -> str:
 200      from hermes_constants import get_hermes_dir
 201      return str(get_hermes_dir("cache/audio", "audio_cache"))
 202  
 203  DEFAULT_OUTPUT_DIR = _get_default_output_dir()
 204  
# ---------------------------------------------------------------------------
# Per-provider input-character limits (from official provider docs).
# A single global cap was wrong: OpenAI is 4096, xAI is 15k, MiniMax is 10k,
# ElevenLabs is model-dependent (5k / 10k / 30k / 40k), Gemini caps at ~8k
# input tokens.  Users can override any of these via
# ``tts.<provider>.max_text_length`` in config.yaml.
# All caps below are counted in characters of input text, not tokens.
# ---------------------------------------------------------------------------
PROVIDER_MAX_TEXT_LENGTH: Dict[str, int] = {
    "edge": 5000,         # edge-tts practical sync limit
    "openai": 4096,       # https://platform.openai.com/docs/guides/text-to-speech
    "xai": 15000,         # https://docs.x.ai/developers/model-capabilities/audio/text-to-speech
    "minimax": 10000,     # https://platform.minimax.io/docs/api-reference/speech-t2a-http (sync)
    "mistral": 4000,      # conservative; no published per-request cap
    "gemini": 5000,       # Gemini TTS caps at ~8k input tokens / ~655s audio
    "elevenlabs": 10000,  # fallback when model-aware lookup can't resolve (multilingual_v2)
    "neutts": 2000,       # local model, quality falls off on long text
    "kittentts": 2000,    # local 25MB model
    "piper": 5000,        # local VITS model, phoneme-based; practical cap
}

# ElevenLabs caps vary by model_id. https://elevenlabs.io/docs/overview/models
ELEVENLABS_MODEL_MAX_TEXT_LENGTH: Dict[str, int] = {
    "eleven_v3": 5000,
    "eleven_ttv_v3": 5000,
    "eleven_multilingual_v2": 10000,
    "eleven_multilingual_v1": 10000,
    "eleven_english_sts_v2": 10000,
    "eleven_english_sts_v1": 10000,
    "eleven_flash_v2": 30000,
    "eleven_flash_v2_5": 40000,
}

# Final fallback when provider isn't recognised at all.
FALLBACK_MAX_TEXT_LENGTH = 4000

# Back-compat alias. Prefer ``_resolve_max_text_length()`` for new code.
MAX_TEXT_LENGTH = FALLBACK_MAX_TEXT_LENGTH
 242  
 243  
 244  def _resolve_max_text_length(
 245      provider: Optional[str],
 246      tts_config: Optional[Dict[str, Any]] = None,
 247  ) -> int:
 248      """Return the input-character cap for *provider*.
 249  
 250      Resolution order:
 251        1. ``tts.<provider>.max_text_length`` (user override in config.yaml)
 252        2. ``tts.providers.<provider>.max_text_length`` for user-declared
 253           command providers
 254        3. ElevenLabs model-aware table (keyed on configured ``model_id``)
 255        4. ``PROVIDER_MAX_TEXT_LENGTH`` default
 256        5. ``DEFAULT_COMMAND_TTS_MAX_TEXT_LENGTH`` when the provider is a
 257           command-type user provider without an explicit cap
 258        6. ``FALLBACK_MAX_TEXT_LENGTH`` (4000)
 259  
 260      Non-positive or non-integer overrides fall through to the default so a
 261      broken config can't accidentally disable truncation entirely.
 262      """
 263      if not provider:
 264          return FALLBACK_MAX_TEXT_LENGTH
 265      key = provider.lower().strip()
 266      cfg = tts_config or {}
 267  
 268      # Built-in-style override at tts.<provider>.max_text_length wins first,
 269      # matching historical behavior.
 270      prov_cfg = cfg.get(key) if isinstance(cfg.get(key), dict) else {}
 271      override = prov_cfg.get("max_text_length") if prov_cfg else None
 272      if isinstance(override, bool):
 273          override = None
 274      if isinstance(override, int) and override > 0:
 275          return override
 276  
 277      if key == "elevenlabs":
 278          model_id = (prov_cfg or {}).get("model_id") or DEFAULT_ELEVENLABS_MODEL_ID
 279          mapped = ELEVENLABS_MODEL_MAX_TEXT_LENGTH.get(str(model_id).strip())
 280          if mapped:
 281              return mapped
 282  
 283      if key in PROVIDER_MAX_TEXT_LENGTH:
 284          return PROVIDER_MAX_TEXT_LENGTH[key]
 285  
 286      # User-declared command provider (under tts.providers.<name>)
 287      if key not in BUILTIN_TTS_PROVIDERS:
 288          named = _get_named_provider_config(cfg, key)
 289          if _is_command_provider_config(named):
 290              named_override = named.get("max_text_length")
 291              if isinstance(named_override, bool):
 292                  named_override = None
 293              if isinstance(named_override, int) and named_override > 0:
 294                  return named_override
 295              return DEFAULT_COMMAND_TTS_MAX_TEXT_LENGTH
 296  
 297      return FALLBACK_MAX_TEXT_LENGTH
 298  
 299  
 300  # ===========================================================================
 301  # Config loader -- reads tts: section from ~/.hermes/config.yaml
 302  # ===========================================================================
 303  def _load_tts_config() -> Dict[str, Any]:
 304      """
 305      Load TTS configuration from ~/.hermes/config.yaml.
 306  
 307      Returns a dict with provider settings. Falls back to defaults
 308      for any missing fields.
 309      """
 310      try:
 311          from hermes_cli.config import load_config
 312          config = load_config()
 313          return config.get("tts", {})
 314      except ImportError:
 315          logger.debug("hermes_cli.config not available, using default TTS config")
 316          return {}
 317      except Exception as e:
 318          logger.warning("Failed to load TTS config: %s", e, exc_info=True)
 319          return {}
 320  
 321  
 322  def _get_provider(tts_config: Dict[str, Any]) -> str:
 323      """Get the configured TTS provider name."""
 324      return (tts_config.get("provider") or DEFAULT_PROVIDER).lower().strip()
 325  
 326  
 327  # ===========================================================================
 328  # Custom command providers (type: command under tts.providers.<name>)
 329  # ===========================================================================
 330  #
 331  # Users can declare any number of command-type providers alongside the
 332  # built-ins so they can plug any local CLI (Piper, VoxCPM, Kokoro CLIs,
 333  # custom voice-cloning scripts, etc.) into Hermes without any Python code
 334  # changes. The config shape is::
 335  #
 336  #     tts:
 337  #       provider: piper-en
 338  #       providers:
 339  #         piper-en:
 340  #           type: command
 341  #           command: "piper -m ~/model.onnx -f {output_path} < {input_path}"
 342  #           output_format: wav
 343  #
 344  # Hermes writes the input text to a temp UTF-8 file, runs the command with
 345  # placeholder substitution, and reads the audio file the command wrote to
 346  # ``{output_path}``. Supported placeholders: ``{input_path}``,
 347  # ``{text_path}`` (alias for input_path), ``{output_path}``, ``{format}``,
 348  # ``{voice}``, ``{model}``, ``{speed}``. Use ``{{`` / ``}}`` for literal braces.
 349  #
 350  # Built-in provider names always win over an entry with the same name under
 351  # ``tts.providers``, so user config can't silently shadow ``edge`` etc.
 352  #
 353  # Placeholder values are shell-quoted for their surrounding context
 354  # (bare / single / double quote), so paths with spaces work transparently.
 355  
# Built-in provider names. Any ``tts.provider`` value NOT in this set is
# interpreted as a reference to ``tts.providers.<name>``.
BUILTIN_TTS_PROVIDERS = frozenset({
    "edge",
    "elevenlabs",
    "openai",
    "minimax",
    "xai",
    "mistral",
    "gemini",
    "neutts",
    "kittentts",
    "piper",
})

# Defaults for command-type providers; each is overridable per provider
# inside its ``tts.providers.<name>`` block.
DEFAULT_COMMAND_TTS_TIMEOUT_SECONDS = 120
DEFAULT_COMMAND_TTS_OUTPUT_FORMAT = "mp3"
# Containers a command provider is allowed to declare as its output format.
COMMAND_TTS_OUTPUT_FORMATS = frozenset({"mp3", "wav", "ogg", "flac"})
# Input-character cap used when a command provider sets no max_text_length.
DEFAULT_COMMAND_TTS_MAX_TEXT_LENGTH = 5000
 375  
 376  
 377  def _get_provider_section(tts_config: Dict[str, Any], name: str) -> Dict[str, Any]:
 378      """Return a provider config block if it's a dict, else an empty dict."""
 379      if not isinstance(tts_config, dict):
 380          return {}
 381      section = tts_config.get(name)
 382      return section if isinstance(section, dict) else {}
 383  
 384  
 385  def _get_named_provider_config(
 386      tts_config: Dict[str, Any],
 387      name: str,
 388  ) -> Dict[str, Any]:
 389      """Return the config dict for a user-declared provider.
 390  
 391      Looks up ``tts.providers.<name>`` first (the canonical location), and
 392      falls back to ``tts.<name>`` so users who followed the built-in layout
 393      still work. Returns an empty dict when the provider is not declared.
 394      """
 395      providers = _get_provider_section(tts_config, "providers")
 396      section = providers.get(name) if isinstance(providers, dict) else None
 397      if isinstance(section, dict):
 398          return section
 399      # Back-compat: allow ``tts.<name>`` for user-declared providers too,
 400      # but only when the name is not a built-in (so a user's ``tts.openai``
 401      # block still means the OpenAI provider, not a custom command).
 402      if name.lower() not in BUILTIN_TTS_PROVIDERS:
 403          legacy = _get_provider_section(tts_config, name)
 404          if legacy:
 405              return legacy
 406      return {}
 407  
 408  
 409  def _is_command_provider_config(config: Dict[str, Any]) -> bool:
 410      """Return True when *config* declares a command-type provider."""
 411      if not isinstance(config, dict):
 412          return False
 413      ptype = str(config.get("type") or "").strip().lower()
 414      if ptype and ptype != "command":
 415          return False
 416      command = config.get("command")
 417      return isinstance(command, str) and bool(command.strip())
 418  
 419  
 420  def _resolve_command_provider_config(
 421      provider: str,
 422      tts_config: Dict[str, Any],
 423  ) -> Optional[Dict[str, Any]]:
 424      """Return the provider config if *provider* resolves to a command type.
 425  
 426      Built-in provider names are rejected (they have native handlers).
 427      Returns None when the name is a built-in, unknown, or not a command
 428      type.
 429      """
 430      if not provider:
 431          return None
 432      key = provider.lower().strip()
 433      if key in BUILTIN_TTS_PROVIDERS:
 434          return None
 435      config = _get_named_provider_config(tts_config, key)
 436      if _is_command_provider_config(config):
 437          return config
 438      return None
 439  
 440  
 441  def _iter_command_providers(tts_config: Dict[str, Any]):
 442      """Yield (name, config) pairs for every declared command-type provider."""
 443      if not isinstance(tts_config, dict):
 444          return
 445      providers = _get_provider_section(tts_config, "providers")
 446      for name, cfg in (providers or {}).items():
 447          if isinstance(name, str) and name.lower() not in BUILTIN_TTS_PROVIDERS:
 448              if _is_command_provider_config(cfg):
 449                  yield name, cfg
 450  
 451  
 452  def _get_command_tts_timeout(config: Dict[str, Any]) -> float:
 453      """Return timeout in seconds, falling back when invalid."""
 454      raw = config.get("timeout", config.get("timeout_seconds", DEFAULT_COMMAND_TTS_TIMEOUT_SECONDS))
 455      try:
 456          value = float(raw)
 457      except (TypeError, ValueError):
 458          return float(DEFAULT_COMMAND_TTS_TIMEOUT_SECONDS)
 459      if value <= 0:
 460          return float(DEFAULT_COMMAND_TTS_TIMEOUT_SECONDS)
 461      return value
 462  
 463  
 464  def _get_command_tts_output_format(
 465      config: Dict[str, Any],
 466      output_path: Optional[str] = None,
 467  ) -> str:
 468      """Return the validated output format (mp3/wav/ogg/flac)."""
 469      if output_path:
 470          suffix = Path(output_path).suffix.lower().strip().lstrip(".")
 471          if suffix in COMMAND_TTS_OUTPUT_FORMATS:
 472              return suffix
 473      raw = (
 474          config.get("format")
 475          or config.get("output_format")
 476          or DEFAULT_COMMAND_TTS_OUTPUT_FORMAT
 477      )
 478      fmt = str(raw).lower().strip().lstrip(".")
 479      return fmt if fmt in COMMAND_TTS_OUTPUT_FORMATS else DEFAULT_COMMAND_TTS_OUTPUT_FORMAT
 480  
 481  
 482  def _is_command_tts_voice_compatible(config: Dict[str, Any]) -> bool:
 483      """Return True only when the user explicitly opted in to voice delivery."""
 484      value = config.get("voice_compatible", False)
 485      if isinstance(value, str):
 486          return value.strip().lower() in {"1", "true", "yes", "on"}
 487      return bool(value)
 488  
 489  
 490  def _shell_quote_context(command_template: str, position: int) -> Optional[str]:
 491      """Return the shell quote character active right before *position*.
 492  
 493      Returns ``"'"`` / ``'"'`` when inside a single- / double-quoted region
 494      of the template, ``None`` for bare context.
 495      """
 496      quote: Optional[str] = None
 497      escaped = False
 498      i = 0
 499      while i < position:
 500          char = command_template[i]
 501          if quote == "'":
 502              if char == "'":
 503                  quote = None
 504          elif quote == '"':
 505              if escaped:
 506                  escaped = False
 507              elif char == "\\":
 508                  escaped = True
 509              elif char == '"':
 510                  quote = None
 511          else:
 512              if char == "'":
 513                  quote = "'"
 514              elif char == '"':
 515                  quote = '"'
 516              elif char == "\\":
 517                  i += 1
 518          i += 1
 519      return quote
 520  
 521  
 522  def _quote_command_tts_placeholder(value: str, quote_context: Optional[str]) -> str:
 523      """Quote a placeholder value for its position in a shell command template."""
 524      if quote_context == "'":
 525          return value.replace("'", r"'\''")
 526      if quote_context == '"':
 527          return (
 528              value
 529              .replace("\\", "\\\\")
 530              .replace('"', r'\"')
 531              .replace("$", r"\$")
 532              .replace("`", r"\`")
 533          )
 534      if os.name == "nt":
 535          return subprocess.list2cmdline([value])
 536      return shlex.quote(value)
 537  
 538  
def _render_command_tts_template(
    command_template: str,
    placeholders: Dict[str, str],
) -> str:
    """Replace supported placeholders while preserving ``{{`` / ``}}``."""
    # Match {name} or {{name}} for the supported placeholder names only.
    # The (?<!\$) lookbehind leaves shell parameter expansions such as
    # ``${output_path}`` untouched.
    names = "|".join(re.escape(name) for name in placeholders)
    pattern = re.compile(
        rf"(?<!\$)(?:\{{\{{(?P<double>{names})\}}\}}|\{{(?P<single>{names})\}})"
    )
    replacements: list[tuple[str, str]] = []

    def replace_match(match: re.Match[str]) -> str:
        # Substitute a unique token first; the real (shell-quoted) value is
        # spliced in only after the literal-brace pass below, so braces
        # inside quoted values can't be rewritten by mistake.
        name = match.group("double") or match.group("single")
        token = f"__HERMES_TTS_PLACEHOLDER_{len(replacements)}__"
        replacements.append((
            token,
            _quote_command_tts_placeholder(
                placeholders[name],
                # Quoting depends on whether this placeholder sits in a bare,
                # single-quoted, or double-quoted region of the template.
                _shell_quote_context(command_template, match.start()),
            ),
        ))
        return token

    rendered = pattern.sub(replace_match, command_template)
    # Collapse escaped literal braces, then restore the quoted values.
    rendered = rendered.replace("{{", "{").replace("}}", "}")
    for token, value in replacements:
        rendered = rendered.replace(token, value)
    return rendered
 567  
 568  
def _terminate_command_tts_process_tree(proc: subprocess.Popen) -> None:
    """Best-effort termination of a shell process and all of its children."""
    if proc.poll() is not None:
        return  # already exited; nothing to clean up

    if os.name == "nt":
        # Windows: taskkill /T takes down the whole process tree; fall back
        # to killing just the shell process if taskkill itself fails.
        try:
            subprocess.run(
                ["taskkill", "/F", "/T", "/PID", str(proc.pid)],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=5,
            )
        except Exception:
            proc.kill()
        return

    # POSIX: the child was launched with start_new_session=True (see
    # _run_command_tts), so its pid doubles as a process-group id.
    # Try a polite SIGTERM to the whole group first.
    try:
        os.killpg(proc.pid, signal.SIGTERM)
    except ProcessLookupError:
        return  # group already gone
    except Exception:
        proc.terminate()

    # Give the group a short grace period to exit cleanly.
    try:
        proc.wait(timeout=2)
        return
    except subprocess.TimeoutExpired:
        pass

    # Escalate to SIGKILL for anything still alive.
    try:
        os.killpg(proc.pid, signal.SIGKILL)
    except ProcessLookupError:
        return
    except Exception:
        proc.kill()
 605  
 606  
 607  def _run_command_tts(command: str, timeout: float) -> subprocess.CompletedProcess:
 608      """Run a command-provider shell command with process-tree timeout cleanup."""
 609      popen_kwargs: Dict[str, Any] = {
 610          "shell": True,
 611          "stdout": subprocess.PIPE,
 612          "stderr": subprocess.PIPE,
 613          "text": True,
 614      }
 615      if os.name == "nt":
 616          popen_kwargs["creationflags"] = getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0)
 617      else:
 618          popen_kwargs["start_new_session"] = True
 619  
 620      proc = subprocess.Popen(command, **popen_kwargs)
 621      try:
 622          stdout, stderr = proc.communicate(timeout=timeout)
 623      except subprocess.TimeoutExpired as exc:
 624          _terminate_command_tts_process_tree(proc)
 625          try:
 626              stdout, stderr = proc.communicate(timeout=1)
 627          except Exception:
 628              stdout = getattr(exc, "output", None)
 629              stderr = getattr(exc, "stderr", None)
 630          raise subprocess.TimeoutExpired(
 631              command,
 632              timeout,
 633              output=stdout,
 634              stderr=stderr,
 635          ) from exc
 636  
 637      if proc.returncode:
 638          raise subprocess.CalledProcessError(
 639              proc.returncode,
 640              command,
 641              output=stdout,
 642              stderr=stderr,
 643          )
 644      return subprocess.CompletedProcess(command, proc.returncode, stdout, stderr)
 645  
 646  
 647  def _configured_command_tts_output_path(path: Path, config: Dict[str, Any]) -> Path:
 648      """Return an output path whose extension matches the provider's output_format."""
 649      fmt = _get_command_tts_output_format(config)
 650      return path.with_suffix(f".{fmt}")
 651  
 652  
 653  def _generate_command_tts(
 654      text: str,
 655      output_path: str,
 656      provider_name: str,
 657      config: Dict[str, Any],
 658      tts_config: Dict[str, Any],
 659  ) -> str:
 660      """Generate speech by running a user-configured shell command.
 661  
 662      Returns the absolute path of the audio file the command wrote.
 663      Raises ``ValueError`` when the provider config is invalid, and
 664      ``RuntimeError`` for timeouts / non-zero exits / empty output.
 665      """
 666      command_template = str(config.get("command") or "").strip()
 667      if not command_template:
 668          raise ValueError(
 669              f"tts.providers.{provider_name}.command is not configured"
 670          )
 671  
 672      output = Path(output_path).expanduser()
 673      output.parent.mkdir(parents=True, exist_ok=True)
 674      if output.exists():
 675          output.unlink()
 676  
 677      timeout = _get_command_tts_timeout(config)
 678      output_format = _get_command_tts_output_format(config, str(output))
 679      speed = config.get("speed", tts_config.get("speed", ""))
 680  
 681      with tempfile.TemporaryDirectory() as tmpdir:
 682          text_path = Path(tmpdir) / "input.txt"
 683          text_path.write_text(text, encoding="utf-8")
 684  
 685          placeholders = {
 686              "input_path": str(text_path),
 687              "text_path": str(text_path),
 688              "output_path": str(output),
 689              "format": output_format,
 690              "voice": str(config.get("voice", "")),
 691              "model": str(config.get("model", "")),
 692              "speed": str(speed),
 693          }
 694          command = _render_command_tts_template(command_template, placeholders)
 695  
 696          try:
 697              _run_command_tts(command, timeout)
 698          except subprocess.TimeoutExpired as exc:
 699              raise RuntimeError(
 700                  f"TTS provider '{provider_name}' timed out after {timeout:g}s"
 701              ) from exc
 702          except subprocess.CalledProcessError as exc:
 703              detail_parts = []
 704              if exc.stderr:
 705                  detail_parts.append(f"stderr: {exc.stderr.strip()}")
 706              if exc.stdout:
 707                  detail_parts.append(f"stdout: {exc.stdout.strip()}")
 708              detail = "; ".join(detail_parts) or "no command output"
 709              raise RuntimeError(
 710                  f"TTS provider '{provider_name}' exited with code "
 711                  f"{exc.returncode}: {detail}"
 712              ) from exc
 713  
 714      if not output.exists() or output.stat().st_size <= 0:
 715          raise RuntimeError(
 716              f"TTS provider '{provider_name}' produced no output at {output}"
 717          )
 718      return str(output)
 719  
 720  
 721  def _has_any_command_tts_provider(tts_config: Optional[Dict[str, Any]] = None) -> bool:
 722      """Return True when any command-type TTS provider is configured."""
 723      if tts_config is None:
 724          tts_config = _load_tts_config()
 725      for _name, _cfg in _iter_command_providers(tts_config):
 726          return True
 727      return False
 728  
 729  
 730  # ===========================================================================
 731  # ffmpeg Opus conversion (Edge TTS MP3 -> OGG Opus for Telegram)
 732  # ===========================================================================
 733  def _has_ffmpeg() -> bool:
 734      """Check if ffmpeg is available on the system."""
 735      return shutil.which("ffmpeg") is not None
 736  
 737  
 738  def _convert_to_opus(mp3_path: str) -> Optional[str]:
 739      """
 740      Convert an MP3 file to OGG Opus format for Telegram voice bubbles.
 741  
 742      Args:
 743          mp3_path: Path to the input MP3 file.
 744  
 745      Returns:
 746          Path to the .ogg file, or None if conversion fails.
 747      """
 748      if not _has_ffmpeg():
 749          return None
 750  
 751      ogg_path = mp3_path.rsplit(".", 1)[0] + ".ogg"
 752      try:
 753          result = subprocess.run(
 754              ["ffmpeg", "-i", mp3_path, "-acodec", "libopus",
 755               "-ac", "1", "-b:a", "64k", "-vbr", "off", ogg_path, "-y"],
 756              capture_output=True, timeout=30,
 757          )
 758          if result.returncode != 0:
 759              logger.warning("ffmpeg conversion failed with return code %d: %s", 
 760                            result.returncode, result.stderr.decode('utf-8', errors='ignore')[:200])
 761              return None
 762          if os.path.exists(ogg_path) and os.path.getsize(ogg_path) > 0:
 763              return ogg_path
 764      except subprocess.TimeoutExpired:
 765          logger.warning("ffmpeg OGG conversion timed out after 30s")
 766      except FileNotFoundError:
 767          logger.warning("ffmpeg not found in PATH")
 768      except Exception as e:
 769          logger.warning("ffmpeg OGG conversion failed: %s", e, exc_info=True)
 770      return None
 771  
 772  
 773  # ===========================================================================
 774  # Provider: Edge TTS (free)
 775  # ===========================================================================
 776  async def _generate_edge_tts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str:
 777      """
 778      Generate audio using Edge TTS.
 779  
 780      Args:
 781          text: Text to convert.
 782          output_path: Where to save the MP3 file.
 783          tts_config: TTS config dict.
 784  
 785      Returns:
 786          Path to the saved audio file.
 787      """
 788      _edge_tts = _import_edge_tts()
 789      edge_config = tts_config.get("edge", {})
 790      voice = edge_config.get("voice", DEFAULT_EDGE_VOICE)
 791      speed = float(edge_config.get("speed", tts_config.get("speed", 1.0)))
 792  
 793      kwargs = {"voice": voice}
 794      if speed != 1.0:
 795          pct = round((speed - 1.0) * 100)
 796          kwargs["rate"] = f"{pct:+d}%"
 797  
 798      communicate = _edge_tts.Communicate(text, **kwargs)
 799      await communicate.save(output_path)
 800      return output_path
 801  
 802  
 803  # ===========================================================================
 804  # Provider: ElevenLabs (premium)
 805  # ===========================================================================
 806  def _generate_elevenlabs(text: str, output_path: str, tts_config: Dict[str, Any]) -> str:
 807      """
 808      Generate audio using ElevenLabs.
 809  
 810      Args:
 811          text: Text to convert.
 812          output_path: Where to save the audio file.
 813          tts_config: TTS config dict.
 814  
 815      Returns:
 816          Path to the saved audio file.
 817      """
 818      api_key = (get_env_value("ELEVENLABS_API_KEY") or "")
 819      if not api_key:
 820          raise ValueError("ELEVENLABS_API_KEY not set. Get one at https://elevenlabs.io/")
 821  
 822      el_config = tts_config.get("elevenlabs", {})
 823      voice_id = el_config.get("voice_id", DEFAULT_ELEVENLABS_VOICE_ID)
 824      model_id = el_config.get("model_id", DEFAULT_ELEVENLABS_MODEL_ID)
 825  
 826      # Determine output format based on file extension
 827      if output_path.endswith(".ogg"):
 828          output_format = "opus_48000_64"
 829      else:
 830          output_format = "mp3_44100_128"
 831  
 832      ElevenLabs = _import_elevenlabs()
 833      client = ElevenLabs(api_key=api_key)
 834      audio_generator = client.text_to_speech.convert(
 835          text=text,
 836          voice_id=voice_id,
 837          model_id=model_id,
 838          output_format=output_format,
 839      )
 840  
 841      # audio_generator yields chunks -- write them all
 842      with open(output_path, "wb") as f:
 843          for chunk in audio_generator:
 844              f.write(chunk)
 845  
 846      return output_path
 847  
 848  
 849  # ===========================================================================
 850  # Provider: OpenAI TTS
 851  # ===========================================================================
 852  def _generate_openai_tts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str:
 853      """
 854      Generate audio using OpenAI TTS.
 855  
 856      Args:
 857          text: Text to convert.
 858          output_path: Where to save the audio file.
 859          tts_config: TTS config dict.
 860  
 861      Returns:
 862          Path to the saved audio file.
 863      """
 864      api_key, base_url = _resolve_openai_audio_client_config()
 865  
 866      oai_config = tts_config.get("openai", {})
 867      model = oai_config.get("model", DEFAULT_OPENAI_MODEL)
 868      voice = oai_config.get("voice", DEFAULT_OPENAI_VOICE)
 869      base_url = oai_config.get("base_url", base_url)
 870      speed = float(oai_config.get("speed", tts_config.get("speed", 1.0)))
 871  
 872      # Determine response format from extension
 873      if output_path.endswith(".ogg"):
 874          response_format = "opus"
 875      else:
 876          response_format = "mp3"
 877  
 878      OpenAIClient = _import_openai_client()
 879      client = OpenAIClient(api_key=api_key, base_url=base_url)
 880      try:
 881          create_kwargs = dict(
 882              model=model,
 883              voice=voice,
 884              input=text,
 885              response_format=response_format,
 886              extra_headers={"x-idempotency-key": str(uuid.uuid4())},
 887          )
 888          if speed != 1.0:
 889              create_kwargs["speed"] = max(0.25, min(4.0, speed))
 890          response = client.audio.speech.create(**create_kwargs)
 891  
 892          response.stream_to_file(output_path)
 893          return output_path
 894      finally:
 895          close = getattr(client, "close", None)
 896          if callable(close):
 897              close()
 898  
 899  
 900  # ===========================================================================
 901  # Provider: xAI TTS
 902  # ===========================================================================
 903  def _generate_xai_tts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str:
 904      """
 905      Generate audio using xAI TTS.
 906  
 907      xAI exposes a dedicated /v1/tts endpoint instead of the OpenAI audio.speech
 908      API shape, so this is implemented as a separate backend.
 909      """
 910      import requests
 911  
 912      api_key = (get_env_value("XAI_API_KEY") or "").strip()
 913      if not api_key:
 914          raise ValueError("XAI_API_KEY not set. Get one at https://console.x.ai/")
 915  
 916      xai_config = tts_config.get("xai", {})
 917      voice_id = str(xai_config.get("voice_id", DEFAULT_XAI_VOICE_ID)).strip() or DEFAULT_XAI_VOICE_ID
 918      language = str(xai_config.get("language", DEFAULT_XAI_LANGUAGE)).strip() or DEFAULT_XAI_LANGUAGE
 919      sample_rate = int(xai_config.get("sample_rate", DEFAULT_XAI_SAMPLE_RATE))
 920      bit_rate = int(xai_config.get("bit_rate", DEFAULT_XAI_BIT_RATE))
 921      base_url = str(
 922          xai_config.get("base_url")
 923          or get_env_value("XAI_BASE_URL")
 924          or DEFAULT_XAI_BASE_URL
 925      ).strip().rstrip("/")
 926  
 927      # Match the documented minimal POST /v1/tts shape by default. Only send
 928      # output_format when Hermes actually needs a non-default format/override.
 929      codec = "wav" if output_path.endswith(".wav") else "mp3"
 930      payload: Dict[str, Any] = {
 931          "text": text,
 932          "voice_id": voice_id,
 933          "language": language,
 934      }
 935      if (
 936          codec != "mp3"
 937          or sample_rate != DEFAULT_XAI_SAMPLE_RATE
 938          or (codec == "mp3" and bit_rate != DEFAULT_XAI_BIT_RATE)
 939      ):
 940          output_format: Dict[str, Any] = {"codec": codec}
 941          if sample_rate:
 942              output_format["sample_rate"] = sample_rate
 943          if codec == "mp3" and bit_rate:
 944              output_format["bit_rate"] = bit_rate
 945          payload["output_format"] = output_format
 946  
 947      response = requests.post(
 948          f"{base_url}/tts",
 949          headers={
 950              "Authorization": f"Bearer {api_key}",
 951              "Content-Type": "application/json",
 952              "User-Agent": hermes_xai_user_agent(),
 953          },
 954          json=payload,
 955          timeout=60,
 956      )
 957      response.raise_for_status()
 958  
 959      with open(output_path, "wb") as f:
 960          f.write(response.content)
 961  
 962      return output_path
 963  
 964  
 965  # ===========================================================================
 966  # Provider: MiniMax TTS
 967  # ===========================================================================
 968  def _generate_minimax_tts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str:
 969      """
 970      Generate audio using MiniMax TTS API.
 971  
 972      MiniMax returns hex-encoded audio data. Supports streaming (SSE) and
 973      non-streaming modes. This implementation uses non-streaming for simplicity.
 974  
 975      Args:
 976          text: Text to convert (max 10,000 characters).
 977          output_path: Where to save the audio file.
 978          tts_config: TTS config dict.
 979  
 980      Returns:
 981          Path to the saved audio file.
 982      """
 983      import requests
 984  
 985      api_key = (get_env_value("MINIMAX_API_KEY") or "")
 986      if not api_key:
 987          raise ValueError("MINIMAX_API_KEY not set. Get one at https://platform.minimax.io/")
 988  
 989      mm_config = tts_config.get("minimax", {})
 990      model = mm_config.get("model", DEFAULT_MINIMAX_MODEL)
 991      voice_id = mm_config.get("voice_id", DEFAULT_MINIMAX_VOICE_ID)
 992      speed = mm_config.get("speed", tts_config.get("speed", 1))
 993      vol = mm_config.get("vol", 1)
 994      pitch = mm_config.get("pitch", 0)
 995      base_url = mm_config.get("base_url", DEFAULT_MINIMAX_BASE_URL)
 996  
 997      # Determine audio format from output extension
 998      if output_path.endswith(".wav"):
 999          audio_format = "wav"
1000      elif output_path.endswith(".flac"):
1001          audio_format = "flac"
1002      else:
1003          audio_format = "mp3"
1004  
1005      payload = {
1006          "model": model,
1007          "text": text,
1008          "stream": False,
1009          "voice_setting": {
1010              "voice_id": voice_id,
1011              "speed": speed,
1012              "vol": vol,
1013              "pitch": pitch,
1014          },
1015          "audio_setting": {
1016              "sample_rate": 32000,
1017              "bitrate": 128000,
1018              "format": audio_format,
1019              "channel": 1,
1020          },
1021      }
1022  
1023      headers = {
1024          "Content-Type": "application/json",
1025          "Authorization": f"Bearer {api_key}",
1026      }
1027  
1028      response = requests.post(base_url, json=payload, headers=headers, timeout=60)
1029      response.raise_for_status()
1030  
1031      result = response.json()
1032      base_resp = result.get("base_resp", {})
1033      status_code = base_resp.get("status_code", -1)
1034  
1035      if status_code != 0:
1036          status_msg = base_resp.get("status_msg", "unknown error")
1037          raise RuntimeError(f"MiniMax TTS API error (code {status_code}): {status_msg}")
1038  
1039      hex_audio = result.get("data", {}).get("audio", "")
1040      if not hex_audio:
1041          raise RuntimeError("MiniMax TTS returned empty audio data")
1042  
1043      # MiniMax returns hex-encoded audio (not base64)
1044      audio_bytes = bytes.fromhex(hex_audio)
1045  
1046      with open(output_path, "wb") as f:
1047          f.write(audio_bytes)
1048  
1049      return output_path
1050  
1051  
1052  # ===========================================================================
1053  # Provider: Mistral (Voxtral TTS)
1054  # ===========================================================================
def _generate_mistral_tts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str:
    """Generate audio using the Mistral Voxtral TTS API.

    The endpoint returns base64-encoded audio, which is decoded here and
    written as raw bytes to *output_path*. Native Opus output is supported
    for Telegram voice bubbles.
    """
    api_key = get_env_value("MISTRAL_API_KEY") or ""
    if not api_key:
        raise ValueError("MISTRAL_API_KEY not set. Get one at https://console.mistral.ai/")

    mi_cfg = tts_config.get("mistral", {})
    chosen_model = mi_cfg.get("model", DEFAULT_MISTRAL_TTS_MODEL)
    chosen_voice = mi_cfg.get("voice_id") or DEFAULT_MISTRAL_TTS_VOICE_ID

    # Map the requested file extension to the API response format.
    suffix_formats = ((".ogg", "opus"), (".wav", "wav"), (".flac", "flac"))
    response_format = next(
        (fmt for suffix, fmt in suffix_formats if output_path.endswith(suffix)),
        "mp3",
    )

    Mistral = _import_mistral_client()
    try:
        with Mistral(api_key=api_key) as client:
            speech = client.audio.speech.complete(
                model=chosen_model,
                input=text,
                voice_id=chosen_voice,
                response_format=response_format,
            )
            audio_bytes = base64.b64decode(speech.audio_data)
    except ValueError:
        # Configuration errors propagate unchanged.
        raise
    except Exception as exc:
        logger.error("Mistral TTS failed: %s", exc, exc_info=True)
        raise RuntimeError(f"Mistral TTS failed: {type(exc).__name__}") from exc

    with open(output_path, "wb") as out:
        out.write(audio_bytes)

    return output_path
1099  
1100  
1101  # ===========================================================================
1102  # Provider: Google Gemini TTS
1103  # ===========================================================================
def _wrap_pcm_as_wav(
    pcm_bytes: bytes,
    sample_rate: int = GEMINI_TTS_SAMPLE_RATE,
    channels: int = GEMINI_TTS_CHANNELS,
    sample_width: int = GEMINI_TTS_SAMPLE_WIDTH,
) -> bytes:
    """Prefix raw signed-little-endian PCM with a standard WAV RIFF header.

    Gemini TTS returns audio/L16;codec=pcm;rate=24000 -- bare PCM samples
    with no container. A minimal WAV header makes the file playable and
    lets ffmpeg re-encode it to MP3/Opus downstream.
    """
    import struct

    block_align = channels * sample_width
    byte_rate = sample_rate * block_align
    payload_size = len(pcm_bytes)

    fmt_chunk = struct.pack(
        "<4sIHHIIHH",
        b"fmt ",
        16,                 # fmt chunk size for plain PCM
        1,                  # format tag 1 == linear PCM
        channels,
        sample_rate,
        byte_rate,
        block_align,
        sample_width * 8,   # bits per sample
    )
    data_header = struct.pack("<4sI", b"data", payload_size)
    riff_size = 4 + len(fmt_chunk) + len(data_header) + payload_size
    riff_header = struct.pack("<4sI4s", b"RIFF", riff_size, b"WAVE")
    return riff_header + fmt_chunk + data_header + pcm_bytes
1136  
1137  
def _generate_gemini_tts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str:
    """Generate audio using Google Gemini TTS.

    Gemini's generateContent endpoint with responseModalities=["AUDIO"] returns
    raw 24kHz mono 16-bit PCM (L16) as base64. We wrap it with a WAV RIFF
    header to produce a playable file, then ffmpeg-convert to MP3 / Opus if
    the caller requested those formats (same pattern as NeuTTS).

    Args:
        text: Text to convert (prompt-style; supports inline direction like
              "Say cheerfully:" and audio tags like [whispers]).
        output_path: Where to save the audio file (.wav, .mp3, or .ogg).
        tts_config: TTS config dict (reads the optional "gemini" section).

    Returns:
        Path to the saved audio file.

    Raises:
        ValueError: If neither GEMINI_API_KEY nor GOOGLE_API_KEY is set.
        RuntimeError: On an API error response, a malformed or empty audio
            payload, or a failed ffmpeg conversion.
    """
    import requests

    # GOOGLE_API_KEY is accepted as an alternate env var name for the key.
    api_key = (get_env_value("GEMINI_API_KEY") or get_env_value("GOOGLE_API_KEY") or "").strip()
    if not api_key:
        raise ValueError(
            "GEMINI_API_KEY not set. Get one at https://aistudio.google.com/app/apikey"
        )

    gemini_config = tts_config.get("gemini", {})
    # Empty/whitespace-only config values fall back to the defaults.
    model = str(gemini_config.get("model", DEFAULT_GEMINI_TTS_MODEL)).strip() or DEFAULT_GEMINI_TTS_MODEL
    voice = str(gemini_config.get("voice", DEFAULT_GEMINI_TTS_VOICE)).strip() or DEFAULT_GEMINI_TTS_VOICE
    base_url = str(
        gemini_config.get("base_url")
        or get_env_value("GEMINI_BASE_URL")
        or DEFAULT_GEMINI_TTS_BASE_URL
    ).strip().rstrip("/")

    payload: Dict[str, Any] = {
        "contents": [{"parts": [{"text": text}]}],
        "generationConfig": {
            "responseModalities": ["AUDIO"],
            "speechConfig": {
                "voiceConfig": {
                    "prebuiltVoiceConfig": {"voiceName": voice},
                },
            },
        },
    }

    endpoint = f"{base_url}/models/{model}:generateContent"
    response = requests.post(
        endpoint,
        params={"key": api_key},
        headers={"Content-Type": "application/json"},
        json=payload,
        timeout=60,
    )
    if response.status_code != 200:
        # Surface the API error message when present; fall back to the raw
        # body (truncated) when the error payload isn't JSON.
        try:
            err = response.json().get("error", {})
            detail = err.get("message") or response.text[:300]
        except Exception:
            detail = response.text[:300]
        raise RuntimeError(
            f"Gemini TTS API error (HTTP {response.status_code}): {detail}"
        )

    try:
        data = response.json()
        parts = data["candidates"][0]["content"]["parts"]
        # Accept both camelCase and snake_case inline-data keys.
        audio_part = next((p for p in parts if "inlineData" in p or "inline_data" in p), None)
        if audio_part is None:
            raise RuntimeError("Gemini TTS response contained no audio data")
        inline = audio_part.get("inlineData") or audio_part.get("inline_data") or {}
        audio_b64 = inline.get("data", "")
    except (KeyError, IndexError, TypeError) as e:
        raise RuntimeError(f"Gemini TTS response was malformed: {e}") from e

    if not audio_b64:
        raise RuntimeError("Gemini TTS returned empty audio data")

    pcm_bytes = base64.b64decode(audio_b64)
    wav_bytes = _wrap_pcm_as_wav(pcm_bytes)

    # Fast path: caller wants WAV directly, just write.
    if output_path.lower().endswith(".wav"):
        with open(output_path, "wb") as f:
            f.write(wav_bytes)
        return output_path

    # Otherwise write WAV to a temp file and ffmpeg-convert to the target
    # format (.mp3 or .ogg). If ffmpeg is missing, fall back to renaming the
    # WAV -- this matches the NeuTTS behavior and keeps the tool usable on
    # systems without ffmpeg (audio still plays, just with a misleading
    # extension).
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        tmp.write(wav_bytes)
        wav_path = tmp.name

    try:
        ffmpeg = shutil.which("ffmpeg")
        if ffmpeg:
            # For .ogg output, force libopus encoding (Telegram voice bubbles
            # require Opus specifically; ffmpeg's default for .ogg is Vorbis).
            if output_path.lower().endswith(".ogg"):
                cmd = [
                    ffmpeg, "-i", wav_path,
                    "-acodec", "libopus", "-ac", "1",
                    "-b:a", "64k", "-vbr", "off",
                    "-y", "-loglevel", "error",
                    output_path,
                ]
            else:
                cmd = [ffmpeg, "-i", wav_path, "-y", "-loglevel", "error", output_path]
            result = subprocess.run(cmd, capture_output=True, timeout=30)
            if result.returncode != 0:
                stderr = result.stderr.decode("utf-8", errors="ignore")[:300]
                raise RuntimeError(f"ffmpeg conversion failed: {stderr}")
        else:
            logger.warning(
                "ffmpeg not found; writing raw WAV to %s (extension may be misleading)",
                output_path,
            )
            shutil.copyfile(wav_path, output_path)
    finally:
        # Best-effort cleanup of the intermediate WAV temp file.
        try:
            os.remove(wav_path)
        except OSError:
            pass

    return output_path
1267  
1268  
1269  # ===========================================================================
1270  # NeuTTS (local, on-device TTS via neutts_cli)
1271  # ===========================================================================
1272  
1273  def _check_neutts_available() -> bool:
1274      """Check if the neutts engine is importable (installed locally)."""
1275      try:
1276          import importlib.util
1277          return importlib.util.find_spec("neutts") is not None
1278      except Exception:
1279          return False
1280  
1281  
1282  def _check_kittentts_available() -> bool:
1283      """Check if the kittentts engine is importable (installed locally)."""
1284      try:
1285          import importlib.util
1286          return importlib.util.find_spec("kittentts") is not None
1287      except Exception:
1288          return False
1289  
1290  
1291  def _default_neutts_ref_audio() -> str:
1292      """Return path to the bundled default voice reference audio."""
1293      return str(Path(__file__).parent / "neutts_samples" / "jo.wav")
1294  
1295  
1296  def _default_neutts_ref_text() -> str:
1297      """Return path to the bundled default voice reference transcript."""
1298      return str(Path(__file__).parent / "neutts_samples" / "jo.txt")
1299  
1300  
def _generate_neutts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str:
    """Generate speech using the local NeuTTS engine.

    Runs synthesis in a subprocess via tools/neutts_synth.py to keep the
    ~500MB model in a separate process that exits after synthesis.
    Outputs WAV natively; converts to the requested format via ffmpeg when
    the caller asked for .mp3/.ogg.

    Args:
        text: Text to convert to speech.
        output_path: Where to save the audio file (.wav, .mp3, or .ogg).
        tts_config: TTS config dict (reads the optional "neutts" section).

    Returns:
        Path to the saved audio file.

    Raises:
        RuntimeError: If the synthesis subprocess exits non-zero.
        subprocess.CalledProcessError: If the ffmpeg conversion fails.
        subprocess.TimeoutExpired: If synthesis (120s) or conversion (30s)
            exceeds its timeout.
    """
    import sys

    neutts_config = tts_config.get("neutts", {})
    ref_audio = neutts_config.get("ref_audio", "") or _default_neutts_ref_audio()
    ref_text = neutts_config.get("ref_text", "") or _default_neutts_ref_text()
    model = neutts_config.get("model", "neuphonic/neutts-air-q4-gguf")
    device = neutts_config.get("device", "cpu")

    # NeuTTS outputs WAV natively — synthesize to a .wav path first and
    # let the conversion step below produce the final format if needed.
    wav_path = output_path
    if not output_path.endswith(".wav"):
        wav_path = output_path.rsplit(".", 1)[0] + ".wav"

    synth_script = str(Path(__file__).parent / "neutts_synth.py")
    cmd = [
        sys.executable, synth_script,
        "--text", text,
        "--out", wav_path,
        "--ref-audio", ref_audio,
        "--ref-text", ref_text,
        "--model", model,
        "--device", device,
    ]

    result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
    if result.returncode != 0:
        # The helper writes an "OK:" status line to stderr; filter it out so
        # the surfaced error contains only real diagnostics.
        error_lines = [
            line for line in result.stderr.strip().splitlines()
            if not line.startswith("OK:")
        ]
        detail = "\n".join(error_lines) or "unknown error"
        raise RuntimeError(f"NeuTTS synthesis failed: {detail}")

    # If the caller wanted .mp3 or .ogg, convert from WAV.
    if wav_path != output_path:
        ffmpeg = shutil.which("ffmpeg")
        if ffmpeg:
            conv_cmd = [ffmpeg, "-i", wav_path, "-y", "-loglevel", "error", output_path]
            subprocess.run(conv_cmd, check=True, timeout=30)
            # Best-effort cleanup of the intermediate WAV (mirrors the Piper
            # backend's handling).
            try:
                os.remove(wav_path)
            except OSError:
                pass
        else:
            # No ffmpeg — rename the WAV to the expected path. The extension
            # will be misleading, but the audio remains playable.
            os.rename(wav_path, output_path)

    return output_path
1352  
1353  
1354  # ===========================================================================
1355  # Provider: Piper (local, neural VITS, 44 languages)
1356  # ===========================================================================
1357  
# Module-level cache for Piper voice instances. Entries are keyed on the
# absolute .onnx model path plus the CUDA flag, so switching voices (or
# toggling CUDA) doesn't invalidate previously loaded voices.
_piper_voice_cache: Dict[str, Any] = {}
1362  
1363  
1364  def _check_piper_available() -> bool:
1365      """Check whether the piper-tts package is importable."""
1366      try:
1367          import importlib.util
1368          return importlib.util.find_spec("piper") is not None
1369      except Exception:
1370          return False
1371  
1372  
def _get_piper_voices_dir() -> Path:
    """Return the directory where Hermes caches Piper voice models.

    Resolves to ``~/.hermes/cache/piper-voices/`` under the active
    HERMES_HOME so voice downloads follow profile boundaries. The
    directory is created on first use.
    """
    from hermes_constants import get_hermes_dir

    voices_dir = Path(get_hermes_dir("cache/piper-voices", "piper_voices_cache"))
    voices_dir.mkdir(parents=True, exist_ok=True)
    return voices_dir
1383  
1384  
1385  def _resolve_piper_voice_path(voice: str, download_dir: Path) -> str:
1386      """Resolve *voice* (a model name or path) to a concrete .onnx file path.
1387  
1388      Accepts any of:
1389        - Absolute / expanded path to an .onnx file the user already has
1390        - A voice *name* like ``en_US-lessac-medium`` (downloads to
1391          ``download_dir`` on first use via ``python -m piper.download_voices``)
1392  
1393      Raises RuntimeError if the model can't be located or downloaded.
1394      """
1395      if not voice:
1396          voice = DEFAULT_PIPER_VOICE
1397  
1398      # Case 1: user gave a direct file path.
1399      candidate = Path(voice).expanduser()
1400      if candidate.suffix.lower() == ".onnx" and candidate.exists():
1401          return str(candidate)
1402  
1403      # Case 2: user gave a voice *name*. See if it's already downloaded.
1404      cached = download_dir / f"{voice}.onnx"
1405      if cached.exists() and (download_dir / f"{voice}.onnx.json").exists():
1406          return str(cached)
1407  
1408      # Case 3: download the voice. piper ships a download helper module.
1409      import sys as _sys
1410      logger.info("[Piper] Downloading voice '%s' to %s (first use)", voice, download_dir)
1411      try:
1412          result = subprocess.run(
1413              [_sys.executable, "-m", "piper.download_voices", voice,
1414               "--download-dir", str(download_dir)],
1415              capture_output=True, text=True, timeout=300,
1416          )
1417      except subprocess.TimeoutExpired as exc:
1418          raise RuntimeError(
1419              f"Piper voice download timed out after 300s for '{voice}'"
1420          ) from exc
1421  
1422      if result.returncode != 0:
1423          stderr = (result.stderr or "").strip() or "no stderr output"
1424          raise RuntimeError(
1425              f"Piper voice download failed for '{voice}': {stderr[:400]}"
1426          )
1427  
1428      if not cached.exists():
1429          raise RuntimeError(
1430              f"Piper voice download completed but {cached} is missing — "
1431              f"check voice name (see: https://github.com/OHF-Voice/piper1-gpl/"
1432              f"blob/main/docs/VOICES.md)"
1433          )
1434      return str(cached)
1435  
1436  
def _generate_piper_tts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str:
    """Generate speech using the local Piper engine.

    Loads the voice model once per process (cached by absolute path and
    CUDA flag) and writes a WAV file, then converts to MP3/Opus via ffmpeg
    when the caller requested a different output format.

    Args:
        text: Text to convert to speech.
        output_path: Where to save the audio file (.wav, .mp3, or .ogg).
        tts_config: TTS config dict (reads the optional "piper" section).

    Returns:
        Path to the saved audio file.
    """
    PiperVoice = _import_piper()
    import wave

    piper_config = tts_config.get("piper", {}) if isinstance(tts_config, dict) else {}
    voice_name = piper_config.get("voice") or DEFAULT_PIPER_VOICE
    download_dir = Path(piper_config.get("voices_dir") or _get_piper_voices_dir()).expanduser()
    download_dir.mkdir(parents=True, exist_ok=True)
    use_cuda = bool(piper_config.get("use_cuda", False))

    model_path = _resolve_piper_voice_path(voice_name, download_dir)

    # Cache key includes the CUDA flag so CPU/GPU loads don't collide.
    cache_key = f"{model_path}::cuda={use_cuda}"
    global _piper_voice_cache
    if cache_key not in _piper_voice_cache:
        logger.info("[Piper] Loading voice: %s", model_path)
        _piper_voice_cache[cache_key] = PiperVoice.load(model_path, use_cuda=use_cuda)
        logger.info("[Piper] Voice loaded")
    voice = _piper_voice_cache[cache_key]

    # Optional synthesis knobs — only pass a SynthesisConfig when at least
    # one advanced knob is configured, so we don't depend on a newer Piper
    # version than the user's installed one unless we need to.
    syn_config = None
    has_advanced = any(
        k in piper_config
        for k in ("length_scale", "noise_scale", "noise_w_scale", "volume", "normalize_audio")
    )
    if has_advanced:
        try:
            from piper import SynthesisConfig  # type: ignore
            syn_config = SynthesisConfig(
                length_scale=float(piper_config.get("length_scale", 1.0)),
                noise_scale=float(piper_config.get("noise_scale", 0.667)),
                noise_w_scale=float(piper_config.get("noise_w_scale", 0.8)),
                volume=float(piper_config.get("volume", 1.0)),
                normalize_audio=bool(piper_config.get("normalize_audio", True)),
            )
        except ImportError:
            logger.warning(
                "[Piper] SynthesisConfig not available in this piper-tts "
                "version — advanced knobs ignored"
            )

    # Piper outputs WAV. Caller handles downstream MP3/Opus conversion.
    wav_path = output_path
    if not output_path.endswith(".wav"):
        wav_path = output_path.rsplit(".", 1)[0] + ".wav"

    with wave.open(wav_path, "wb") as wav_file:
        if syn_config is not None:
            voice.synthesize_wav(text, wav_file, syn_config=syn_config)
        else:
            voice.synthesize_wav(text, wav_file)

    # Convert to desired format if caller requested mp3/ogg
    if wav_path != output_path:
        ffmpeg = shutil.which("ffmpeg")
        if ffmpeg:
            conv_cmd = [ffmpeg, "-i", wav_path, "-y", "-loglevel", "error", output_path]
            subprocess.run(conv_cmd, check=True, timeout=30)
            try:
                os.remove(wav_path)
            except OSError:
                pass
        else:
            # No ffmpeg — rename the WAV to the requested path so the file
            # exists where the caller expects (extension may be misleading).
            os.rename(wav_path, output_path)

    return output_path
1513  
1514  
1515  # ===========================================================================
1516  # Provider: KittenTTS (local, lightweight)
1517  # ===========================================================================
1518  
# Module-level cache of loaded KittenTTS model instances, keyed by model
# name, so repeated TTS calls don't pay the model-load cost every time.
_kittentts_model_cache: Dict[str, Any] = {}
1521  
1522  
def _generate_kittentts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str:
    """Generate speech using KittenTTS local ONNX model.

    KittenTTS is a lightweight TTS engine (25-80MB models) that runs
    entirely on CPU without requiring a GPU or API key.

    Args:
        text: Text to convert to speech.
        output_path: Where to save the audio file.
        tts_config: TTS config dict; reads the optional ``kittentts`` sub-dict
            (keys: model, voice, speed, clean_text).

    Returns:
        Path to the saved audio file (always ``output_path``).

    Raises:
        ImportError: If the 'kittentts' or 'soundfile' package is missing.
        subprocess.CalledProcessError: If the ffmpeg conversion fails.
    """
    KittenTTS = _import_kittentts()
    kt_config = tts_config.get("kittentts", {})
    model_name = kt_config.get("model", DEFAULT_KITTENTTS_MODEL)
    voice = kt_config.get("voice", DEFAULT_KITTENTTS_VOICE)
    speed = kt_config.get("speed", 1.0)
    clean_text = kt_config.get("clean_text", True)

    # Use cached model instance if available — loading is the slow part,
    # so keep one instance per model name for the process lifetime.
    global _kittentts_model_cache
    if model_name not in _kittentts_model_cache:
        logger.info("[KittenTTS] Loading model: %s", model_name)
        _kittentts_model_cache[model_name] = KittenTTS(model_name)
        logger.info("[KittenTTS] Model loaded successfully")

    model = _kittentts_model_cache[model_name]

    # Generate audio (returns numpy array at 24kHz)
    audio = model.generate(text, voice=voice, speed=speed, clean_text=clean_text)

    # Save as WAV first: KittenTTS produces raw PCM, not MP3/Opus.
    import soundfile as sf
    wav_path = output_path
    if not output_path.endswith(".wav"):
        wav_path = output_path.rsplit(".", 1)[0] + ".wav"

    sf.write(wav_path, audio, 24000)

    # Convert to desired format if needed
    if wav_path != output_path:
        ffmpeg = shutil.which("ffmpeg")
        if ffmpeg:
            conv_cmd = [ffmpeg, "-i", wav_path, "-y", "-loglevel", "error", output_path]
            subprocess.run(conv_cmd, check=True, timeout=30)
            # Best-effort cleanup of the intermediate WAV; don't fail a
            # successful generation over it (matches the Piper provider).
            try:
                os.remove(wav_path)
            except OSError:
                pass
        else:
            # No ffmpeg — rename the WAV to the expected path
            os.rename(wav_path, output_path)

    return output_path
1576  
1577  
1578  # ===========================================================================
1579  # Main tool function
1580  # ===========================================================================
def text_to_speech_tool(
    text: str,
    output_path: Optional[str] = None,
) -> str:
    """
    Convert text to speech audio.

    Reads provider/voice config from ~/.hermes/config.yaml (tts: section).
    The model sends text; the user configures voice and provider.

    On messaging platforms, the returned MEDIA:<path> tag is intercepted
    by the send pipeline and delivered as a native voice message.
    In CLI mode, the file is saved to ~/voice-memos/.

    Args:
        text: The text to convert to speech.
        output_path: Optional custom save path. Defaults to ~/voice-memos/<timestamp>.mp3

    Returns:
        str: JSON result with success, file_path, and optionally MEDIA tag.
        Never raises: provider failures are returned as error JSON.
    """
    if not text or not text.strip():
        return tool_error("Text is required", success=False)

    tts_config = _load_tts_config()
    provider = _get_provider(tts_config)

    # User-declared command provider (type: command under tts.providers.<name>)
    # resolves BEFORE the built-in dispatch. Built-in names short-circuit here
    # so a user's ``tts.providers.openai.command`` can't override the real
    # OpenAI handler.
    command_provider_config = _resolve_command_provider_config(provider, tts_config)

    # Truncate very long text with a warning. The cap is per-provider
    # (OpenAI 4096, xAI 15k, MiniMax 10k, ElevenLabs model-aware, etc.).
    max_len = _resolve_max_text_length(provider, tts_config)
    if len(text) > max_len:
        logger.warning(
            "TTS text too long for provider %s (%d chars), truncating to %d",
            provider, len(text), max_len,
        )
        text = text[:max_len]

    # Detect platform from gateway env var to choose the best output format.
    # Telegram voice bubbles require Opus (.ogg); OpenAI and ElevenLabs can
    # produce Opus natively (no ffmpeg needed).  Edge TTS always outputs MP3
    # and needs ffmpeg for conversion.
    from gateway.session_context import get_session_env
    platform = get_session_env("HERMES_SESSION_PLATFORM", "").lower()
    want_opus = (platform == "telegram")

    # Determine output path
    if output_path:
        file_path = Path(output_path).expanduser()
        if command_provider_config is not None:
            # Respect caller-supplied path but align the extension with the
            # provider's configured output_format so the command writes to a
            # path the caller actually expects.
            file_path = _configured_command_tts_output_path(
                file_path, command_provider_config
            )
    else:
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        out_dir = Path(DEFAULT_OUTPUT_DIR)
        out_dir.mkdir(parents=True, exist_ok=True)
        if command_provider_config is not None:
            fmt = _get_command_tts_output_format(command_provider_config)
            file_path = out_dir / f"tts_{timestamp}.{fmt}"
        # Use .ogg for Telegram with providers that support native Opus output,
        # otherwise fall back to .mp3 (Edge TTS will attempt ffmpeg conversion later).
        elif want_opus and provider in ("openai", "elevenlabs", "mistral", "gemini"):
            file_path = out_dir / f"tts_{timestamp}.ogg"
        else:
            file_path = out_dir / f"tts_{timestamp}.mp3"

    # Remember the caller-visible path before any truthful-generation rewrite,
    # so the cleanup logic below never deletes a file the caller asked for.
    requested_file_path = file_path

    # Some providers cannot natively write OGG/Opus. If the caller explicitly
    # requested .ogg for one of those providers, generate into a truthful
    # sibling path first (.mp3 or .wav) and let the post-processing step
    # convert it when possible.
    if command_provider_config is None:
        file_path = _rewrite_requested_output_path_for_truthful_generation(provider, file_path)

    # Ensure parent directory exists before any provider writes to it.
    file_path.parent.mkdir(parents=True, exist_ok=True)
    file_str = str(file_path)

    try:
        # Generate audio with the configured provider
        if command_provider_config is not None:
            logger.info(
                "Generating speech with command TTS provider '%s'...", provider,
            )
            file_str = _generate_command_tts(
                text, file_str, provider, command_provider_config, tts_config,
            )

        elif provider == "elevenlabs":
            try:
                _import_elevenlabs()
            except ImportError:
                return json.dumps({
                    "success": False,
                    "error": "ElevenLabs provider selected but 'elevenlabs' package not installed. Run: pip install elevenlabs"
                }, ensure_ascii=False)
            logger.info("Generating speech with ElevenLabs...")
            _generate_elevenlabs(text, file_str, tts_config)

        elif provider == "openai":
            try:
                _import_openai_client()
            except ImportError:
                return json.dumps({
                    "success": False,
                    "error": "OpenAI provider selected but 'openai' package not installed."
                }, ensure_ascii=False)
            logger.info("Generating speech with OpenAI TTS...")
            _generate_openai_tts(text, file_str, tts_config)

        elif provider == "minimax":
            logger.info("Generating speech with MiniMax TTS...")
            _generate_minimax_tts(text, file_str, tts_config)

        elif provider == "xai":
            logger.info("Generating speech with xAI TTS...")
            _generate_xai_tts(text, file_str, tts_config)

        elif provider == "mistral":
            try:
                _import_mistral_client()
            except ImportError:
                return json.dumps({
                    "success": False,
                    "error": "Mistral provider selected but 'mistralai' package not installed. "
                             "Run: pip install 'hermes-agent[mistral]'"
                }, ensure_ascii=False)
            logger.info("Generating speech with Mistral Voxtral TTS...")
            _generate_mistral_tts(text, file_str, tts_config)

        elif provider == "gemini":
            logger.info("Generating speech with Google Gemini TTS...")
            _generate_gemini_tts(text, file_str, tts_config)

        elif provider == "neutts":
            if not _check_neutts_available():
                return json.dumps({
                    "success": False,
                    "error": "NeuTTS provider selected but neutts is not installed. "
                             "Run hermes setup and choose NeuTTS, or install espeak-ng and run python -m pip install -U neutts[all]."
                }, ensure_ascii=False)
            logger.info("Generating speech with NeuTTS (local)...")
            _generate_neutts(text, file_str, tts_config)

        elif provider == "kittentts":
            try:
                _import_kittentts()
            except ImportError:
                return json.dumps({
                    "success": False,
                    "error": "KittenTTS provider selected but 'kittentts' package not installed. "
                             "Run 'hermes setup tts' and choose KittenTTS, or install manually: "
                             "pip install https://github.com/KittenML/KittenTTS/releases/download/0.8.1/kittentts-0.8.1-py3-none-any.whl"
                }, ensure_ascii=False)
            logger.info("Generating speech with KittenTTS (local, ~25MB)...")
            _generate_kittentts(text, file_str, tts_config)

        elif provider == "piper":
            try:
                _import_piper()
            except ImportError:
                return json.dumps({
                    "success": False,
                    "error": "Piper provider selected but 'piper-tts' package not installed. "
                             "Run 'hermes tools' and select Piper under TTS, or install manually: "
                             "pip install piper-tts",
                }, ensure_ascii=False)
            logger.info("Generating speech with Piper (local)...")
            _generate_piper_tts(text, file_str, tts_config)

        else:
            # Default: Edge TTS (free), with NeuTTS as local fallback
            edge_available = True
            try:
                _import_edge_tts()
            except ImportError:
                edge_available = False

            if edge_available:
                logger.info("Generating speech with Edge TTS...")
                try:
                    # Run the coroutine on a worker thread: asyncio.run()
                    # raises RuntimeError when called from a thread that
                    # already has a running event loop.
                    import concurrent.futures
                    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
                        pool.submit(
                            lambda: asyncio.run(_generate_edge_tts(text, file_str, tts_config))
                        ).result(timeout=60)
                except RuntimeError:
                    # NOTE(review): belt-and-braces fallback — presumably for
                    # environments where the thread-pool path itself fails.
                    asyncio.run(_generate_edge_tts(text, file_str, tts_config))
            elif _check_neutts_available():
                logger.info("Edge TTS not available, falling back to NeuTTS (local)...")
                # Re-run the truthful-path rewrite for the new provider, since
                # NeuTTS may not support the extension chosen for Edge TTS.
                provider = "neutts"
                if command_provider_config is None:
                    file_path = _rewrite_requested_output_path_for_truthful_generation(provider, file_path)
                    file_path.parent.mkdir(parents=True, exist_ok=True)
                    file_str = str(file_path)
                _generate_neutts(text, file_str, tts_config)
            else:
                return json.dumps({
                    "success": False,
                    "error": "No TTS provider available. Install edge-tts (pip install edge-tts) "
                             "or set up NeuTTS for local synthesis."
                }, ensure_ascii=False)

        # Check the file was actually created
        if not os.path.exists(file_str) or os.path.getsize(file_str) == 0:
            return json.dumps({
                "success": False,
                "error": f"TTS generation produced no output (provider: {provider})"
            }, ensure_ascii=False)

        # Try Opus conversion for Telegram compatibility
        # Edge TTS outputs MP3, NeuTTS/KittenTTS output WAV — all need ffmpeg conversion
        voice_compatible = False
        if command_provider_config is not None:
            # Command providers are documents by default. Voice-bubble
            # delivery only kicks in when the user explicitly opts in
            # via ``voice_compatible: true`` in their provider config.
            if _is_command_tts_voice_compatible(command_provider_config):
                if not file_str.endswith(".ogg"):
                    opus_path = _convert_to_opus(file_str)
                    if opus_path:
                        file_str = opus_path
                voice_compatible = file_str.endswith(".ogg")
        elif provider in ("edge", "neutts", "minimax", "xai", "kittentts", "piper") and not file_str.endswith(".ogg"):
            conversion_input = file_str
            opus_path = _convert_to_opus(conversion_input)
            if opus_path:
                file_str = opus_path
                voice_compatible = True
                # Remove the pre-conversion file unless it is the exact path
                # the caller asked for (then they still own it).
                if Path(conversion_input) != Path(file_str) and Path(conversion_input) != requested_file_path:
                    try:
                        os.remove(conversion_input)
                    except OSError:
                        pass
        elif provider in ("elevenlabs", "openai", "mistral", "gemini"):
            # These providers can emit Opus natively; .ogg extension implies it.
            voice_compatible = file_str.endswith(".ogg")

        file_size = os.path.getsize(file_str)
        logger.info("TTS audio saved: %s (%s bytes, provider: %s)", file_str, f"{file_size:,}", provider)

        # Build response with MEDIA tag for platform delivery
        media_tag = f"MEDIA:{file_str}"
        if voice_compatible:
            media_tag = f"[[audio_as_voice]]\n{media_tag}"

        return json.dumps({
            "success": True,
            "file_path": file_str,
            "media_tag": media_tag,
            "provider": provider,
            "voice_compatible": voice_compatible,
        }, ensure_ascii=False)

    except ValueError as e:
        # Configuration errors (missing API keys, etc.)
        error_msg = f"TTS configuration error ({provider}): {e}"
        logger.error("%s", error_msg)
        return tool_error(error_msg, success=False)
    except FileNotFoundError as e:
        # Missing dependencies or files
        error_msg = f"TTS dependency missing ({provider}): {e}"
        logger.error("%s", error_msg, exc_info=True)
        return tool_error(error_msg, success=False)
    except Exception as e:
        # Unexpected errors
        error_msg = f"TTS generation failed ({provider}): {e}"
        logger.error("%s", error_msg, exc_info=True)
        return tool_error(error_msg, success=False)
1859  
1860  
1861  # ===========================================================================
1862  # Requirements check
1863  # ===========================================================================
def check_tts_requirements() -> bool:
    """
    Report whether at least one TTS provider can currently work.

    Edge TTS needs no API key and is the default, so its package alone is
    enough. A user-declared command provider also satisfies the
    requirement, as do the local engines (NeuTTS, KittenTTS, Piper).

    Returns:
        bool: True if at least one provider can work.
    """

    def _importable(importer) -> bool:
        # Probe an optional dependency without letting ImportError escape.
        try:
            importer()
            return True
        except ImportError:
            return False

    # Any configured command provider counts as available.
    if _has_any_command_tts_provider():
        return True
    # Edge TTS: keyless, package presence alone suffices.
    if _importable(_import_edge_tts):
        return True
    # SDK-backed cloud providers: need both the package and credentials.
    if _importable(_import_elevenlabs) and get_env_value("ELEVENLABS_API_KEY"):
        return True
    if _importable(_import_openai_client) and _has_openai_audio_backend():
        return True
    # HTTP-only cloud providers: an API key is all that's required.
    if get_env_value("MINIMAX_API_KEY"):
        return True
    if get_env_value("XAI_API_KEY"):
        return True
    if get_env_value("GEMINI_API_KEY") or get_env_value("GOOGLE_API_KEY"):
        return True
    if _importable(_import_mistral_client) and get_env_value("MISTRAL_API_KEY"):
        return True
    # Local on-device engines: no key needed at all.
    return any(
        check()
        for check in (
            _check_neutts_available,
            _check_kittentts_available,
            _check_piper_available,
        )
    )
1914  
1915  
def _resolve_openai_audio_client_config() -> tuple[str, str]:
    """Return ``(api_key, base_url)`` for OpenAI audio calls.

    Prefers direct OpenAI credentials, falling back to the managed Tool
    Gateway. When ``tts.use_gateway`` is set in config, the gateway wins
    even if direct credentials are present.

    Raises:
        ValueError: If neither direct credentials nor a gateway is available.
    """
    direct_key = resolve_openai_audio_api_key()
    use_direct = bool(direct_key) and not prefers_gateway("tts")
    if use_direct:
        return direct_key, DEFAULT_OPENAI_BASE_URL

    gateway = resolve_managed_tool_gateway("openai-audio")
    if gateway is None:
        parts = ["Neither VOICE_TOOLS_OPENAI_KEY nor OPENAI_API_KEY is set"]
        if managed_nous_tools_enabled():
            parts.append(", and the managed OpenAI audio gateway is unavailable")
        raise ValueError("".join(parts))

    # Normalize the gateway origin to exactly one trailing slash before
    # appending the /v1 API root.
    base_url = urljoin(f"{gateway.gateway_origin.rstrip('/')}/", "v1")
    return gateway.nous_user_token, base_url
1936  
1937  
def _has_openai_audio_backend() -> bool:
    """Return True when OpenAI audio can use direct credentials or the managed gateway."""
    # Check direct credentials first so the gateway lookup is skipped
    # whenever a key is present (same short-circuit as `or`).
    if resolve_openai_audio_api_key():
        return True
    return bool(resolve_managed_tool_gateway("openai-audio"))
1941  
1942  
1943  # ===========================================================================
1944  # Streaming TTS: sentence-by-sentence pipeline for ElevenLabs
1945  # ===========================================================================
# Sentence boundary pattern: terminal punctuation (. ! ?) followed by
# whitespace, or a blank line (paragraph break).
_SENTENCE_BOUNDARY_RE = re.compile(r'(?<=[.!?])(?:\s|\n)|(?:\n\n)')

# Markdown stripping patterns (same as cli.py _voice_speak_response)
_MD_CODE_BLOCK = re.compile(r'```[\s\S]*?```')  # fenced code blocks -> space
_MD_LINK = re.compile(r'\[([^\]]+)\]\([^)]+\)')  # [text](url) -> text
_MD_URL = re.compile(r'https?://\S+')  # bare URLs -> dropped
_MD_BOLD = re.compile(r'\*\*(.+?)\*\*')  # **bold** -> bold
_MD_ITALIC = re.compile(r'\*(.+?)\*')  # *italic* -> italic
_MD_INLINE_CODE = re.compile(r'`(.+?)`')  # `code` -> code
_MD_HEADER = re.compile(r'^#+\s*', flags=re.MULTILINE)  # leading # markers
_MD_LIST_ITEM = re.compile(r'^\s*[-*]\s+', flags=re.MULTILINE)  # bullet markers
_MD_HR = re.compile(r'---+')  # horizontal rules
_MD_EXCESS_NL = re.compile(r'\n{3,}')  # 3+ newlines -> one blank line
1960  
1961  
def _strip_markdown_for_tts(text: str) -> str:
    """Remove markdown formatting that shouldn't be spoken aloud."""
    # Ordered (pattern, replacement) table: code blocks go first so their
    # contents never match the later inline patterns.
    substitutions = (
        (_MD_CODE_BLOCK, ' '),
        (_MD_LINK, r'\1'),
        (_MD_URL, ''),
        (_MD_BOLD, r'\1'),
        (_MD_ITALIC, r'\1'),
        (_MD_INLINE_CODE, r'\1'),
        (_MD_HEADER, ''),
        (_MD_LIST_ITEM, ''),
        (_MD_HR, ''),
        (_MD_EXCESS_NL, '\n\n'),
    )
    for pattern, replacement in substitutions:
        text = pattern.sub(replacement, text)
    return text.strip()
1975  
1976  
def stream_tts_to_speaker(
    text_queue: queue.Queue,
    stop_event: threading.Event,
    tts_done_event: threading.Event,
    display_callback: Optional[Callable[[str], None]] = None,
):
    """Consume text deltas from *text_queue*, buffer them into sentences,
    and stream each sentence through ElevenLabs TTS to the speaker in
    real-time.

    Protocol:
        * The producer puts ``str`` deltas onto *text_queue*.
        * A ``None`` sentinel signals end-of-text (flush remaining buffer).
        * *stop_event* can be set to abort early (e.g. user interrupt).
        * *tts_done_event* is **set** in the ``finally`` block so callers
          waiting on it (continuous voice mode) know playback is finished.

    Degrades gracefully: without an API key or the elevenlabs package,
    only *display_callback* runs (no audio); without sounddevice, audio
    falls back to a temp-file system player.
    """
    # Cleared up-front so callers that wait on the event block until the
    # finally block sets it.
    tts_done_event.clear()

    try:
        # --- TTS client setup (optional -- display_callback works without it) ---
        # client/output_stream stay None when audio is unavailable; the
        # finally block relies on output_stream being bound here.
        client = None
        output_stream = None
        voice_id = DEFAULT_ELEVENLABS_VOICE_ID
        model_id = DEFAULT_ELEVENLABS_STREAMING_MODEL_ID

        tts_config = _load_tts_config()
        el_config = tts_config.get("elevenlabs", {})
        voice_id = el_config.get("voice_id", voice_id)
        model_id = el_config.get("streaming_model_id",
                                 el_config.get("model_id", model_id))
        # Per-sentence cap for the streaming path. Look up the cap against
        # the *streaming* model_id (defaults to eleven_flash_v2_5 = 40k chars),
        # not the sync model_id. A user override
        # (tts.elevenlabs.max_text_length) still wins.
        stream_max_len = _resolve_max_text_length(
            "elevenlabs",
            {**tts_config, "elevenlabs": {**el_config, "model_id": model_id}},
        )

        api_key = (get_env_value("ELEVENLABS_API_KEY") or "")
        if not api_key:
            logger.warning("ELEVENLABS_API_KEY not set; streaming TTS audio disabled")
        else:
            try:
                ElevenLabs = _import_elevenlabs()
                client = ElevenLabs(api_key=api_key)
            except ImportError:
                logger.warning("elevenlabs package not installed; streaming TTS disabled")

            # Open a single sounddevice output stream for the lifetime of
            # this function.  ElevenLabs pcm_24000 produces signed 16-bit
            # little-endian mono PCM at 24 kHz.
            if client is not None:
                try:
                    sd = _import_sounddevice()
                    output_stream = sd.OutputStream(
                        samplerate=24000, channels=1, dtype="int16",
                    )
                    output_stream.start()
                except (ImportError, OSError) as exc:
                    logger.debug("sounddevice not available: %s", exc)
                    output_stream = None
                except Exception as exc:
                    logger.warning("sounddevice OutputStream failed: %s", exc)
                    output_stream = None

        # Sentence assembly state: fragments shorter than min_sentence_len
        # are merged forward; a buffer longer than long_flush_len is flushed
        # on queue timeout even without a sentence boundary.
        sentence_buf = ""
        min_sentence_len = 20
        long_flush_len = 100
        queue_timeout = 0.5
        _spoken_sentences: list[str] = []  # track spoken sentences to skip duplicates
        # Regex to strip complete <think>...</think> blocks from buffer
        _think_block_re = re.compile(r'<think[\s>].*?</think>', flags=re.DOTALL)

        def _speak_sentence(sentence: str):
            """Display sentence and optionally generate + play audio."""
            if stop_event.is_set():
                return
            cleaned = _strip_markdown_for_tts(sentence).strip()
            if not cleaned:
                return
            # Skip duplicate/near-duplicate sentences (LLM repetition)
            cleaned_lower = cleaned.lower().rstrip(".!,")
            for prev in _spoken_sentences:
                if prev.lower().rstrip(".!,") == cleaned_lower:
                    return
            _spoken_sentences.append(cleaned)
            # Display raw sentence on screen before TTS processing
            if display_callback is not None:
                display_callback(sentence)
            # Skip audio generation if no TTS client available
            if client is None:
                return
            # Truncate very long sentences (ElevenLabs streaming path)
            if len(cleaned) > stream_max_len:
                cleaned = cleaned[:stream_max_len]
            try:
                audio_iter = client.text_to_speech.convert(
                    text=cleaned,
                    voice_id=voice_id,
                    model_id=model_id,
                    output_format="pcm_24000",
                )
                if output_stream is not None:
                    for chunk in audio_iter:
                        if stop_event.is_set():
                            break
                        import numpy as _np
                        audio_array = _np.frombuffer(chunk, dtype=_np.int16)
                        # reshape(-1, 1): mono PCM as one sample per frame
                        output_stream.write(audio_array.reshape(-1, 1))
                else:
                    # Fallback: write chunks to temp file and play via system player
                    _play_via_tempfile(audio_iter, stop_event)
            except Exception as exc:
                logger.warning("Streaming TTS sentence failed: %s", exc)

        def _play_via_tempfile(audio_iter, stop_evt):
            """Write PCM chunks to a temp WAV file and play it."""
            tmp_path = None
            try:
                import wave
                tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
                tmp_path = tmp.name
                with wave.open(tmp, "wb") as wf:
                    wf.setnchannels(1)
                    wf.setsampwidth(2)  # 16-bit
                    wf.setframerate(24000)
                    for chunk in audio_iter:
                        if stop_evt.is_set():
                            break
                        wf.writeframes(chunk)
                from tools.voice_mode import play_audio_file
                play_audio_file(tmp_path)
            except Exception as exc:
                logger.warning("Temp-file TTS fallback failed: %s", exc)
            finally:
                # Always delete the temp WAV, even on playback failure.
                if tmp_path:
                    try:
                        os.unlink(tmp_path)
                    except OSError:
                        pass

        while not stop_event.is_set():
            # Read next delta from queue
            try:
                delta = text_queue.get(timeout=queue_timeout)
            except queue.Empty:
                # Timeout: if we have accumulated a long buffer, flush it
                if len(sentence_buf) > long_flush_len:
                    _speak_sentence(sentence_buf)
                    sentence_buf = ""
                continue

            if delta is None:
                # End-of-text sentinel: strip any remaining think blocks, flush
                sentence_buf = _think_block_re.sub('', sentence_buf)
                if sentence_buf.strip():
                    _speak_sentence(sentence_buf)
                break

            sentence_buf += delta

            # --- Think block filtering ---
            # Strip complete <think>...</think> blocks from buffer.
            # Works correctly even when tags span multiple deltas.
            sentence_buf = _think_block_re.sub('', sentence_buf)

            # If an incomplete <think tag is at the end, wait for more data
            # before extracting sentences (the closing tag may arrive next).
            if '<think' in sentence_buf and '</think>' not in sentence_buf:
                continue

            # Check for sentence boundaries
            while True:
                m = _SENTENCE_BOUNDARY_RE.search(sentence_buf)
                if m is None:
                    break
                end_pos = m.end()
                sentence = sentence_buf[:end_pos]
                sentence_buf = sentence_buf[end_pos:]
                # Merge short fragments into the next sentence
                if len(sentence.strip()) < min_sentence_len:
                    sentence_buf = sentence + sentence_buf
                    break
                _speak_sentence(sentence)

        # Drain any remaining items from the queue
        while True:
            try:
                text_queue.get_nowait()
            except queue.Empty:
                break

        # output_stream is closed in the finally block below

    except Exception as exc:
        logger.warning("Streaming TTS pipeline error: %s", exc)
    finally:
        # Always close the audio output stream to avoid locking the device
        if output_stream is not None:
            try:
                output_stream.stop()
                output_stream.close()
            except Exception:
                pass
        tts_done_event.set()
2184  
2185  
2186  # ===========================================================================
2187  # Main -- quick diagnostics
2188  # ===========================================================================
if __name__ == "__main__":
    # Quick self-diagnostic when run directly: report which provider
    # packages are importable, which API keys are present in the
    # environment, whether ffmpeg is available, and which provider the
    # user's config currently selects. No audio is synthesized here.
    print("🔊 Text-to-Speech Tool Module")
    print("=" * 50)

    def _check(importer):
        """Return True if calling *importer* does not raise ImportError.

        Each ``_import_*`` helper lazily imports its provider package, so
        an ImportError here means the package is not installed.
        """
        try:
            importer()
            return True
        except ImportError:
            return False

    print("\nProvider availability:")
    print(f"  Edge TTS:   {'installed' if _check(_import_edge_tts) else 'not installed (pip install edge-tts)'}")
    print(f"  ElevenLabs: {'installed' if _check(_import_elevenlabs) else 'not installed (pip install elevenlabs)'}")
    print(f"    API Key:  {'set' if get_env_value('ELEVENLABS_API_KEY') else 'not set'}")
    print(f"  OpenAI:     {'installed' if _check(_import_openai_client) else 'not installed'}")
    print(
        "    API Key:  "
        f"{'set' if resolve_openai_audio_api_key() else 'not set (VOICE_TOOLS_OPENAI_KEY or OPENAI_API_KEY)'}"
    )
    # MiniMax has no local package dependency; only the API key matters.
    print(f"  MiniMax:    {'API key set' if get_env_value('MINIMAX_API_KEY') else 'not set (MINIMAX_API_KEY)'}")
    print(f"  Piper:      {'installed' if _check_piper_available() else 'not installed (pip install piper-tts)'}")
    # ffmpeg is required to transcode to Opus for Telegram voice bubbles.
    print(f"  ffmpeg:     {'✅ found' if _has_ffmpeg() else '❌ not found (needed for Telegram Opus)'}")
    print(f"\n  Output dir: {DEFAULT_OUTPUT_DIR}")

    config = _load_tts_config()
    provider = _get_provider(config)
    print(f"  Configured provider: {provider}")
2217  
2218  
2219  # ---------------------------------------------------------------------------
2220  # Registry
2221  # ---------------------------------------------------------------------------
2222  from tools.registry import registry, tool_error
2223  
# JSON-Schema-style tool declaration registered with the tool registry
# below. The model only supplies `text` (and optionally `output_path`);
# provider and voice selection are user configuration, not model inputs.
TTS_SCHEMA = {
    "name": "text_to_speech",
    "description": "Convert text to speech audio. Returns a MEDIA: path that the platform delivers as native audio. Compatible providers render as a voice bubble on Telegram; otherwise audio is sent as a regular attachment. In CLI mode, saves to ~/voice-memos/. Voice and provider are user-configured (built-in providers like edge/openai or custom command providers under tts.providers.<name>), not model-selected.",
    "parameters": {
        "type": "object",
        "properties": {
            "text": {
                "type": "string",
                "description": "The text to convert to speech. Provider-specific character caps apply and are enforced automatically (OpenAI 4096, xAI 15000, MiniMax 10000, ElevenLabs 5k-40k depending on model); over-long input is truncated."
            },
            "output_path": {
                "type": "string",
                # f-string: the default-path hint shown to the model embeds
                # the user's resolved Hermes home directory at import time.
                "description": f"Optional custom file path to save the audio. Defaults to {display_hermes_home()}/audio_cache/<timestamp>.mp3"
            }
        },
        # Only `text` is mandatory; `output_path` falls back to the cache dir.
        "required": ["text"]
    }
}
2242  
def _tts_registry_handler(args, **kwargs):
    """Registry adapter: unpack tool-call args and invoke the TTS tool.

    Extra keyword arguments passed by the registry are accepted and
    ignored, matching the original lambda's behavior.
    """
    return text_to_speech_tool(
        text=args.get("text", ""),
        output_path=args.get("output_path"),
    )


# Expose the tool to the shared registry under the "tts" toolset.
registry.register(
    name="text_to_speech",
    toolset="tts",
    schema=TTS_SCHEMA,
    handler=_tts_registry_handler,
    check_fn=check_tts_requirements,
    emoji="🔊",
)