tts_tool.py
1 #!/usr/bin/env python3 2 """ 3 Text-to-Speech Tool Module 4 5 Built-in TTS providers: 6 - Edge TTS (default, free, no API key): Microsoft Edge neural voices 7 - ElevenLabs (premium): High-quality voices, needs ELEVENLABS_API_KEY 8 - OpenAI TTS: Good quality, needs OPENAI_API_KEY 9 - MiniMax TTS: High-quality with voice cloning, needs MINIMAX_API_KEY 10 - Mistral (Voxtral TTS): Multilingual, native Opus, needs MISTRAL_API_KEY 11 - Google Gemini TTS: Controllable, 30 prebuilt voices, needs GEMINI_API_KEY 12 - xAI TTS: Grok voices, needs XAI_API_KEY 13 - NeuTTS (local, free, no API key): On-device TTS via neutts 14 - KittenTTS (local, free, no API key): On-device 25MB model 15 - Piper (local, free, no API key): OHF-Voice/piper1-gpl neural VITS, 44 languages 16 17 Custom command providers: 18 - Users can declare any number of named providers with ``type: command`` 19 under ``tts.providers.<name>`` in ``~/.hermes/config.yaml``. Hermes 20 writes the input text to a temp file and runs the configured shell 21 command, which must produce the audio file at the expected path. 22 See the Local Command section of ``website/docs/user-guide/features/tts.md``. 23 24 Output formats: 25 - Opus (.ogg) for Telegram voice bubbles (requires ffmpeg for Edge TTS) 26 - MP3 (.mp3) for everything else (CLI, Discord, WhatsApp) 27 28 Configuration is loaded from ~/.hermes/config.yaml under the 'tts:' key. 29 The user chooses the provider and voice; the model just sends text. 

Usage:
    from tools.tts_tool import text_to_speech_tool, check_tts_requirements

    result = text_to_speech_tool(text="Hello world")
"""

import asyncio
import base64
import datetime
import json
import logging
import os
import queue
import re
import shlex
import shutil
import signal
import subprocess
import tempfile
import threading
import uuid
from pathlib import Path
from typing import Callable, Dict, Any, Optional
from urllib.parse import urljoin

from hermes_constants import display_hermes_home

logger = logging.getLogger(__name__)
def get_env_value(name, default=None):
    """Read env values through the live config module.

    Tests may monkeypatch and later restore ``hermes_cli.config.get_env_value``
    before this module is imported. Resolve the helper at call time so TTS does
    not keep a stale imported function for the rest of the test process.

    Args:
        name: Environment/config variable name to resolve.
        default: Value returned when the variable is unset (``None`` result).

    Returns:
        The resolved value, or *default* when the lookup yields ``None`` or
        ``hermes_cli.config`` cannot be imported and ``os.environ`` lacks it.
    """
    try:
        from hermes_cli.config import get_env_value as _get_env_value
    except ImportError:
        return os.getenv(name, default)
    value = _get_env_value(name)
    return default if value is None else value
# These project imports deliberately sit below get_env_value (not grouped with
# the imports above); see the docstring above for the monkeypatching rationale.
from tools.managed_tool_gateway import resolve_managed_tool_gateway
from tools.tool_backend_helpers import managed_nous_tools_enabled, prefers_gateway, resolve_openai_audio_api_key
from tools.xai_http import hermes_xai_user_agent

# ---------------------------------------------------------------------------
# Lazy imports -- providers are imported only when actually used to avoid
# crashing in headless environments (SSH, Docker, WSL, no PortAudio).
# ---------------------------------------------------------------------------

def _import_edge_tts():
    """Lazy import edge_tts. Returns the module or raises ImportError."""
    import edge_tts
    return edge_tts

def _import_elevenlabs():
    """Lazy import ElevenLabs client. Returns the class or raises ImportError."""
    from elevenlabs.client import ElevenLabs
    return ElevenLabs

def _import_openai_client():
    """Lazy import OpenAI client. Returns the class or raises ImportError."""
    from openai import OpenAI as OpenAIClient
    return OpenAIClient

def _import_mistral_client():
    """Lazy import Mistral client. Returns the class or raises ImportError."""
    # NOTE(review): recent mistralai SDKs export ``Mistral`` from the package
    # root (``from mistralai import Mistral``) -- verify this submodule path
    # against the SDK version Hermes pins.
    from mistralai.client import Mistral
    return Mistral

def _import_sounddevice():
    """Lazy import sounddevice. Returns the module or raises ImportError/OSError."""
    import sounddevice as sd
    return sd


def _import_kittentts():
    """Lazy import KittenTTS. Returns the class or raises ImportError."""
    from kittentts import KittenTTS
    return KittenTTS


def _import_piper():
    """Lazy import Piper. Returns the PiperVoice class or raises ImportError.

    Piper is an optional, fully-local neural TTS engine (Home Assistant /
    Open Home Foundation). ``pip install piper-tts`` provides cross-platform
    wheels (Linux / macOS / Windows, x86_64 + ARM64) with embedded espeak-ng.
    Voice models (.onnx + .onnx.json) are downloaded on first use.
    """
    from piper import PiperVoice
    return PiperVoice


# ===========================================================================
# Defaults
# ===========================================================================
DEFAULT_PROVIDER = "edge"
DEFAULT_EDGE_VOICE = "en-US-AriaNeural"
DEFAULT_ELEVENLABS_VOICE_ID = "pNInz6obpgDQGcFmaJgB"  # Adam
DEFAULT_ELEVENLABS_MODEL_ID = "eleven_multilingual_v2"
DEFAULT_ELEVENLABS_STREAMING_MODEL_ID = "eleven_flash_v2_5"
DEFAULT_OPENAI_MODEL = "gpt-4o-mini-tts"
DEFAULT_KITTENTTS_MODEL = "KittenML/kitten-tts-nano-0.8-int8"  # 25MB
DEFAULT_KITTENTTS_VOICE = "Jasper"
DEFAULT_PIPER_VOICE = "en_US-lessac-medium"  # balanced size/quality
DEFAULT_OPENAI_VOICE = "alloy"
DEFAULT_OPENAI_BASE_URL = "https://api.openai.com/v1"
DEFAULT_MINIMAX_MODEL = "speech-2.8-hd"
DEFAULT_MINIMAX_VOICE_ID = "English_Graceful_Lady"
DEFAULT_MINIMAX_BASE_URL = "https://api.minimax.io/v1/t2a_v2"
DEFAULT_MISTRAL_TTS_MODEL = "voxtral-mini-tts-2603"
DEFAULT_MISTRAL_TTS_VOICE_ID = "c69964a6-ab8b-4f8a-9465-ec0925096ec8"  # Paul - Neutral
DEFAULT_XAI_VOICE_ID = "eve"
DEFAULT_XAI_LANGUAGE = "en"
DEFAULT_XAI_SAMPLE_RATE = 24000
DEFAULT_XAI_BIT_RATE = 128000
DEFAULT_XAI_BASE_URL = "https://api.x.ai/v1"
DEFAULT_GEMINI_TTS_MODEL = "gemini-2.5-flash-preview-tts"
DEFAULT_GEMINI_TTS_VOICE = "Kore"
DEFAULT_GEMINI_TTS_BASE_URL = "https://generativelanguage.googleapis.com/v1beta"
# PCM output specs for Gemini TTS (fixed by the API)
GEMINI_TTS_SAMPLE_RATE = 24000
GEMINI_TTS_CHANNELS = 1
GEMINI_TTS_SAMPLE_WIDTH = 2  # 16-bit PCM (L16)


def _rewrite_requested_output_path_for_truthful_generation(
    provider: str,
    file_path: Path,
) -> Path:
    """Return a safe generation path that won't mislabel the container.

    Some providers emit a fixed native container (for example MP3 or WAV).
    Writing those bytes straight into a path with a different suffix creates a
    misleading file that downstream platforms may reject or mis-handle.

    For those providers, generate into a truthful sibling path first, then let
    the existing post-processing/conversion path promote it to ``.ogg`` when
    conversion succeeds. If conversion is unavailable, the tool will still
    return the truthful MP3/WAV path instead of a mislabeled file.
    """
    provider = (provider or "").lower().strip()
    requested_suffix = file_path.suffix.lower()
    # Native container(s) each fixed-format provider can actually write.
    supported_suffixes_by_provider = {
        "edge": {".mp3"},
        "xai": {".mp3", ".wav"},
        "minimax": {".mp3", ".wav", ".flac"},
        "neutts": {".wav"},
        "kittentts": {".wav"},
        "piper": {".wav"},
    }
    # Truthful suffix to generate into when the request asks for something else.
    preferred_suffix_by_provider = {
        "edge": ".mp3",
        "xai": ".mp3",
        "minimax": ".mp3",
        "neutts": ".wav",
        "kittentts": ".wav",
        "piper": ".wav",
    }

    # Providers not listed here can honor any suffix; pass the request through.
    supported_suffixes = supported_suffixes_by_provider.get(provider)
    if not supported_suffixes or requested_suffix in supported_suffixes:
        return file_path

    # e.g. requested .ogg from Edge -> generate .mp3 now, convert later.
    return file_path.with_suffix(preferred_suffix_by_provider[provider])


def _get_default_output_dir() -> str:
    from hermes_constants import get_hermes_dir
    return str(get_hermes_dir("cache/audio", "audio_cache"))

# Resolved once at module import time.
DEFAULT_OUTPUT_DIR = _get_default_output_dir()

# ---------------------------------------------------------------------------
# Per-provider input-character limits (from official provider docs).
# A single global cap was wrong: OpenAI is 4096, xAI is 15k, MiniMax is 10k,
# ElevenLabs is model-dependent (5k / 10k / 30k / 40k), Gemini caps at ~8k
# input tokens. Users can override any of these via
# ``tts.<provider>.max_text_length`` in config.yaml.
211 # --------------------------------------------------------------------------- 212 PROVIDER_MAX_TEXT_LENGTH: Dict[str, int] = { 213 "edge": 5000, # edge-tts practical sync limit 214 "openai": 4096, # https://platform.openai.com/docs/guides/text-to-speech 215 "xai": 15000, # https://docs.x.ai/developers/model-capabilities/audio/text-to-speech 216 "minimax": 10000, # https://platform.minimax.io/docs/api-reference/speech-t2a-http (sync) 217 "mistral": 4000, # conservative; no published per-request cap 218 "gemini": 5000, # Gemini TTS caps at ~8k input tokens / ~655s audio 219 "elevenlabs": 10000, # fallback when model-aware lookup can't resolve (multilingual_v2) 220 "neutts": 2000, # local model, quality falls off on long text 221 "kittentts": 2000, # local 25MB model 222 "piper": 5000, # local VITS model, phoneme-based; practical cap 223 } 224 225 # ElevenLabs caps vary by model_id. https://elevenlabs.io/docs/overview/models 226 ELEVENLABS_MODEL_MAX_TEXT_LENGTH: Dict[str, int] = { 227 "eleven_v3": 5000, 228 "eleven_ttv_v3": 5000, 229 "eleven_multilingual_v2": 10000, 230 "eleven_multilingual_v1": 10000, 231 "eleven_english_sts_v2": 10000, 232 "eleven_english_sts_v1": 10000, 233 "eleven_flash_v2": 30000, 234 "eleven_flash_v2_5": 40000, 235 } 236 237 # Final fallback when provider isn't recognised at all. 238 FALLBACK_MAX_TEXT_LENGTH = 4000 239 240 # Back-compat alias. Prefer ``_resolve_max_text_length()`` for new code. 241 MAX_TEXT_LENGTH = FALLBACK_MAX_TEXT_LENGTH 242 243 244 def _resolve_max_text_length( 245 provider: Optional[str], 246 tts_config: Optional[Dict[str, Any]] = None, 247 ) -> int: 248 """Return the input-character cap for *provider*. 249 250 Resolution order: 251 1. ``tts.<provider>.max_text_length`` (user override in config.yaml) 252 2. ``tts.providers.<provider>.max_text_length`` for user-declared 253 command providers 254 3. ElevenLabs model-aware table (keyed on configured ``model_id``) 255 4. ``PROVIDER_MAX_TEXT_LENGTH`` default 256 5. 
``DEFAULT_COMMAND_TTS_MAX_TEXT_LENGTH`` when the provider is a 257 command-type user provider without an explicit cap 258 6. ``FALLBACK_MAX_TEXT_LENGTH`` (4000) 259 260 Non-positive or non-integer overrides fall through to the default so a 261 broken config can't accidentally disable truncation entirely. 262 """ 263 if not provider: 264 return FALLBACK_MAX_TEXT_LENGTH 265 key = provider.lower().strip() 266 cfg = tts_config or {} 267 268 # Built-in-style override at tts.<provider>.max_text_length wins first, 269 # matching historical behavior. 270 prov_cfg = cfg.get(key) if isinstance(cfg.get(key), dict) else {} 271 override = prov_cfg.get("max_text_length") if prov_cfg else None 272 if isinstance(override, bool): 273 override = None 274 if isinstance(override, int) and override > 0: 275 return override 276 277 if key == "elevenlabs": 278 model_id = (prov_cfg or {}).get("model_id") or DEFAULT_ELEVENLABS_MODEL_ID 279 mapped = ELEVENLABS_MODEL_MAX_TEXT_LENGTH.get(str(model_id).strip()) 280 if mapped: 281 return mapped 282 283 if key in PROVIDER_MAX_TEXT_LENGTH: 284 return PROVIDER_MAX_TEXT_LENGTH[key] 285 286 # User-declared command provider (under tts.providers.<name>) 287 if key not in BUILTIN_TTS_PROVIDERS: 288 named = _get_named_provider_config(cfg, key) 289 if _is_command_provider_config(named): 290 named_override = named.get("max_text_length") 291 if isinstance(named_override, bool): 292 named_override = None 293 if isinstance(named_override, int) and named_override > 0: 294 return named_override 295 return DEFAULT_COMMAND_TTS_MAX_TEXT_LENGTH 296 297 return FALLBACK_MAX_TEXT_LENGTH 298 299 300 # =========================================================================== 301 # Config loader -- reads tts: section from ~/.hermes/config.yaml 302 # =========================================================================== 303 def _load_tts_config() -> Dict[str, Any]: 304 """ 305 Load TTS configuration from ~/.hermes/config.yaml. 
306 307 Returns a dict with provider settings. Falls back to defaults 308 for any missing fields. 309 """ 310 try: 311 from hermes_cli.config import load_config 312 config = load_config() 313 return config.get("tts", {}) 314 except ImportError: 315 logger.debug("hermes_cli.config not available, using default TTS config") 316 return {} 317 except Exception as e: 318 logger.warning("Failed to load TTS config: %s", e, exc_info=True) 319 return {} 320 321 322 def _get_provider(tts_config: Dict[str, Any]) -> str: 323 """Get the configured TTS provider name.""" 324 return (tts_config.get("provider") or DEFAULT_PROVIDER).lower().strip() 325 326 327 # =========================================================================== 328 # Custom command providers (type: command under tts.providers.<name>) 329 # =========================================================================== 330 # 331 # Users can declare any number of command-type providers alongside the 332 # built-ins so they can plug any local CLI (Piper, VoxCPM, Kokoro CLIs, 333 # custom voice-cloning scripts, etc.) into Hermes without any Python code 334 # changes. The config shape is:: 335 # 336 # tts: 337 # provider: piper-en 338 # providers: 339 # piper-en: 340 # type: command 341 # command: "piper -m ~/model.onnx -f {output_path} < {input_path}" 342 # output_format: wav 343 # 344 # Hermes writes the input text to a temp UTF-8 file, runs the command with 345 # placeholder substitution, and reads the audio file the command wrote to 346 # ``{output_path}``. Supported placeholders: ``{input_path}``, 347 # ``{text_path}`` (alias for input_path), ``{output_path}``, ``{format}``, 348 # ``{voice}``, ``{model}``, ``{speed}``. Use ``{{`` / ``}}`` for literal braces. 349 # 350 # Built-in provider names always win over an entry with the same name under 351 # ``tts.providers``, so user config can't silently shadow ``edge`` etc. 
#
# Placeholder values are shell-quoted for their surrounding context
# (bare / single / double quote), so paths with spaces work transparently.

# Built-in provider names. Any ``tts.provider`` value NOT in this set is
# interpreted as a reference to ``tts.providers.<name>``.
BUILTIN_TTS_PROVIDERS = frozenset({
    "edge",
    "elevenlabs",
    "openai",
    "minimax",
    "xai",
    "mistral",
    "gemini",
    "neutts",
    "kittentts",
    "piper",
})

DEFAULT_COMMAND_TTS_TIMEOUT_SECONDS = 120
DEFAULT_COMMAND_TTS_OUTPUT_FORMAT = "mp3"
COMMAND_TTS_OUTPUT_FORMATS = frozenset({"mp3", "wav", "ogg", "flac"})
DEFAULT_COMMAND_TTS_MAX_TEXT_LENGTH = 5000


def _get_provider_section(tts_config: Dict[str, Any], name: str) -> Dict[str, Any]:
    """Return a provider config block if it's a dict, else an empty dict."""
    if not isinstance(tts_config, dict):
        return {}
    section = tts_config.get(name)
    return section if isinstance(section, dict) else {}


def _get_named_provider_config(
    tts_config: Dict[str, Any],
    name: str,
) -> Dict[str, Any]:
    """Return the config dict for a user-declared provider.

    Looks up ``tts.providers.<name>`` first (the canonical location), and
    falls back to ``tts.<name>`` so users who followed the built-in layout
    still work. Returns an empty dict when the provider is not declared.
    """
    providers = _get_provider_section(tts_config, "providers")
    section = providers.get(name) if isinstance(providers, dict) else None
    if isinstance(section, dict):
        return section
    # Back-compat: allow ``tts.<name>`` for user-declared providers too,
    # but only when the name is not a built-in (so a user's ``tts.openai``
    # block still means the OpenAI provider, not a custom command).
    if name.lower() not in BUILTIN_TTS_PROVIDERS:
        legacy = _get_provider_section(tts_config, name)
        if legacy:
            return legacy
    return {}


def _is_command_provider_config(config: Dict[str, Any]) -> bool:
    """Return True when *config* declares a command-type provider."""
    if not isinstance(config, dict):
        return False
    # A missing ``type`` is tolerated; any explicit non-"command" type is not.
    ptype = str(config.get("type") or "").strip().lower()
    if ptype and ptype != "command":
        return False
    command = config.get("command")
    return isinstance(command, str) and bool(command.strip())


def _resolve_command_provider_config(
    provider: str,
    tts_config: Dict[str, Any],
) -> Optional[Dict[str, Any]]:
    """Return the provider config if *provider* resolves to a command type.

    Built-in provider names are rejected (they have native handlers).
    Returns None when the name is a built-in, unknown, or not a command
    type.
    """
    if not provider:
        return None
    key = provider.lower().strip()
    if key in BUILTIN_TTS_PROVIDERS:
        return None
    config = _get_named_provider_config(tts_config, key)
    if _is_command_provider_config(config):
        return config
    return None


def _iter_command_providers(tts_config: Dict[str, Any]):
    """Yield (name, config) pairs for every declared command-type provider."""
    if not isinstance(tts_config, dict):
        return
    providers = _get_provider_section(tts_config, "providers")
    for name, cfg in (providers or {}).items():
        if isinstance(name, str) and name.lower() not in BUILTIN_TTS_PROVIDERS:
            if _is_command_provider_config(cfg):
                yield name, cfg


def _get_command_tts_timeout(config: Dict[str, Any]) -> float:
    """Return timeout in seconds, falling back when invalid."""
    raw = config.get("timeout", config.get("timeout_seconds", DEFAULT_COMMAND_TTS_TIMEOUT_SECONDS))
    try:
        # Non-numeric values (including None) raise here and hit the fallback.
        value = float(raw)
    except (TypeError, ValueError):
        return float(DEFAULT_COMMAND_TTS_TIMEOUT_SECONDS)
    if value <= 0:
        return float(DEFAULT_COMMAND_TTS_TIMEOUT_SECONDS)
    return value


def _get_command_tts_output_format(
    config: Dict[str, Any],
    output_path: Optional[str] = None,
) -> str:
    """Return the validated output format (mp3/wav/ogg/flac)."""
    # An explicit, recognised output-path suffix wins over configured format.
    if output_path:
        suffix = Path(output_path).suffix.lower().strip().lstrip(".")
        if suffix in COMMAND_TTS_OUTPUT_FORMATS:
            return suffix
    raw = (
        config.get("format")
        or config.get("output_format")
        or DEFAULT_COMMAND_TTS_OUTPUT_FORMAT
    )
    fmt = str(raw).lower().strip().lstrip(".")
    return fmt if fmt in COMMAND_TTS_OUTPUT_FORMATS else DEFAULT_COMMAND_TTS_OUTPUT_FORMAT


def _is_command_tts_voice_compatible(config: Dict[str, Any]) -> bool:
    """Return True only when the user explicitly opted in to voice delivery."""
    value = config.get("voice_compatible", False)
    if isinstance(value, str):
        return value.strip().lower() in {"1", "true", "yes", "on"}
    return bool(value)


def _shell_quote_context(command_template: str, position: int) -> Optional[str]:
    """Return the shell quote character active right before *position*.

    Returns ``"'"`` / ``'"'`` when inside a single- / double-quoted region
    of the template, ``None`` for bare context.
    """
    quote: Optional[str] = None
    escaped = False
    i = 0
    while i < position:
        char = command_template[i]
        if quote == "'":
            # Inside '...': only a closing quote ends the region; no escapes.
            if char == "'":
                quote = None
        elif quote == '"':
            # Inside "...": backslash escapes the next character.
            if escaped:
                escaped = False
            elif char == "\\":
                escaped = True
            elif char == '"':
                quote = None
        else:
            # Bare context: a backslash escapes the next char -- skip it.
            if char == "'":
                quote = "'"
            elif char == '"':
                quote = '"'
            elif char == "\\":
                i += 1
        i += 1
    return quote


def _quote_command_tts_placeholder(value: str, quote_context: Optional[str]) -> str:
    """Quote a placeholder value for its position in a shell command template."""
    if quote_context == "'":
        # Close-quote / escaped-quote / reopen-quote trick for single quotes.
        return value.replace("'", r"'\''")
    if quote_context == '"':
        # Escape the characters the shell still interprets inside "...".
        return (
            value
            .replace("\\", "\\\\")
            .replace('"', r'\"')
            .replace("$", r"\$")
            .replace("`", r"\`")
        )
    # Bare context: cmd.exe rules on Windows, POSIX quoting otherwise.
    if os.name == "nt":
        return subprocess.list2cmdline([value])
    return shlex.quote(value)


def _render_command_tts_template(
    command_template: str,
    placeholders: Dict[str, str],
) -> str:
    """Replace supported placeholders while preserving ``{{`` / ``}}``."""
    names = "|".join(re.escape(name) for name in placeholders)
    # ``(?<!\$)`` leaves shell expansions like ``${output_path}`` untouched.
    pattern = re.compile(
        rf"(?<!\$)(?:\{{\{{(?P<double>{names})\}}\}}|\{{(?P<single>{names})\}})"
    )
    replacements: list[tuple[str, str]] = []

    def replace_match(match: re.Match[str]) -> str:
        name = match.group("double") or match.group("single")
        token = f"__HERMES_TTS_PLACEHOLDER_{len(replacements)}__"
        replacements.append((
            token,
            _quote_command_tts_placeholder(
                placeholders[name],
                _shell_quote_context(command_template, match.start()),
            ),
        ))
        return token

    # Two-phase substitution: placeholders become unique tokens first so that
    # values containing braces are never re-processed by the {{ / }} pass.
    rendered = pattern.sub(replace_match, command_template)
    rendered = rendered.replace("{{", "{").replace("}}", "}")
    for token, value in replacements:
        rendered = rendered.replace(token, value)
    return rendered


def _terminate_command_tts_process_tree(proc: subprocess.Popen) -> None:
    """Best-effort termination of a shell process and all of its children."""
    if proc.poll() is not None:
        return

    if os.name == "nt":
        # taskkill /T kills the whole tree; fall back to killing just the shell.
        try:
            subprocess.run(
                ["taskkill", "/F", "/T", "/PID", str(proc.pid)],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=5,
            )
        except Exception:
            proc.kill()
        return

    # POSIX: the child was started with start_new_session=True (see
    # _run_command_tts), so its pid doubles as its process-group id.
    try:
        os.killpg(proc.pid, signal.SIGTERM)
    except ProcessLookupError:
        return
    except Exception:
        proc.terminate()

    # Give the group a moment to exit gracefully before escalating.
    try:
        proc.wait(timeout=2)
        return
    except subprocess.TimeoutExpired:
        pass

    try:
        os.killpg(proc.pid, signal.SIGKILL)
    except ProcessLookupError:
        return
    except Exception:
        proc.kill()


def _run_command_tts(command: str, timeout: float) -> subprocess.CompletedProcess:
    """Run a command-provider shell command with process-tree timeout cleanup."""
    # shell=True is required for user-authored command strings; placeholder
    # values were already shell-quoted by _render_command_tts_template.
    popen_kwargs: Dict[str, Any] = {
        "shell": True,
        "stdout": subprocess.PIPE,
        "stderr": subprocess.PIPE,
        "text": True,
    }
    # Put the child in its own group/session so the whole tree can be killed.
    if os.name == "nt":
        popen_kwargs["creationflags"] = getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0)
    else:
        popen_kwargs["start_new_session"] = True

    proc = subprocess.Popen(command, **popen_kwargs)
    try:
        stdout, stderr = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired as exc:
        _terminate_command_tts_process_tree(proc)
        # Drain whatever output the dying process produced, if any.
        try:
            stdout, stderr = proc.communicate(timeout=1)
        except Exception:
            stdout = getattr(exc, "output", None)
            stderr = getattr(exc, "stderr", None)
        raise subprocess.TimeoutExpired(
            command,
            timeout,
            output=stdout,
            stderr=stderr,
        ) from exc

    if proc.returncode:
        raise subprocess.CalledProcessError(
            proc.returncode,
            command,
            output=stdout,
            stderr=stderr,
        )
    return subprocess.CompletedProcess(command, proc.returncode, stdout,
stderr)


def _configured_command_tts_output_path(path: Path, config: Dict[str, Any]) -> Path:
    """Return an output path whose extension matches the provider's output_format."""
    fmt = _get_command_tts_output_format(config)
    return path.with_suffix(f".{fmt}")


def _generate_command_tts(
    text: str,
    output_path: str,
    provider_name: str,
    config: Dict[str, Any],
    tts_config: Dict[str, Any],
) -> str:
    """Generate speech by running a user-configured shell command.

    Returns the absolute path of the audio file the command wrote.
    Raises ``ValueError`` when the provider config is invalid, and
    ``RuntimeError`` for timeouts / non-zero exits / empty output.
    """
    command_template = str(config.get("command") or "").strip()
    if not command_template:
        raise ValueError(
            f"tts.providers.{provider_name}.command is not configured"
        )

    output = Path(output_path).expanduser()
    output.parent.mkdir(parents=True, exist_ok=True)
    # Remove any stale file so the existence/size check at the end proves
    # this run actually produced the output.
    if output.exists():
        output.unlink()

    timeout = _get_command_tts_timeout(config)
    output_format = _get_command_tts_output_format(config, str(output))
    # Provider-level speed wins over the global tts.speed setting.
    speed = config.get("speed", tts_config.get("speed", ""))

    with tempfile.TemporaryDirectory() as tmpdir:
        text_path = Path(tmpdir) / "input.txt"
        text_path.write_text(text, encoding="utf-8")

        placeholders = {
            "input_path": str(text_path),
            "text_path": str(text_path),
            "output_path": str(output),
            "format": output_format,
            "voice": str(config.get("voice", "")),
            "model": str(config.get("model", "")),
            "speed": str(speed),
        }
        command = _render_command_tts_template(command_template, placeholders)

        try:
            _run_command_tts(command, timeout)
        except subprocess.TimeoutExpired as exc:
            raise RuntimeError(
                f"TTS provider '{provider_name}' timed out after {timeout:g}s"
            ) from exc
        except subprocess.CalledProcessError as exc:
            # Surface whatever the command printed to make failures debuggable.
            detail_parts = []
            if exc.stderr:
                detail_parts.append(f"stderr: {exc.stderr.strip()}")
            if exc.stdout:
                detail_parts.append(f"stdout: {exc.stdout.strip()}")
            detail = "; ".join(detail_parts) or "no command output"
            raise RuntimeError(
                f"TTS provider '{provider_name}' exited with code "
                f"{exc.returncode}: {detail}"
            ) from exc

    if not output.exists() or output.stat().st_size <= 0:
        raise RuntimeError(
            f"TTS provider '{provider_name}' produced no output at {output}"
        )
    return str(output)


def _has_any_command_tts_provider(tts_config: Optional[Dict[str, Any]] = None) -> bool:
    """Return True when any command-type TTS provider is configured."""
    if tts_config is None:
        tts_config = _load_tts_config()
    # Cheap any(): stop at the first provider the generator yields.
    for _name, _cfg in _iter_command_providers(tts_config):
        return True
    return False


# ===========================================================================
# ffmpeg Opus conversion (Edge TTS MP3 -> OGG Opus for Telegram)
# ===========================================================================
def _has_ffmpeg() -> bool:
    """Check if ffmpeg is available on the system."""
    return shutil.which("ffmpeg") is not None


def _convert_to_opus(mp3_path: str) -> Optional[str]:
    """
    Convert an MP3 file to OGG Opus format for Telegram voice bubbles.

    Args:
        mp3_path: Path to the input MP3 file.

    Returns:
        Path to the .ogg file, or None if conversion fails.
747 """ 748 if not _has_ffmpeg(): 749 return None 750 751 ogg_path = mp3_path.rsplit(".", 1)[0] + ".ogg" 752 try: 753 result = subprocess.run( 754 ["ffmpeg", "-i", mp3_path, "-acodec", "libopus", 755 "-ac", "1", "-b:a", "64k", "-vbr", "off", ogg_path, "-y"], 756 capture_output=True, timeout=30, 757 ) 758 if result.returncode != 0: 759 logger.warning("ffmpeg conversion failed with return code %d: %s", 760 result.returncode, result.stderr.decode('utf-8', errors='ignore')[:200]) 761 return None 762 if os.path.exists(ogg_path) and os.path.getsize(ogg_path) > 0: 763 return ogg_path 764 except subprocess.TimeoutExpired: 765 logger.warning("ffmpeg OGG conversion timed out after 30s") 766 except FileNotFoundError: 767 logger.warning("ffmpeg not found in PATH") 768 except Exception as e: 769 logger.warning("ffmpeg OGG conversion failed: %s", e, exc_info=True) 770 return None 771 772 773 # =========================================================================== 774 # Provider: Edge TTS (free) 775 # =========================================================================== 776 async def _generate_edge_tts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str: 777 """ 778 Generate audio using Edge TTS. 779 780 Args: 781 text: Text to convert. 782 output_path: Where to save the MP3 file. 783 tts_config: TTS config dict. 784 785 Returns: 786 Path to the saved audio file. 
787 """ 788 _edge_tts = _import_edge_tts() 789 edge_config = tts_config.get("edge", {}) 790 voice = edge_config.get("voice", DEFAULT_EDGE_VOICE) 791 speed = float(edge_config.get("speed", tts_config.get("speed", 1.0))) 792 793 kwargs = {"voice": voice} 794 if speed != 1.0: 795 pct = round((speed - 1.0) * 100) 796 kwargs["rate"] = f"{pct:+d}%" 797 798 communicate = _edge_tts.Communicate(text, **kwargs) 799 await communicate.save(output_path) 800 return output_path 801 802 803 # =========================================================================== 804 # Provider: ElevenLabs (premium) 805 # =========================================================================== 806 def _generate_elevenlabs(text: str, output_path: str, tts_config: Dict[str, Any]) -> str: 807 """ 808 Generate audio using ElevenLabs. 809 810 Args: 811 text: Text to convert. 812 output_path: Where to save the audio file. 813 tts_config: TTS config dict. 814 815 Returns: 816 Path to the saved audio file. 817 """ 818 api_key = (get_env_value("ELEVENLABS_API_KEY") or "") 819 if not api_key: 820 raise ValueError("ELEVENLABS_API_KEY not set. 
Get one at https://elevenlabs.io/") 821 822 el_config = tts_config.get("elevenlabs", {}) 823 voice_id = el_config.get("voice_id", DEFAULT_ELEVENLABS_VOICE_ID) 824 model_id = el_config.get("model_id", DEFAULT_ELEVENLABS_MODEL_ID) 825 826 # Determine output format based on file extension 827 if output_path.endswith(".ogg"): 828 output_format = "opus_48000_64" 829 else: 830 output_format = "mp3_44100_128" 831 832 ElevenLabs = _import_elevenlabs() 833 client = ElevenLabs(api_key=api_key) 834 audio_generator = client.text_to_speech.convert( 835 text=text, 836 voice_id=voice_id, 837 model_id=model_id, 838 output_format=output_format, 839 ) 840 841 # audio_generator yields chunks -- write them all 842 with open(output_path, "wb") as f: 843 for chunk in audio_generator: 844 f.write(chunk) 845 846 return output_path 847 848 849 # =========================================================================== 850 # Provider: OpenAI TTS 851 # =========================================================================== 852 def _generate_openai_tts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str: 853 """ 854 Generate audio using OpenAI TTS. 855 856 Args: 857 text: Text to convert. 858 output_path: Where to save the audio file. 859 tts_config: TTS config dict. 860 861 Returns: 862 Path to the saved audio file. 
863 """ 864 api_key, base_url = _resolve_openai_audio_client_config() 865 866 oai_config = tts_config.get("openai", {}) 867 model = oai_config.get("model", DEFAULT_OPENAI_MODEL) 868 voice = oai_config.get("voice", DEFAULT_OPENAI_VOICE) 869 base_url = oai_config.get("base_url", base_url) 870 speed = float(oai_config.get("speed", tts_config.get("speed", 1.0))) 871 872 # Determine response format from extension 873 if output_path.endswith(".ogg"): 874 response_format = "opus" 875 else: 876 response_format = "mp3" 877 878 OpenAIClient = _import_openai_client() 879 client = OpenAIClient(api_key=api_key, base_url=base_url) 880 try: 881 create_kwargs = dict( 882 model=model, 883 voice=voice, 884 input=text, 885 response_format=response_format, 886 extra_headers={"x-idempotency-key": str(uuid.uuid4())}, 887 ) 888 if speed != 1.0: 889 create_kwargs["speed"] = max(0.25, min(4.0, speed)) 890 response = client.audio.speech.create(**create_kwargs) 891 892 response.stream_to_file(output_path) 893 return output_path 894 finally: 895 close = getattr(client, "close", None) 896 if callable(close): 897 close() 898 899 900 # =========================================================================== 901 # Provider: xAI TTS 902 # =========================================================================== 903 def _generate_xai_tts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str: 904 """ 905 Generate audio using xAI TTS. 906 907 xAI exposes a dedicated /v1/tts endpoint instead of the OpenAI audio.speech 908 API shape, so this is implemented as a separate backend. 909 """ 910 import requests 911 912 api_key = (get_env_value("XAI_API_KEY") or "").strip() 913 if not api_key: 914 raise ValueError("XAI_API_KEY not set. 
Get one at https://console.x.ai/")

    xai_config = tts_config.get("xai", {})
    # Fall back to defaults when the configured value is empty/whitespace.
    voice_id = str(xai_config.get("voice_id", DEFAULT_XAI_VOICE_ID)).strip() or DEFAULT_XAI_VOICE_ID
    language = str(xai_config.get("language", DEFAULT_XAI_LANGUAGE)).strip() or DEFAULT_XAI_LANGUAGE
    sample_rate = int(xai_config.get("sample_rate", DEFAULT_XAI_SAMPLE_RATE))
    bit_rate = int(xai_config.get("bit_rate", DEFAULT_XAI_BIT_RATE))
    base_url = str(
        xai_config.get("base_url")
        or get_env_value("XAI_BASE_URL")
        or DEFAULT_XAI_BASE_URL
    ).strip().rstrip("/")

    # Match the documented minimal POST /v1/tts shape by default. Only send
    # output_format when Hermes actually needs a non-default format/override.
    codec = "wav" if output_path.endswith(".wav") else "mp3"
    payload: Dict[str, Any] = {
        "text": text,
        "voice_id": voice_id,
        "language": language,
    }
    if (
        codec != "mp3"
        or sample_rate != DEFAULT_XAI_SAMPLE_RATE
        or (codec == "mp3" and bit_rate != DEFAULT_XAI_BIT_RATE)
    ):
        output_format: Dict[str, Any] = {"codec": codec}
        if sample_rate:
            output_format["sample_rate"] = sample_rate
        # bit_rate only applies to lossy MP3 output.
        if codec == "mp3" and bit_rate:
            output_format["bit_rate"] = bit_rate
        payload["output_format"] = output_format

    response = requests.post(
        f"{base_url}/tts",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "User-Agent": hermes_xai_user_agent(),
        },
        json=payload,
        timeout=60,
    )
    response.raise_for_status()

    # Response body is the raw encoded audio.
    with open(output_path, "wb") as f:
        f.write(response.content)

    return output_path


# ===========================================================================
# Provider: MiniMax TTS
# ===========================================================================
def _generate_minimax_tts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str:
    """
    Generate audio using MiniMax TTS API.

    MiniMax returns hex-encoded audio data. Supports streaming (SSE) and
    non-streaming modes. This implementation uses non-streaming for simplicity.

    Args:
        text: Text to convert (max 10,000 characters).
        output_path: Where to save the audio file.
        tts_config: TTS config dict.

    Returns:
        Path to the saved audio file.

    Raises:
        ValueError: If MINIMAX_API_KEY is not configured.
        RuntimeError: On an in-band API error or empty audio payload.
    """
    import requests

    api_key = (get_env_value("MINIMAX_API_KEY") or "")
    if not api_key:
        raise ValueError("MINIMAX_API_KEY not set. Get one at https://platform.minimax.io/")

    mm_config = tts_config.get("minimax", {})
    model = mm_config.get("model", DEFAULT_MINIMAX_MODEL)
    voice_id = mm_config.get("voice_id", DEFAULT_MINIMAX_VOICE_ID)
    # Provider speed falls back to the global tts.speed setting.
    speed = mm_config.get("speed", tts_config.get("speed", 1))
    vol = mm_config.get("vol", 1)
    pitch = mm_config.get("pitch", 0)
    base_url = mm_config.get("base_url", DEFAULT_MINIMAX_BASE_URL)

    # Determine audio format from output extension
    if output_path.endswith(".wav"):
        audio_format = "wav"
    elif output_path.endswith(".flac"):
        audio_format = "flac"
    else:
        audio_format = "mp3"

    payload = {
        "model": model,
        "text": text,
        "stream": False,
        "voice_setting": {
            "voice_id": voice_id,
            "speed": speed,
            "vol": vol,
            "pitch": pitch,
        },
        "audio_setting": {
            "sample_rate": 32000,
            "bitrate": 128000,
            "format": audio_format,
            "channel": 1,
        },
    }

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }

    response = requests.post(base_url, json=payload, headers=headers, timeout=60)
    response.raise_for_status()

    # MiniMax signals errors in-band via base_resp; status_code 0 == success.
    result = response.json()
    base_resp = result.get("base_resp", {})
    status_code = base_resp.get("status_code", -1)

    if status_code != 0:
        status_msg = base_resp.get("status_msg", "unknown error")
        raise RuntimeError(f"MiniMax TTS API error (code {status_code}): {status_msg}")

    hex_audio = result.get("data", {}).get("audio", "")
    if not hex_audio:
        raise RuntimeError("MiniMax TTS returned empty audio data")

    # MiniMax returns hex-encoded audio (not base64)
    audio_bytes = bytes.fromhex(hex_audio)

    with open(output_path, "wb") as f:
        f.write(audio_bytes)

    return output_path


# ===========================================================================
# Provider: Mistral (Voxtral TTS)
# ===========================================================================
def _generate_mistral_tts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str:
    """Generate audio using Mistral Voxtral TTS API.

    The API returns base64-encoded audio; this function decodes it
    and writes the raw bytes to *output_path*.
    Supports native Opus output for Telegram voice bubbles.

    Raises:
        ValueError: If MISTRAL_API_KEY is not configured.
        RuntimeError: On any other API/client failure (type name only, so
            secrets never leak into the surfaced error string).
    """
    api_key = (get_env_value("MISTRAL_API_KEY") or "")
    if not api_key:
        raise ValueError("MISTRAL_API_KEY not set. 
Get one at https://console.mistral.ai/")

    mi_config = tts_config.get("mistral", {})
    model = mi_config.get("model", DEFAULT_MISTRAL_TTS_MODEL)
    voice_id = mi_config.get("voice_id") or DEFAULT_MISTRAL_TTS_VOICE_ID

    # Pick the response format from the requested file extension.
    if output_path.endswith(".ogg"):
        response_format = "opus"
    elif output_path.endswith(".wav"):
        response_format = "wav"
    elif output_path.endswith(".flac"):
        response_format = "flac"
    else:
        response_format = "mp3"

    Mistral = _import_mistral_client()
    try:
        with Mistral(api_key=api_key) as client:
            response = client.audio.speech.complete(
                model=model,
                input=text,
                voice_id=voice_id,
                response_format=response_format,
            )
            audio_bytes = base64.b64decode(response.audio_data)
    except ValueError:
        # Configuration errors propagate unchanged to the caller.
        raise
    except Exception as e:
        # Log full detail locally, but surface only the exception type so
        # API keys / payloads never leak into the tool's error string.
        logger.error("Mistral TTS failed: %s", e, exc_info=True)
        raise RuntimeError(f"Mistral TTS failed: {type(e).__name__}") from e

    with open(output_path, "wb") as f:
        f.write(audio_bytes)

    return output_path


# ===========================================================================
# Provider: Google Gemini TTS
# ===========================================================================
def _wrap_pcm_as_wav(
    pcm_bytes: bytes,
    sample_rate: int = GEMINI_TTS_SAMPLE_RATE,
    channels: int = GEMINI_TTS_CHANNELS,
    sample_width: int = GEMINI_TTS_SAMPLE_WIDTH,
) -> bytes:
    """Wrap raw signed-little-endian PCM with a standard WAV RIFF header.

    Gemini TTS returns audio/L16;codec=pcm;rate=24000 -- raw PCM samples with
    no container. We add a minimal WAV header so the file is playable and
    ffmpeg can re-encode it to MP3/Opus downstream.
    """
    import struct

    byte_rate = sample_rate * channels * sample_width
    block_align = channels * sample_width
    data_size = len(pcm_bytes)
    fmt_chunk = struct.pack(
        "<4sIHHIIHH",
        b"fmt ",
        16,  # fmt chunk size (PCM)
        1,  # audio format (PCM)
        channels,
        sample_rate,
        byte_rate,
        block_align,
        sample_width * 8,
    )
    data_chunk_header = struct.pack("<4sI", b"data", data_size)
    # RIFF size counts everything after the 8-byte RIFF header itself:
    # the 4-byte "WAVE" tag plus both chunks.
    riff_size = 4 + len(fmt_chunk) + len(data_chunk_header) + data_size
    riff_header = struct.pack("<4sI4s", b"RIFF", riff_size, b"WAVE")
    return riff_header + fmt_chunk + data_chunk_header + pcm_bytes


def _generate_gemini_tts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str:
    """Generate audio using Google Gemini TTS.

    Gemini's generateContent endpoint with responseModalities=["AUDIO"] returns
    raw 24kHz mono 16-bit PCM (L16) as base64. We wrap it with a WAV RIFF
    header to produce a playable file, then ffmpeg-convert to MP3 / Opus if
    the caller requested those formats (same pattern as NeuTTS).

    Args:
        text: Text to convert (prompt-style; supports inline direction like
            "Say cheerfully:" and audio tags like [whispers]).
        output_path: Where to save the audio file (.wav, .mp3, or .ogg).
        tts_config: TTS config dict.

    Returns:
        Path to the saved audio file.

    Raises:
        ValueError: If no Gemini/Google API key is configured.
        RuntimeError: On HTTP errors, malformed responses, empty audio,
            or a failed ffmpeg conversion.
    """
    import requests

    api_key = (get_env_value("GEMINI_API_KEY") or get_env_value("GOOGLE_API_KEY") or "").strip()
    if not api_key:
        raise ValueError(
            "GEMINI_API_KEY not set. 
Get one at https://aistudio.google.com/app/apikey"
        )

    gemini_config = tts_config.get("gemini", {})
    # Fall back to defaults when the configured value is empty/whitespace.
    model = str(gemini_config.get("model", DEFAULT_GEMINI_TTS_MODEL)).strip() or DEFAULT_GEMINI_TTS_MODEL
    voice = str(gemini_config.get("voice", DEFAULT_GEMINI_TTS_VOICE)).strip() or DEFAULT_GEMINI_TTS_VOICE
    base_url = str(
        gemini_config.get("base_url")
        or get_env_value("GEMINI_BASE_URL")
        or DEFAULT_GEMINI_TTS_BASE_URL
    ).strip().rstrip("/")

    payload: Dict[str, Any] = {
        "contents": [{"parts": [{"text": text}]}],
        "generationConfig": {
            "responseModalities": ["AUDIO"],
            "speechConfig": {
                "voiceConfig": {
                    "prebuiltVoiceConfig": {"voiceName": voice},
                },
            },
        },
    }

    endpoint = f"{base_url}/models/{model}:generateContent"
    response = requests.post(
        endpoint,
        params={"key": api_key},
        headers={"Content-Type": "application/json"},
        json=payload,
        timeout=60,
    )
    if response.status_code != 200:
        # Surface the API error message when present
        try:
            err = response.json().get("error", {})
            detail = err.get("message") or response.text[:300]
        except Exception:
            detail = response.text[:300]
        raise RuntimeError(
            f"Gemini TTS API error (HTTP {response.status_code}): {detail}"
        )

    try:
        data = response.json()
        parts = data["candidates"][0]["content"]["parts"]
        # The API may use camelCase or snake_case for the inline-data key.
        audio_part = next((p for p in parts if "inlineData" in p or "inline_data" in p), None)
        if audio_part is None:
            raise RuntimeError("Gemini TTS response contained no audio data")
        inline = audio_part.get("inlineData") or audio_part.get("inline_data") or {}
        audio_b64 = inline.get("data", "")
    except (KeyError, IndexError, TypeError) as e:
        raise RuntimeError(f"Gemini TTS response was malformed: {e}") from e

    if not audio_b64:
        raise RuntimeError("Gemini TTS returned empty audio data")

    pcm_bytes = base64.b64decode(audio_b64)
    wav_bytes = _wrap_pcm_as_wav(pcm_bytes)

    # Fast path: caller wants WAV directly, just write.
    if output_path.lower().endswith(".wav"):
        with open(output_path, "wb") as f:
            f.write(wav_bytes)
        return output_path

    # Otherwise write WAV to a temp file and ffmpeg-convert to the target
    # format (.mp3 or .ogg). If ffmpeg is missing, fall back to renaming the
    # WAV -- this matches the NeuTTS behavior and keeps the tool usable on
    # systems without ffmpeg (audio still plays, just with a misleading
    # extension).
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        tmp.write(wav_bytes)
        wav_path = tmp.name

    try:
        ffmpeg = shutil.which("ffmpeg")
        if ffmpeg:
            # For .ogg output, force libopus encoding (Telegram voice bubbles
            # require Opus specifically; ffmpeg's default for .ogg is Vorbis).
            if output_path.lower().endswith(".ogg"):
                cmd = [
                    ffmpeg, "-i", wav_path,
                    "-acodec", "libopus", "-ac", "1",
                    "-b:a", "64k", "-vbr", "off",
                    "-y", "-loglevel", "error",
                    output_path,
                ]
            else:
                cmd = [ffmpeg, "-i", wav_path, "-y", "-loglevel", "error", output_path]
            result = subprocess.run(cmd, capture_output=True, timeout=30)
            if result.returncode != 0:
                stderr = result.stderr.decode("utf-8", errors="ignore")[:300]
                raise RuntimeError(f"ffmpeg conversion failed: {stderr}")
        else:
            logger.warning(
                "ffmpeg not found; writing raw WAV to %s (extension may be misleading)",
                output_path,
            )
            shutil.copyfile(wav_path, output_path)
    finally:
        # Always clean up the temp WAV, even when conversion failed.
        try:
            os.remove(wav_path)
        except OSError:
            pass

    return output_path


# ===========================================================================
# NeuTTS (local, on-device TTS via neutts_cli)
# ===========================================================================
1272 1273 def _check_neutts_available() -> bool: 1274 """Check if the neutts engine is importable (installed locally).""" 1275 try: 1276 import importlib.util 1277 return importlib.util.find_spec("neutts") is not None 1278 except Exception: 1279 return False 1280 1281 1282 def _check_kittentts_available() -> bool: 1283 """Check if the kittentts engine is importable (installed locally).""" 1284 try: 1285 import importlib.util 1286 return importlib.util.find_spec("kittentts") is not None 1287 except Exception: 1288 return False 1289 1290 1291 def _default_neutts_ref_audio() -> str: 1292 """Return path to the bundled default voice reference audio.""" 1293 return str(Path(__file__).parent / "neutts_samples" / "jo.wav") 1294 1295 1296 def _default_neutts_ref_text() -> str: 1297 """Return path to the bundled default voice reference transcript.""" 1298 return str(Path(__file__).parent / "neutts_samples" / "jo.txt") 1299 1300 1301 def _generate_neutts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str: 1302 """Generate speech using the local NeuTTS engine. 1303 1304 Runs synthesis in a subprocess via tools/neutts_synth.py to keep the 1305 ~500MB model in a separate process that exits after synthesis. 1306 Outputs WAV; the caller handles conversion for Telegram if needed. 1307 """ 1308 import sys 1309 1310 neutts_config = tts_config.get("neutts", {}) 1311 ref_audio = neutts_config.get("ref_audio", "") or _default_neutts_ref_audio() 1312 ref_text = neutts_config.get("ref_text", "") or _default_neutts_ref_text() 1313 model = neutts_config.get("model", "neuphonic/neutts-air-q4-gguf") 1314 device = neutts_config.get("device", "cpu") 1315 1316 # NeuTTS outputs WAV natively — use a .wav path for generation, 1317 # let the caller convert to the final format afterward. 
1318 wav_path = output_path 1319 if not output_path.endswith(".wav"): 1320 wav_path = output_path.rsplit(".", 1)[0] + ".wav" 1321 1322 synth_script = str(Path(__file__).parent / "neutts_synth.py") 1323 cmd = [ 1324 sys.executable, synth_script, 1325 "--text", text, 1326 "--out", wav_path, 1327 "--ref-audio", ref_audio, 1328 "--ref-text", ref_text, 1329 "--model", model, 1330 "--device", device, 1331 ] 1332 1333 result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) 1334 if result.returncode != 0: 1335 stderr = result.stderr.strip() 1336 # Filter out the "OK:" line from stderr 1337 error_lines = [l for l in stderr.splitlines() if not l.startswith("OK:")] 1338 raise RuntimeError(f"NeuTTS synthesis failed: {chr(10).join(error_lines) or 'unknown error'}") 1339 1340 # If the caller wanted .mp3 or .ogg, convert from WAV 1341 if wav_path != output_path: 1342 ffmpeg = shutil.which("ffmpeg") 1343 if ffmpeg: 1344 conv_cmd = [ffmpeg, "-i", wav_path, "-y", "-loglevel", "error", output_path] 1345 subprocess.run(conv_cmd, check=True, timeout=30) 1346 os.remove(wav_path) 1347 else: 1348 # No ffmpeg — just rename the WAV to the expected path 1349 os.rename(wav_path, output_path) 1350 1351 return output_path 1352 1353 1354 # =========================================================================== 1355 # Provider: Piper (local, neural VITS, 44 languages) 1356 # =========================================================================== 1357 1358 # Module-level cache for Piper voice instances. Voices are keyed on their 1359 # absolute .onnx model path so switching voices doesn't invalidate older 1360 # cached voices. 
_piper_voice_cache: Dict[str, Any] = {}


def _check_piper_available() -> bool:
    """Check whether the piper-tts package is importable."""
    try:
        import importlib.util
        return importlib.util.find_spec("piper") is not None
    except Exception:
        # find_spec can raise on broken installs; treat as unavailable.
        return False


def _get_piper_voices_dir() -> Path:
    """Return the directory where Hermes caches Piper voice models.

    Resolves to ``~/.hermes/cache/piper-voices/`` under the active
    HERMES_HOME so voice downloads follow profile boundaries.
    """
    from hermes_constants import get_hermes_dir
    root = Path(get_hermes_dir("cache/piper-voices", "piper_voices_cache"))
    root.mkdir(parents=True, exist_ok=True)
    return root


def _resolve_piper_voice_path(voice: str, download_dir: Path) -> str:
    """Resolve *voice* (a model name or path) to a concrete .onnx file path.

    Accepts any of:
    - Absolute / expanded path to an .onnx file the user already has
    - A voice *name* like ``en_US-lessac-medium`` (downloads to
      ``download_dir`` on first use via ``python -m piper.download_voices``)

    Raises RuntimeError if the model can't be located or downloaded.
    """
    if not voice:
        voice = DEFAULT_PIPER_VOICE

    # Case 1: user gave a direct file path.
    candidate = Path(voice).expanduser()
    if candidate.suffix.lower() == ".onnx" and candidate.exists():
        return str(candidate)

    # Case 2: user gave a voice *name*. See if it's already downloaded.
    # Both the model and its .json sidecar config must be present.
    cached = download_dir / f"{voice}.onnx"
    if cached.exists() and (download_dir / f"{voice}.onnx.json").exists():
        return str(cached)

    # Case 3: download the voice. piper ships a download helper module.
    import sys as _sys
    logger.info("[Piper] Downloading voice '%s' to %s (first use)", voice, download_dir)
    try:
        result = subprocess.run(
            [_sys.executable, "-m", "piper.download_voices", voice,
             "--download-dir", str(download_dir)],
            capture_output=True, text=True, timeout=300,
        )
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError(
            f"Piper voice download timed out after 300s for '{voice}'"
        ) from exc

    if result.returncode != 0:
        stderr = (result.stderr or "").strip() or "no stderr output"
        raise RuntimeError(
            f"Piper voice download failed for '{voice}': {stderr[:400]}"
        )

    # Download "succeeded" but the expected file is absent — most likely a
    # typo in the voice name.
    if not cached.exists():
        raise RuntimeError(
            f"Piper voice download completed but {cached} is missing — "
            f"check voice name (see: https://github.com/OHF-Voice/piper1-gpl/"
            f"blob/main/docs/VOICES.md)"
        )
    return str(cached)


def _generate_piper_tts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str:
    """Generate speech using the local Piper engine.

    Loads the voice model once per process (cached by absolute path) and
    writes a WAV file. Caller is responsible for converting to MP3/Opus
    via ffmpeg when a different output format is required.
    """
    PiperVoice = _import_piper()
    import wave

    piper_config = tts_config.get("piper", {}) if isinstance(tts_config, dict) else {}
    voice_name = piper_config.get("voice") or DEFAULT_PIPER_VOICE
    download_dir = Path(piper_config.get("voices_dir") or _get_piper_voices_dir()).expanduser()
    download_dir.mkdir(parents=True, exist_ok=True)
    use_cuda = bool(piper_config.get("use_cuda", False))

    model_path = _resolve_piper_voice_path(voice_name, download_dir)

    # Cache key includes the CUDA flag so CPU and GPU loads don't collide.
    cache_key = f"{model_path}::cuda={use_cuda}"
    global _piper_voice_cache
    if cache_key not in _piper_voice_cache:
        logger.info("[Piper] Loading voice: %s", model_path)
        _piper_voice_cache[cache_key] = PiperVoice.load(model_path, use_cuda=use_cuda)
        logger.info("[Piper] Voice loaded")
    voice = _piper_voice_cache[cache_key]

    # Optional synthesis knobs — only pass a SynthesisConfig when at least
    # one advanced knob is configured, so we don't depend on a newer Piper
    # version than the user's installed one unless we need to.
    syn_config = None
    has_advanced = any(
        k in piper_config
        for k in ("length_scale", "noise_scale", "noise_w_scale", "volume", "normalize_audio")
    )
    if has_advanced:
        try:
            from piper import SynthesisConfig  # type: ignore
            syn_config = SynthesisConfig(
                length_scale=float(piper_config.get("length_scale", 1.0)),
                noise_scale=float(piper_config.get("noise_scale", 0.667)),
                noise_w_scale=float(piper_config.get("noise_w_scale", 0.8)),
                volume=float(piper_config.get("volume", 1.0)),
                normalize_audio=bool(piper_config.get("normalize_audio", True)),
            )
        except ImportError:
            logger.warning(
                "[Piper] SynthesisConfig not available in this piper-tts "
                "version — advanced knobs ignored"
            )

    # Piper outputs WAV. Caller handles downstream MP3/Opus conversion.
    wav_path = output_path
    if not output_path.endswith(".wav"):
        wav_path = output_path.rsplit(".", 1)[0] + ".wav"

    with wave.open(wav_path, "wb") as wav_file:
        if syn_config is not None:
            voice.synthesize_wav(text, wav_file, syn_config=syn_config)
        else:
            voice.synthesize_wav(text, wav_file)

    # Convert to desired format if caller requested mp3/ogg
    if wav_path != output_path:
        ffmpeg = shutil.which("ffmpeg")
        if ffmpeg:
            conv_cmd = [ffmpeg, "-i", wav_path, "-y", "-loglevel", "error", output_path]
            subprocess.run(conv_cmd, check=True, timeout=30)
            # Best-effort cleanup of the intermediate WAV.
            try:
                os.remove(wav_path)
            except OSError:
                pass
        else:
            # No ffmpeg — rename the WAV to the expected path (the extension
            # may then be misleading, but the audio still plays).
            os.rename(wav_path, output_path)

    return output_path


# ===========================================================================
# Provider: KittenTTS (local, lightweight)
# ===========================================================================

# Module-level cache for KittenTTS model instance
_kittentts_model_cache: Dict[str, Any] = {}


def _generate_kittentts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str:
    """Generate speech using KittenTTS local ONNX model.

    KittenTTS is a lightweight TTS engine (25-80MB models) that runs
    entirely on CPU without requiring a GPU or API key.

    Args:
        text: Text to convert to speech.
        output_path: Where to save the audio file.
        tts_config: TTS config dict.

    Returns:
        Path to the saved audio file.
1536 """ 1537 KittenTTS = _import_kittentts() 1538 kt_config = tts_config.get("kittentts", {}) 1539 model_name = kt_config.get("model", DEFAULT_KITTENTTS_MODEL) 1540 voice = kt_config.get("voice", DEFAULT_KITTENTTS_VOICE) 1541 speed = kt_config.get("speed", 1.0) 1542 clean_text = kt_config.get("clean_text", True) 1543 1544 # Use cached model instance if available 1545 global _kittentts_model_cache 1546 if model_name not in _kittentts_model_cache: 1547 logger.info("[KittenTTS] Loading model: %s", model_name) 1548 _kittentts_model_cache[model_name] = KittenTTS(model_name) 1549 logger.info("[KittenTTS] Model loaded successfully") 1550 1551 model = _kittentts_model_cache[model_name] 1552 1553 # Generate audio (returns numpy array at 24kHz) 1554 audio = model.generate(text, voice=voice, speed=speed, clean_text=clean_text) 1555 1556 # Save as WAV 1557 import soundfile as sf 1558 wav_path = output_path 1559 if not output_path.endswith(".wav"): 1560 wav_path = output_path.rsplit(".", 1)[0] + ".wav" 1561 1562 sf.write(wav_path, audio, 24000) 1563 1564 # Convert to desired format if needed 1565 if wav_path != output_path: 1566 ffmpeg = shutil.which("ffmpeg") 1567 if ffmpeg: 1568 conv_cmd = [ffmpeg, "-i", wav_path, "-y", "-loglevel", "error", output_path] 1569 subprocess.run(conv_cmd, check=True, timeout=30) 1570 os.remove(wav_path) 1571 else: 1572 # No ffmpeg — rename the WAV to the expected path 1573 os.rename(wav_path, output_path) 1574 1575 return output_path 1576 1577 1578 # =========================================================================== 1579 # Main tool function 1580 # =========================================================================== 1581 def text_to_speech_tool( 1582 text: str, 1583 output_path: Optional[str] = None, 1584 ) -> str: 1585 """ 1586 Convert text to speech audio. 1587 1588 Reads provider/voice config from ~/.hermes/config.yaml (tts: section). 1589 The model sends text; the user configures voice and provider. 
1590 1591 On messaging platforms, the returned MEDIA:<path> tag is intercepted 1592 by the send pipeline and delivered as a native voice message. 1593 In CLI mode, the file is saved to ~/voice-memos/. 1594 1595 Args: 1596 text: The text to convert to speech. 1597 output_path: Optional custom save path. Defaults to ~/voice-memos/<timestamp>.mp3 1598 1599 Returns: 1600 str: JSON result with success, file_path, and optionally MEDIA tag. 1601 """ 1602 if not text or not text.strip(): 1603 return tool_error("Text is required", success=False) 1604 1605 tts_config = _load_tts_config() 1606 provider = _get_provider(tts_config) 1607 1608 # User-declared command provider (type: command under tts.providers.<name>) 1609 # resolves BEFORE the built-in dispatch. Built-in names short-circuit here 1610 # so a user's ``tts.providers.openai.command`` can't override the real 1611 # OpenAI handler. 1612 command_provider_config = _resolve_command_provider_config(provider, tts_config) 1613 1614 # Truncate very long text with a warning. The cap is per-provider 1615 # (OpenAI 4096, xAI 15k, MiniMax 10k, ElevenLabs model-aware, etc.). 1616 max_len = _resolve_max_text_length(provider, tts_config) 1617 if len(text) > max_len: 1618 logger.warning( 1619 "TTS text too long for provider %s (%d chars), truncating to %d", 1620 provider, len(text), max_len, 1621 ) 1622 text = text[:max_len] 1623 1624 # Detect platform from gateway env var to choose the best output format. 1625 # Telegram voice bubbles require Opus (.ogg); OpenAI and ElevenLabs can 1626 # produce Opus natively (no ffmpeg needed). Edge TTS always outputs MP3 1627 # and needs ffmpeg for conversion. 
1628 from gateway.session_context import get_session_env 1629 platform = get_session_env("HERMES_SESSION_PLATFORM", "").lower() 1630 want_opus = (platform == "telegram") 1631 1632 # Determine output path 1633 if output_path: 1634 file_path = Path(output_path).expanduser() 1635 if command_provider_config is not None: 1636 # Respect caller-supplied path but align the extension with the 1637 # provider's configured output_format so the command writes to a 1638 # path the caller actually expects. 1639 file_path = _configured_command_tts_output_path( 1640 file_path, command_provider_config 1641 ) 1642 else: 1643 timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") 1644 out_dir = Path(DEFAULT_OUTPUT_DIR) 1645 out_dir.mkdir(parents=True, exist_ok=True) 1646 if command_provider_config is not None: 1647 fmt = _get_command_tts_output_format(command_provider_config) 1648 file_path = out_dir / f"tts_{timestamp}.{fmt}" 1649 # Use .ogg for Telegram with providers that support native Opus output, 1650 # otherwise fall back to .mp3 (Edge TTS will attempt ffmpeg conversion later). 1651 elif want_opus and provider in ("openai", "elevenlabs", "mistral", "gemini"): 1652 file_path = out_dir / f"tts_{timestamp}.ogg" 1653 else: 1654 file_path = out_dir / f"tts_{timestamp}.mp3" 1655 1656 # Ensure parent directory exists 1657 requested_file_path = file_path 1658 1659 # Some providers cannot natively write OGG/Opus. If the caller explicitly 1660 # requested .ogg for one of those providers, generate into a truthful 1661 # sibling path first (.mp3 or .wav) and let the post-processing step 1662 # convert it when possible. 
1663 if command_provider_config is None: 1664 file_path = _rewrite_requested_output_path_for_truthful_generation(provider, file_path) 1665 1666 file_path.parent.mkdir(parents=True, exist_ok=True) 1667 file_str = str(file_path) 1668 1669 try: 1670 # Generate audio with the configured provider 1671 if command_provider_config is not None: 1672 logger.info( 1673 "Generating speech with command TTS provider '%s'...", provider, 1674 ) 1675 file_str = _generate_command_tts( 1676 text, file_str, provider, command_provider_config, tts_config, 1677 ) 1678 1679 elif provider == "elevenlabs": 1680 try: 1681 _import_elevenlabs() 1682 except ImportError: 1683 return json.dumps({ 1684 "success": False, 1685 "error": "ElevenLabs provider selected but 'elevenlabs' package not installed. Run: pip install elevenlabs" 1686 }, ensure_ascii=False) 1687 logger.info("Generating speech with ElevenLabs...") 1688 _generate_elevenlabs(text, file_str, tts_config) 1689 1690 elif provider == "openai": 1691 try: 1692 _import_openai_client() 1693 except ImportError: 1694 return json.dumps({ 1695 "success": False, 1696 "error": "OpenAI provider selected but 'openai' package not installed." 1697 }, ensure_ascii=False) 1698 logger.info("Generating speech with OpenAI TTS...") 1699 _generate_openai_tts(text, file_str, tts_config) 1700 1701 elif provider == "minimax": 1702 logger.info("Generating speech with MiniMax TTS...") 1703 _generate_minimax_tts(text, file_str, tts_config) 1704 1705 elif provider == "xai": 1706 logger.info("Generating speech with xAI TTS...") 1707 _generate_xai_tts(text, file_str, tts_config) 1708 1709 elif provider == "mistral": 1710 try: 1711 _import_mistral_client() 1712 except ImportError: 1713 return json.dumps({ 1714 "success": False, 1715 "error": "Mistral provider selected but 'mistralai' package not installed. 
" 1716 "Run: pip install 'hermes-agent[mistral]'" 1717 }, ensure_ascii=False) 1718 logger.info("Generating speech with Mistral Voxtral TTS...") 1719 _generate_mistral_tts(text, file_str, tts_config) 1720 1721 elif provider == "gemini": 1722 logger.info("Generating speech with Google Gemini TTS...") 1723 _generate_gemini_tts(text, file_str, tts_config) 1724 1725 elif provider == "neutts": 1726 if not _check_neutts_available(): 1727 return json.dumps({ 1728 "success": False, 1729 "error": "NeuTTS provider selected but neutts is not installed. " 1730 "Run hermes setup and choose NeuTTS, or install espeak-ng and run python -m pip install -U neutts[all]." 1731 }, ensure_ascii=False) 1732 logger.info("Generating speech with NeuTTS (local)...") 1733 _generate_neutts(text, file_str, tts_config) 1734 1735 elif provider == "kittentts": 1736 try: 1737 _import_kittentts() 1738 except ImportError: 1739 return json.dumps({ 1740 "success": False, 1741 "error": "KittenTTS provider selected but 'kittentts' package not installed. " 1742 "Run 'hermes setup tts' and choose KittenTTS, or install manually: " 1743 "pip install https://github.com/KittenML/KittenTTS/releases/download/0.8.1/kittentts-0.8.1-py3-none-any.whl" 1744 }, ensure_ascii=False) 1745 logger.info("Generating speech with KittenTTS (local, ~25MB)...") 1746 _generate_kittentts(text, file_str, tts_config) 1747 1748 elif provider == "piper": 1749 try: 1750 _import_piper() 1751 except ImportError: 1752 return json.dumps({ 1753 "success": False, 1754 "error": "Piper provider selected but 'piper-tts' package not installed. 
" 1755 "Run 'hermes tools' and select Piper under TTS, or install manually: " 1756 "pip install piper-tts", 1757 }, ensure_ascii=False) 1758 logger.info("Generating speech with Piper (local)...") 1759 _generate_piper_tts(text, file_str, tts_config) 1760 1761 else: 1762 # Default: Edge TTS (free), with NeuTTS as local fallback 1763 edge_available = True 1764 try: 1765 _import_edge_tts() 1766 except ImportError: 1767 edge_available = False 1768 1769 if edge_available: 1770 logger.info("Generating speech with Edge TTS...") 1771 try: 1772 import concurrent.futures 1773 with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: 1774 pool.submit( 1775 lambda: asyncio.run(_generate_edge_tts(text, file_str, tts_config)) 1776 ).result(timeout=60) 1777 except RuntimeError: 1778 asyncio.run(_generate_edge_tts(text, file_str, tts_config)) 1779 elif _check_neutts_available(): 1780 logger.info("Edge TTS not available, falling back to NeuTTS (local)...") 1781 provider = "neutts" 1782 if command_provider_config is None: 1783 file_path = _rewrite_requested_output_path_for_truthful_generation(provider, file_path) 1784 file_path.parent.mkdir(parents=True, exist_ok=True) 1785 file_str = str(file_path) 1786 _generate_neutts(text, file_str, tts_config) 1787 else: 1788 return json.dumps({ 1789 "success": False, 1790 "error": "No TTS provider available. Install edge-tts (pip install edge-tts) " 1791 "or set up NeuTTS for local synthesis." 
1792 }, ensure_ascii=False) 1793 1794 # Check the file was actually created 1795 if not os.path.exists(file_str) or os.path.getsize(file_str) == 0: 1796 return json.dumps({ 1797 "success": False, 1798 "error": f"TTS generation produced no output (provider: {provider})" 1799 }, ensure_ascii=False) 1800 1801 # Try Opus conversion for Telegram compatibility 1802 # Edge TTS outputs MP3, NeuTTS/KittenTTS output WAV — all need ffmpeg conversion 1803 voice_compatible = False 1804 if command_provider_config is not None: 1805 # Command providers are documents by default. Voice-bubble 1806 # delivery only kicks in when the user explicitly opts in 1807 # via ``voice_compatible: true`` in their provider config. 1808 if _is_command_tts_voice_compatible(command_provider_config): 1809 if not file_str.endswith(".ogg"): 1810 opus_path = _convert_to_opus(file_str) 1811 if opus_path: 1812 file_str = opus_path 1813 voice_compatible = file_str.endswith(".ogg") 1814 elif provider in ("edge", "neutts", "minimax", "xai", "kittentts", "piper") and not file_str.endswith(".ogg"): 1815 conversion_input = file_str 1816 opus_path = _convert_to_opus(conversion_input) 1817 if opus_path: 1818 file_str = opus_path 1819 voice_compatible = True 1820 if Path(conversion_input) != Path(file_str) and Path(conversion_input) != requested_file_path: 1821 try: 1822 os.remove(conversion_input) 1823 except OSError: 1824 pass 1825 elif provider in ("elevenlabs", "openai", "mistral", "gemini"): 1826 voice_compatible = file_str.endswith(".ogg") 1827 1828 file_size = os.path.getsize(file_str) 1829 logger.info("TTS audio saved: %s (%s bytes, provider: %s)", file_str, f"{file_size:,}", provider) 1830 1831 # Build response with MEDIA tag for platform delivery 1832 media_tag = f"MEDIA:{file_str}" 1833 if voice_compatible: 1834 media_tag = f"[[audio_as_voice]]\n{media_tag}" 1835 1836 return json.dumps({ 1837 "success": True, 1838 "file_path": file_str, 1839 "media_tag": media_tag, 1840 "provider": provider, 1841 
"voice_compatible": voice_compatible, 1842 }, ensure_ascii=False) 1843 1844 except ValueError as e: 1845 # Configuration errors (missing API keys, etc.) 1846 error_msg = f"TTS configuration error ({provider}): {e}" 1847 logger.error("%s", error_msg) 1848 return tool_error(error_msg, success=False) 1849 except FileNotFoundError as e: 1850 # Missing dependencies or files 1851 error_msg = f"TTS dependency missing ({provider}): {e}" 1852 logger.error("%s", error_msg, exc_info=True) 1853 return tool_error(error_msg, success=False) 1854 except Exception as e: 1855 # Unexpected errors 1856 error_msg = f"TTS generation failed ({provider}): {e}" 1857 logger.error("%s", error_msg, exc_info=True) 1858 return tool_error(error_msg, success=False) 1859 1860 1861 # =========================================================================== 1862 # Requirements check 1863 # =========================================================================== 1864 def check_tts_requirements() -> bool: 1865 """ 1866 Check if at least one TTS provider is available. 1867 1868 Edge TTS needs no API key and is the default, so if the package 1869 is installed, TTS is available. A user-declared command provider 1870 also satisfies the requirement. 1871 1872 Returns: 1873 bool: True if at least one provider can work. 1874 """ 1875 # Any configured command provider counts as available. 
    if _has_any_command_tts_provider():
        return True
    # Edge TTS: package presence alone is sufficient (no API key needed).
    try:
        _import_edge_tts()
        return True
    except ImportError:
        pass
    # SDK-backed providers: need both the package and a credential/backend.
    try:
        _import_elevenlabs()
        if get_env_value("ELEVENLABS_API_KEY"):
            return True
    except ImportError:
        pass
    try:
        _import_openai_client()
        if _has_openai_audio_backend():
            return True
    except ImportError:
        pass
    # Key-only checks: no local package import is attempted for these.
    if get_env_value("MINIMAX_API_KEY"):
        return True
    if get_env_value("XAI_API_KEY"):
        return True
    if get_env_value("GEMINI_API_KEY") or get_env_value("GOOGLE_API_KEY"):
        return True
    try:
        _import_mistral_client()
        if get_env_value("MISTRAL_API_KEY"):
            return True
    except ImportError:
        pass
    # Local, key-less providers checked last.
    if _check_neutts_available():
        return True
    if _check_kittentts_available():
        return True
    if _check_piper_available():
        return True
    return False


def _resolve_openai_audio_client_config() -> tuple[str, str]:
    """Return direct OpenAI audio config or a managed gateway fallback.

    When ``tts.use_gateway`` is set in config, the Tool Gateway is preferred
    even if direct OpenAI credentials are present.

    Returns:
        tuple[str, str]: ``(api_key_or_token, base_url)`` for the OpenAI
        audio client.

    Raises:
        ValueError: if neither direct credentials nor a managed gateway
        are available.
    """
    direct_api_key = resolve_openai_audio_api_key()
    if direct_api_key and not prefers_gateway("tts"):
        return direct_api_key, DEFAULT_OPENAI_BASE_URL

    managed_gateway = resolve_managed_tool_gateway("openai-audio")
    if managed_gateway is None:
        message = "Neither VOICE_TOOLS_OPENAI_KEY nor OPENAI_API_KEY is set"
        if managed_nous_tools_enabled():
            message += ", and the managed OpenAI audio gateway is unavailable"
        raise ValueError(message)

    # The trailing slash makes urljoin append "v1" instead of replacing the
    # origin's last path segment.
    return managed_gateway.nous_user_token, urljoin(
        f"{managed_gateway.gateway_origin.rstrip('/')}/", "v1"
    )


def _has_openai_audio_backend() -> bool:
    """Return True when OpenAI audio can use direct credentials or the managed gateway."""
    return bool(resolve_openai_audio_api_key() or resolve_managed_tool_gateway("openai-audio"))


# ===========================================================================
# Streaming TTS: sentence-by-sentence pipeline for ElevenLabs
# ===========================================================================
# Sentence boundary pattern: punctuation followed by space or newline
_SENTENCE_BOUNDARY_RE = re.compile(r'(?<=[.!?])(?:\s|\n)|(?:\n\n)')

# Markdown stripping patterns (same as cli.py _voice_speak_response)
_MD_CODE_BLOCK = re.compile(r'```[\s\S]*?```')
_MD_LINK = re.compile(r'\[([^\]]+)\]\([^)]+\)')
_MD_URL = re.compile(r'https?://\S+')
_MD_BOLD = re.compile(r'\*\*(.+?)\*\*')
_MD_ITALIC = re.compile(r'\*(.+?)\*')
_MD_INLINE_CODE = re.compile(r'`(.+?)`')
_MD_HEADER = re.compile(r'^#+\s*', flags=re.MULTILINE)
_MD_LIST_ITEM = re.compile(r'^\s*[-*]\s+', flags=re.MULTILINE)
_MD_HR = re.compile(r'---+')
_MD_EXCESS_NL = re.compile(r'\n{3,}')


def _strip_markdown_for_tts(text: str) -> str:
    """Remove markdown formatting that shouldn't be spoken aloud."""
    # Fenced code blocks are dropped first, before the inline-backtick
    # pattern later in this function can partially match fence characters.
    text = _MD_CODE_BLOCK.sub(' ', text)
    # Links keep their display text; bare URLs are removed entirely.
    text = _MD_LINK.sub(r'\1', text)
    text = _MD_URL.sub('', text)
    text = _MD_BOLD.sub(r'\1', text)
    text = _MD_ITALIC.sub(r'\1', text)
    text = _MD_INLINE_CODE.sub(r'\1', text)
    text = _MD_HEADER.sub('', text)
    text = _MD_LIST_ITEM.sub('', text)
    text = _MD_HR.sub('', text)
    text = _MD_EXCESS_NL.sub('\n\n', text)
    return text.strip()


def stream_tts_to_speaker(
    text_queue: queue.Queue,
    stop_event: threading.Event,
    tts_done_event: threading.Event,
    display_callback: Optional[Callable[[str], None]] = None,
) -> None:
    """Consume text deltas from *text_queue*, buffer them into sentences,
    and stream each sentence through ElevenLabs TTS to the speaker in
    real-time.

    Protocol:
      * The producer puts ``str`` deltas onto *text_queue*.
      * A ``None`` sentinel signals end-of-text (flush remaining buffer).
      * *stop_event* can be set to abort early (e.g. user interrupt).
      * *tts_done_event* is **set** in the ``finally`` block so callers
        waiting on it (continuous voice mode) know playback is finished.
    """
    tts_done_event.clear()

    try:
        # --- TTS client setup (optional -- display_callback works without it) ---
        client = None
        output_stream = None
        voice_id = DEFAULT_ELEVENLABS_VOICE_ID
        model_id = DEFAULT_ELEVENLABS_STREAMING_MODEL_ID

        tts_config = _load_tts_config()
        el_config = tts_config.get("elevenlabs", {})
        voice_id = el_config.get("voice_id", voice_id)
        # streaming_model_id takes priority; falls back to the sync model_id.
        model_id = el_config.get("streaming_model_id",
                                 el_config.get("model_id", model_id))
        # Per-sentence cap for the streaming path. Look up the cap against
        # the *streaming* model_id (defaults to eleven_flash_v2_5 = 40k chars),
        # not the sync model_id. A user override
        # (tts.elevenlabs.max_text_length) still wins.
        stream_max_len = _resolve_max_text_length(
            "elevenlabs",
            {**tts_config, "elevenlabs": {**el_config, "model_id": model_id}},
        )

        api_key = (get_env_value("ELEVENLABS_API_KEY") or "")
        if not api_key:
            # Text display still works; only audio synthesis is disabled.
            logger.warning("ELEVENLABS_API_KEY not set; streaming TTS audio disabled")
        else:
            try:
                ElevenLabs = _import_elevenlabs()
                client = ElevenLabs(api_key=api_key)
            except ImportError:
                logger.warning("elevenlabs package not installed; streaming TTS disabled")

        # Open a single sounddevice output stream for the lifetime of
        # this function. ElevenLabs pcm_24000 produces signed 16-bit
        # little-endian mono PCM at 24 kHz.
        if client is not None:
            try:
                sd = _import_sounddevice()
                output_stream = sd.OutputStream(
                    samplerate=24000, channels=1, dtype="int16",
                )
                output_stream.start()
            except (ImportError, OSError) as exc:
                # Expected on headless machines (no PortAudio / audio device).
                logger.debug("sounddevice not available: %s", exc)
                output_stream = None
            except Exception as exc:
                logger.warning("sounddevice OutputStream failed: %s", exc)
                output_stream = None

        sentence_buf = ""
        min_sentence_len = 20     # fragments shorter than this merge forward
        long_flush_len = 100      # flush buffer this long on queue timeout
        queue_timeout = 0.5       # seconds between queue polls
        _spoken_sentences: list[str] = []  # track spoken sentences to skip duplicates
        # Regex to strip complete <think>...</think> blocks from buffer
        _think_block_re = re.compile(r'<think[\s>].*?</think>', flags=re.DOTALL)

        def _speak_sentence(sentence: str) -> None:
            """Display sentence and optionally generate + play audio."""
            if stop_event.is_set():
                return
            cleaned = _strip_markdown_for_tts(sentence).strip()
            if not cleaned:
                return
            # Skip duplicate/near-duplicate sentences (LLM repetition)
            cleaned_lower = cleaned.lower().rstrip(".!,")
            for prev in _spoken_sentences:
                if prev.lower().rstrip(".!,") == cleaned_lower:
                    return
            _spoken_sentences.append(cleaned)
            # Display raw sentence on screen before TTS processing
            if display_callback is not None:
                display_callback(sentence)
            # Skip audio generation if no TTS client available
            if client is None:
                return
            # Truncate very long sentences (ElevenLabs streaming path)
            if len(cleaned) > stream_max_len:
                cleaned = cleaned[:stream_max_len]
            try:
                audio_iter = client.text_to_speech.convert(
                    text=cleaned,
                    voice_id=voice_id,
                    model_id=model_id,
                    output_format="pcm_24000",
                )
                if output_stream is not None:
                    for chunk in audio_iter:
                        if stop_event.is_set():
                            break
                        # numpy is imported lazily, only when audio actually plays.
                        import numpy as _np
                        audio_array = _np.frombuffer(chunk, dtype=_np.int16)
                        output_stream.write(audio_array.reshape(-1, 1))
                else:
                    # Fallback: write chunks to temp file and play via system player
                    _play_via_tempfile(audio_iter, stop_event)
            except Exception as exc:
                # One failed sentence must not kill the whole pipeline.
                logger.warning("Streaming TTS sentence failed: %s", exc)

        def _play_via_tempfile(audio_iter, stop_evt: threading.Event) -> None:
            """Write PCM chunks to a temp WAV file and play it."""
            tmp_path = None
            try:
                import wave
                tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
                tmp_path = tmp.name
                with wave.open(tmp, "wb") as wf:
                    # Parameters match the pcm_24000 stream: mono, 16-bit, 24 kHz.
                    wf.setnchannels(1)
                    wf.setsampwidth(2)  # 16-bit
                    wf.setframerate(24000)
                    for chunk in audio_iter:
                        if stop_evt.is_set():
                            break
                        wf.writeframes(chunk)
                from tools.voice_mode import play_audio_file
                play_audio_file(tmp_path)
            except Exception as exc:
                logger.warning("Temp-file TTS fallback failed: %s", exc)
            finally:
                # Best-effort cleanup of the delete=False temp file.
                if tmp_path:
                    try:
                        os.unlink(tmp_path)
                    except OSError:
                        pass

        while not stop_event.is_set():
            # Read next delta from queue
            try:
                delta = text_queue.get(timeout=queue_timeout)
            except queue.Empty:
                # Timeout: if we have accumulated a long buffer, flush it
                if len(sentence_buf) > long_flush_len:
                    _speak_sentence(sentence_buf)
                    sentence_buf = ""
                continue

            if delta is None:
                # End-of-text sentinel: strip any remaining think blocks, flush
                sentence_buf = _think_block_re.sub('', sentence_buf)
                if sentence_buf.strip():
                    _speak_sentence(sentence_buf)
                break

            sentence_buf += delta

            # --- Think block filtering ---
            # Strip complete <think>...</think> blocks from buffer.
            # Works correctly even when tags span multiple deltas.
            sentence_buf = _think_block_re.sub('', sentence_buf)

            # If an incomplete <think tag is at the end, wait for more data
            # before extracting sentences (the closing tag may arrive next).
            if '<think' in sentence_buf and '</think>' not in sentence_buf:
                continue

            # Check for sentence boundaries
            while True:
                m = _SENTENCE_BOUNDARY_RE.search(sentence_buf)
                if m is None:
                    break
                end_pos = m.end()
                sentence = sentence_buf[:end_pos]
                sentence_buf = sentence_buf[end_pos:]
                # Merge short fragments into the next sentence
                if len(sentence.strip()) < min_sentence_len:
                    sentence_buf = sentence + sentence_buf
                    break
                _speak_sentence(sentence)

        # Drain any remaining items from the queue
        while True:
            try:
                text_queue.get_nowait()
            except queue.Empty:
                break

        # output_stream is closed in the finally block below

    except Exception as exc:
        logger.warning("Streaming TTS pipeline error: %s", exc)
    finally:
        # Always close the audio output stream to avoid locking the device
        if output_stream is not None:
            try:
                output_stream.stop()
                output_stream.close()
            except Exception:
                pass
        tts_done_event.set()


# ===========================================================================
# Main -- quick diagnostics
# ===========================================================================
if __name__ == "__main__":
    print("🔊 Text-to-Speech Tool Module")
    print("=" * 50)

    # NOTE(review): the *label* argument is accepted but never used inside
    # _check — presumably kept for call-site readability; confirm or drop.
    def _check(importer: Callable[[], Any], label: str) -> bool:
        try:
            importer()
            return True
        except ImportError:
            return False

    print("\nProvider availability:")
    print(f" Edge TTS: {'installed' if _check(_import_edge_tts, 'edge') else 'not installed (pip install edge-tts)'}")
    print(f" ElevenLabs: {'installed' if _check(_import_elevenlabs, 'el') else 'not installed (pip install elevenlabs)'}")
    print(f" API Key: {'set' if get_env_value('ELEVENLABS_API_KEY') else 'not set'}")
    print(f" OpenAI: {'installed' if _check(_import_openai_client, 'oai') else 'not installed'}")
    print(
        " API Key: "
        f"{'set' if resolve_openai_audio_api_key() else 'not set (VOICE_TOOLS_OPENAI_KEY or OPENAI_API_KEY)'}"
    )
    print(f" MiniMax: {'API key set' if get_env_value('MINIMAX_API_KEY') else 'not set (MINIMAX_API_KEY)'}")
    print(f" Piper: {'installed' if _check_piper_available() else 'not installed (pip install piper-tts)'}")
    print(f" ffmpeg: {'✅ found' if _has_ffmpeg() else '❌ not found (needed for Telegram Opus)'}")
    print(f"\n Output dir: {DEFAULT_OUTPUT_DIR}")

    config = _load_tts_config()
    provider = _get_provider(config)
    print(f" Configured provider: {provider}")


# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
# Imported here (bottom of module) rather than at the top; registration runs
# as a module-level side effect on import.
from tools.registry import registry, tool_error

# JSON schema advertised to the model for the text_to_speech tool.
# NOTE(review): this description says CLI mode saves to ~/voice-memos/ while
# the output_path default below points at audio_cache/ — confirm both paths
# are accurate.
TTS_SCHEMA = {
    "name": "text_to_speech",
    "description": "Convert text to speech audio. Returns a MEDIA: path that the platform delivers as native audio. Compatible providers render as a voice bubble on Telegram; otherwise audio is sent as a regular attachment. In CLI mode, saves to ~/voice-memos/. Voice and provider are user-configured (built-in providers like edge/openai or custom command providers under tts.providers.<name>), not model-selected.",
    "parameters": {
        "type": "object",
        "properties": {
            "text": {
                "type": "string",
                "description": "The text to convert to speech. Provider-specific character caps apply and are enforced automatically (OpenAI 4096, xAI 15000, MiniMax 10000, ElevenLabs 5k-40k depending on model); over-long input is truncated."
            },
            "output_path": {
                "type": "string",
                "description": f"Optional custom file path to save the audio. Defaults to {display_hermes_home()}/audio_cache/<timestamp>.mp3"
            }
        },
        "required": ["text"]
    }
}

registry.register(
    name="text_to_speech",
    toolset="tts",
    schema=TTS_SCHEMA,
    # Extra platform kwargs (**kw) are accepted and ignored by the handler.
    handler=lambda args, **kw: text_to_speech_tool(
        text=args.get("text", ""),
        output_path=args.get("output_path")),
    check_fn=check_tts_requirements,
    emoji="🔊",
)