test_voice_cli_integration.py
1 """Tests for CLI voice mode integration -- command parsing, markdown stripping, 2 state management, streaming TTS activation, voice message prefix, _vprint.""" 3 4 import ast 5 import os 6 import queue 7 import threading 8 from types import SimpleNamespace 9 from unittest.mock import MagicMock, patch 10 11 import pytest 12 13 14 def _make_voice_cli(**overrides): 15 """Create a minimal HermesCLI with only voice-related attrs initialized. 16 17 Uses ``__new__()`` to bypass ``__init__`` so no config/env/API setup is 18 needed. Only the voice state attributes (from __init__ lines 3749-3758) 19 are populated. 20 """ 21 from cli import HermesCLI 22 23 cli = HermesCLI.__new__(HermesCLI) 24 cli._voice_lock = threading.Lock() 25 cli._voice_mode = False 26 cli._voice_tts = False 27 cli._voice_recorder = None 28 cli._voice_recording = False 29 cli._voice_processing = False 30 cli._voice_continuous = False 31 cli._voice_tts_done = threading.Event() 32 cli._voice_tts_done.set() 33 cli._pending_input = queue.Queue() 34 cli._app = None 35 cli._attached_images = [] 36 cli.console = SimpleNamespace(width=80) 37 for k, v in overrides.items(): 38 setattr(cli, k, v) 39 return cli 40 41 42 # ============================================================================ 43 # Markdown stripping — import real function from tts_tool 44 # ============================================================================ 45 46 from tools.tts_tool import _strip_markdown_for_tts 47 48 49 class TestMarkdownStripping: 50 def test_strips_bold(self): 51 assert _strip_markdown_for_tts("This is **bold** text") == "This is bold text" 52 53 def test_strips_italic(self): 54 assert _strip_markdown_for_tts("This is *italic* text") == "This is italic text" 55 56 def test_strips_inline_code(self): 57 assert _strip_markdown_for_tts("Run `pip install foo`") == "Run pip install foo" 58 59 def test_strips_fenced_code_blocks(self): 60 text = "Here is code:\n```python\nprint('hello')\n```\nDone." 61 result = _strip_markdown_for_tts(text) 62 assert "print" not in result 63 assert "Done." in result 64 65 def test_strips_headers(self): 66 assert _strip_markdown_for_tts("## Summary\nSome text") == "Summary\nSome text" 67 68 def test_strips_list_markers(self): 69 text = "- item one\n- item two\n* item three" 70 result = _strip_markdown_for_tts(text) 71 assert "item one" in result 72 assert "- " not in result 73 assert "* " not in result 74 75 def test_strips_urls(self): 76 text = "Visit https://example.com for details" 77 result = _strip_markdown_for_tts(text) 78 assert "https://" not in result 79 assert "Visit" in result 80 81 def test_strips_markdown_links(self): 82 text = "See [the docs](https://example.com/docs) for info" 83 result = _strip_markdown_for_tts(text) 84 assert "the docs" in result 85 assert "https://" not in result 86 assert "[" not in result 87 88 def test_strips_horizontal_rules(self): 89 text = "Part one\n---\nPart two" 90 result = _strip_markdown_for_tts(text) 91 assert "---" not in result 92 assert "Part one" in result 93 assert "Part two" in result 94 95 def test_empty_after_stripping_returns_empty(self): 96 text = "```python\nprint('hello')\n```" 97 result = _strip_markdown_for_tts(text) 98 assert result == "" 99 100 def test_long_text_not_truncated(self): 101 """_strip_markdown_for_tts does NOT truncate — that's the caller's job.""" 102 text = "a" * 5000 103 result = _strip_markdown_for_tts(text) 104 assert len(result) == 5000 105 106 def test_complex_response(self): 107 text = ( 108 "## Answer\n\n" 109 "Here's how to do it:\n\n" 110 "```python\ndef hello():\n print('hi')\n```\n\n" 111 "Run it with `python main.py`. " 112 "See [docs](https://example.com) for more.\n\n" 113 "- Step one\n- Step two\n\n" 114 "---\n\n" 115 "**Good luck!**" 116 ) 117 result = _strip_markdown_for_tts(text) 118 assert "```" not in result 119 assert "https://" not in result 120 assert "**" not in result 121 assert "---" not in result 122 assert "Answer" in result 123 assert "Good luck!" in result 124 assert "docs" in result 125 126 127 # ============================================================================ 128 # Voice command parsing 129 # ============================================================================ 130 131 class TestVoiceCommandParsing: 132 """Test _handle_voice_command logic without full CLI setup.""" 133 134 def test_parse_subcommands(self): 135 """Verify subcommand extraction from /voice commands.""" 136 test_cases = [ 137 ("/voice on", "on"), 138 ("/voice off", "off"), 139 ("/voice tts", "tts"), 140 ("/voice status", "status"), 141 ("/voice", ""), 142 ("/voice ON ", "on"), 143 ] 144 for command, expected in test_cases: 145 parts = command.strip().split(maxsplit=1) 146 subcommand = parts[1].lower().strip() if len(parts) > 1 else "" 147 assert subcommand == expected, f"Failed for {command!r}: got {subcommand!r}" 148 149 150 # ============================================================================ 151 # Voice state thread safety 152 # ============================================================================ 153 154 class TestVoiceStateLock: 155 def test_lock_protects_state(self): 156 """Verify that concurrent state changes don't corrupt state.""" 157 lock = threading.Lock() 158 state = {"recording": False, "count": 0} 159 160 def toggle_many(n): 161 for _ in range(n): 162 with lock: 163 state["recording"] = not state["recording"] 164 state["count"] += 1 165 166 threads = [threading.Thread(target=toggle_many, args=(1000,)) for _ in range(4)] 167 for t in threads: 168 t.start() 169 for t in threads: 170 t.join() 171 172 assert state["count"] == 4000 173 174 175 # ============================================================================ 176 # Streaming TTS lazy import activation (Bug A fix) 177 # ============================================================================ 178 179 class TestStreamingTTSActivation: 180 """Verify streaming TTS uses lazy imports to check availability.""" 181 182 def test_activates_when_elevenlabs_and_sounddevice_available(self): 183 """use_streaming_tts should be True when provider is elevenlabs 184 and both lazy imports succeed.""" 185 use_streaming_tts = False 186 try: 187 from tools.tts_tool import ( 188 _load_tts_config as _load_tts_cfg, 189 _get_provider as _get_prov, 190 _import_elevenlabs, 191 _import_sounddevice, 192 ) 193 assert callable(_import_elevenlabs) 194 assert callable(_import_sounddevice) 195 except ImportError: 196 pytest.skip("tools.tts_tool not available") 197 198 with patch("tools.tts_tool._load_tts_config") as mock_cfg, \ 199 patch("tools.tts_tool._get_provider", return_value="elevenlabs"), \ 200 patch("tools.tts_tool._import_elevenlabs") as mock_el, \ 201 patch("tools.tts_tool._import_sounddevice") as mock_sd: 202 mock_cfg.return_value = {"provider": "elevenlabs"} 203 mock_el.return_value = MagicMock() 204 mock_sd.return_value = MagicMock() 205 206 from tools.tts_tool import ( 207 _load_tts_config as load_cfg, 208 _get_provider as get_prov, 209 _import_elevenlabs as import_el, 210 _import_sounddevice as import_sd, 211 ) 212 cfg = load_cfg() 213 if get_prov(cfg) == "elevenlabs": 214 import_el() 215 import_sd() 216 use_streaming_tts = True 217 218 assert use_streaming_tts is True 219 220 def test_does_not_activate_when_elevenlabs_missing(self): 221 """use_streaming_tts stays False when elevenlabs import fails.""" 222 use_streaming_tts = False 223 with patch("tools.tts_tool._load_tts_config", return_value={"provider": "elevenlabs"}), \ 224 patch("tools.tts_tool._get_provider", return_value="elevenlabs"), \ 225 patch("tools.tts_tool._import_elevenlabs", side_effect=ImportError("no elevenlabs")): 226 try: 227 from tools.tts_tool import ( 228 _load_tts_config as load_cfg, 229 _get_provider as get_prov, 230 _import_elevenlabs as import_el, 231 _import_sounddevice as import_sd, 232 ) 233 cfg = load_cfg() 234 if get_prov(cfg) == "elevenlabs": 235 import_el() 236 import_sd() 237 use_streaming_tts = True 238 except (ImportError, OSError): 239 pass 240 241 assert use_streaming_tts is False 242 243 def test_does_not_activate_when_sounddevice_missing(self): 244 """use_streaming_tts stays False when sounddevice import fails.""" 245 use_streaming_tts = False 246 with patch("tools.tts_tool._load_tts_config", return_value={"provider": "elevenlabs"}), \ 247 patch("tools.tts_tool._get_provider", return_value="elevenlabs"), \ 248 patch("tools.tts_tool._import_elevenlabs", return_value=MagicMock()), \ 249 patch("tools.tts_tool._import_sounddevice", side_effect=OSError("no PortAudio")): 250 try: 251 from tools.tts_tool import ( 252 _load_tts_config as load_cfg, 253 _get_provider as get_prov, 254 _import_elevenlabs as import_el, 255 _import_sounddevice as import_sd, 256 ) 257 cfg = load_cfg() 258 if get_prov(cfg) == "elevenlabs": 259 import_el() 260 import_sd() 261 use_streaming_tts = True 262 except (ImportError, OSError): 263 pass 264 265 assert use_streaming_tts is False 266 267 def test_does_not_activate_for_non_elevenlabs_provider(self): 268 """use_streaming_tts stays False when provider is not elevenlabs.""" 269 use_streaming_tts = False 270 with patch("tools.tts_tool._load_tts_config", return_value={"provider": "edge"}), \ 271 patch("tools.tts_tool._get_provider", return_value="edge"): 272 try: 273 from tools.tts_tool import ( 274 _load_tts_config as load_cfg, 275 _get_provider as get_prov, 276 _import_elevenlabs as import_el, 277 _import_sounddevice as import_sd, 278 ) 279 cfg = load_cfg() 280 if get_prov(cfg) == "elevenlabs": 281 import_el() 282 import_sd() 283 use_streaming_tts = True 284 except (ImportError, OSError): 285 pass 286 287 assert use_streaming_tts is False 288 289 def test_stale_boolean_imports_no_longer_exist(self): 290 """Confirm _HAS_ELEVENLABS and _HAS_AUDIO are not in tts_tool module.""" 291 import tools.tts_tool as tts_mod 292 assert not hasattr(tts_mod, "_HAS_ELEVENLABS"), \ 293 "_HAS_ELEVENLABS should not exist -- lazy imports replaced it" 294 assert not hasattr(tts_mod, "_HAS_AUDIO"), \ 295 "_HAS_AUDIO should not exist -- lazy imports replaced it" 296 297 298 # ============================================================================ 299 # Voice mode user message prefix (Bug B fix) 300 # ============================================================================ 301 302 class TestVoiceMessagePrefix: 303 """Voice mode should inject instruction via user message prefix, 304 not by modifying the system prompt (which breaks prompt cache).""" 305 306 def test_prefix_added_when_voice_mode_active(self): 307 """When voice mode is active and message is str, agent_message 308 should have the voice instruction prefix.""" 309 voice_mode = True 310 message = "What's the weather like?" 311 312 agent_message = message 313 if voice_mode and isinstance(message, str): 314 agent_message = ( 315 "[Voice input — respond concisely and conversationally, " 316 "2-3 sentences max. No code blocks or markdown.] " 317 + message 318 ) 319 320 assert agent_message.startswith("[Voice input") 321 assert "What's the weather like?" in agent_message 322 323 def test_no_prefix_when_voice_mode_inactive(self): 324 """When voice mode is off, message passes through unchanged.""" 325 voice_mode = False 326 message = "What's the weather like?" 327 328 agent_message = message 329 if voice_mode and isinstance(message, str): 330 agent_message = ( 331 "[Voice input — respond concisely and conversationally, " 332 "2-3 sentences max. No code blocks or markdown.] " 333 + message 334 ) 335 336 assert agent_message == message 337 338 def test_no_prefix_for_multimodal_content(self): 339 """When message is a list (multimodal), no prefix is added.""" 340 voice_mode = True 341 message = [{"type": "text", "text": "describe this"}, {"type": "image_url"}] 342 343 agent_message = message 344 if voice_mode and isinstance(message, str): 345 agent_message = ( 346 "[Voice input — respond concisely and conversationally, " 347 "2-3 sentences max. No code blocks or markdown.] " 348 + message 349 ) 350 351 assert agent_message is message 352 353 def test_history_stays_clean(self): 354 """conversation_history should contain the original message, 355 not the prefixed version.""" 356 voice_mode = True 357 message = "Hello there" 358 conversation_history = [] 359 360 conversation_history.append({"role": "user", "content": message}) 361 362 agent_message = message 363 if voice_mode and isinstance(message, str): 364 agent_message = ( 365 "[Voice input — respond concisely and conversationally, " 366 "2-3 sentences max. No code blocks or markdown.] " 367 + message 368 ) 369 370 assert conversation_history[-1]["content"] == "Hello there" 371 assert agent_message.startswith("[Voice input") 372 assert agent_message != conversation_history[-1]["content"] 373 374 def test_enable_voice_mode_does_not_modify_system_prompt(self): 375 """_enable_voice_mode should NOT modify self.system_prompt or 376 agent.ephemeral_system_prompt -- the system prompt must stay 377 stable to preserve prompt cache.""" 378 cli = SimpleNamespace( 379 _voice_mode=False, 380 _voice_tts=False, 381 _voice_lock=threading.Lock(), 382 system_prompt="You are helpful", 383 agent=SimpleNamespace(ephemeral_system_prompt="You are helpful"), 384 ) 385 386 original_system = cli.system_prompt 387 original_ephemeral = cli.agent.ephemeral_system_prompt 388 389 cli._voice_mode = True 390 391 assert cli.system_prompt == original_system 392 assert cli.agent.ephemeral_system_prompt == original_ephemeral 393 394 395 # ============================================================================ 396 # _vprint force parameter (Minor fix) 397 # ============================================================================ 398 399 class TestVprintForceParameter: 400 """_vprint should suppress output during streaming TTS unless force=True.""" 401 402 def _make_agent_with_stream(self, stream_active: bool): 403 """Create a minimal agent-like object with _vprint.""" 404 agent = SimpleNamespace( 405 _stream_callback=MagicMock() if stream_active else None, 406 ) 407 408 def _vprint(*args, force=False, **kwargs): 409 if not force and getattr(agent, "_stream_callback", None) is not None: 410 return 411 print(*args, **kwargs) 412 413 agent._vprint = _vprint 414 return agent 415 416 def test_suppressed_during_streaming(self, capsys): 417 """Normal _vprint output is suppressed when streaming TTS is active.""" 418 agent = self._make_agent_with_stream(stream_active=True) 419 agent._vprint("should be hidden") 420 captured = capsys.readouterr() 421 assert captured.out == "" 422 423 def test_shown_when_not_streaming(self, capsys): 424 """Normal _vprint output is shown when streaming is not active.""" 425 agent = self._make_agent_with_stream(stream_active=False) 426 agent._vprint("should be shown") 427 captured = capsys.readouterr() 428 assert "should be shown" in captured.out 429 430 def test_force_shown_during_streaming(self, capsys): 431 """force=True bypasses the streaming suppression.""" 432 agent = self._make_agent_with_stream(stream_active=True) 433 agent._vprint("critical error!", force=True) 434 captured = capsys.readouterr() 435 assert "critical error!" in captured.out 436 437 def test_force_shown_when_not_streaming(self, capsys): 438 """force=True works normally when not streaming (no regression).""" 439 agent = self._make_agent_with_stream(stream_active=False) 440 agent._vprint("normal message", force=True) 441 captured = capsys.readouterr() 442 assert "normal message" in captured.out 443 444 def test_error_messages_use_force_in_run_agent(self): 445 """Verify that critical error _vprint calls in run_agent.py 446 include force=True.""" 447 with open("run_agent.py", "r") as f: 448 source = f.read() 449 450 tree = ast.parse(source) 451 452 forced_error_count = 0 453 unforced_error_count = 0 454 455 for node in ast.walk(tree): 456 if not isinstance(node, ast.Call): 457 continue 458 func = node.func 459 if not (isinstance(func, ast.Attribute) and func.attr == "_vprint"): 460 continue 461 has_fatal = False 462 for arg in node.args: 463 if isinstance(arg, ast.JoinedStr): 464 for val in arg.values: 465 if isinstance(val, ast.Constant) and isinstance(val.value, str): 466 if "\u274c" in val.value: 467 has_fatal = True 468 break 469 470 if not has_fatal: 471 continue 472 473 has_force = any( 474 kw.arg == "force" 475 and isinstance(kw.value, ast.Constant) 476 and kw.value.value is True 477 for kw in node.keywords 478 ) 479 480 if has_force: 481 forced_error_count += 1 482 else: 483 unforced_error_count += 1 484 485 assert forced_error_count > 0, \ 486 "Expected at least one _vprint with force=True for error messages" 487 assert unforced_error_count == 0, \ 488 f"Found {unforced_error_count} critical error _vprint calls without force=True" 489 490 491 # ============================================================================ 492 # Bug fix regression tests 493 # ============================================================================ 494 495 class TestEdgeTTSLazyImport: 496 """Bug #3: _generate_edge_tts must use lazy import, not bare module name.""" 497 498 def test_generate_edge_tts_calls_lazy_import(self): 499 """AST check: _generate_edge_tts must call _import_edge_tts(), not 500 reference bare 'edge_tts' module name.""" 501 import ast as _ast 502 503 with open("tools/tts_tool.py") as f: 504 tree = _ast.parse(f.read()) 505 506 for node in _ast.walk(tree): 507 if isinstance(node, _ast.AsyncFunctionDef) and node.name == "_generate_edge_tts": 508 # Collect all Name references (bare identifiers) 509 bare_refs = [ 510 n.id for n in _ast.walk(node) 511 if isinstance(n, _ast.Name) and n.id == "edge_tts" 512 ] 513 assert bare_refs == [], ( 514 f"_generate_edge_tts uses bare 'edge_tts' name — " 515 f"should use _import_edge_tts() lazy helper" 516 ) 517 518 # Must have a call to _import_edge_tts 519 lazy_calls = [ 520 n for n in _ast.walk(node) 521 if isinstance(n, _ast.Call) 522 and isinstance(n.func, _ast.Name) 523 and n.func.id == "_import_edge_tts" 524 ] 525 assert len(lazy_calls) >= 1, ( 526 "_generate_edge_tts must call _import_edge_tts()" 527 ) 528 break 529 else: 530 pytest.fail("_generate_edge_tts not found in tts_tool.py") 531 532 533 class TestStreamingTTSOutputStreamCleanup: 534 """Bug #7: output_stream must be closed in finally block.""" 535 536 def test_output_stream_closed_in_finally(self): 537 """AST check: stream_tts_to_speaker's finally block must close 538 output_stream even on exception.""" 539 import ast as _ast 540 541 with open("tools/tts_tool.py") as f: 542 tree = _ast.parse(f.read()) 543 544 for node in _ast.walk(tree): 545 if isinstance(node, _ast.FunctionDef) and node.name == "stream_tts_to_speaker": 546 # Find the outermost try that has a finally with tts_done_event.set() 547 for child in _ast.walk(node): 548 if isinstance(child, _ast.Try) and child.finalbody: 549 finally_text = "\n".join( 550 _ast.dump(n) for n in child.finalbody 551 ) 552 if "tts_done_event" in finally_text: 553 assert "output_stream" in finally_text, ( 554 "finally block must close output_stream" 555 ) 556 return 557 pytest.fail("No finally block with tts_done_event found") 558 559 560 class TestCtrlCResetsContinuousMode: 561 """Bug #4: Ctrl+C cancel must reset _voice_continuous.""" 562 563 def test_ctrl_c_handler_resets_voice_continuous(self): 564 """Source check: Ctrl+C voice cancel block must set 565 _voice_continuous = False.""" 566 with open("cli.py") as f: 567 source = f.read() 568 569 # Find the Ctrl+C handler's voice cancel block 570 lines = source.split("\n") 571 in_cancel_block = False 572 found_continuous_reset = False 573 for i, line in enumerate(lines): 574 if "Cancel active voice recording" in line: 575 in_cancel_block = True 576 if in_cancel_block: 577 if "_voice_continuous = False" in line: 578 found_continuous_reset = True 579 break 580 # Block ends at next comment section or return 581 if "return" in line and in_cancel_block: 582 break 583 584 assert found_continuous_reset, ( 585 "Ctrl+C voice cancel block must set _voice_continuous = False" 586 ) 587 588 589 class TestDisableVoiceModeStopsTTS: 590 """Bug #5: _disable_voice_mode must stop active TTS playback.""" 591 592 def test_disable_voice_mode_calls_stop_playback(self): 593 """Source check: _disable_voice_mode must call stop_playback().""" 594 import inspect 595 from cli import HermesCLI 596 597 source = inspect.getsource(HermesCLI._disable_voice_mode) 598 assert "stop_playback" in source, ( 599 "_disable_voice_mode must call stop_playback()" 600 ) 601 assert "_voice_tts_done.set()" in source, ( 602 "_disable_voice_mode must set _voice_tts_done" 603 ) 604 605 606 class TestVoiceStatusUsesConfigKey: 607 """Bug #8: voice-related UI surfaces must read record key from config.""" 608 609 def test_show_voice_status_not_hardcoded(self): 610 """Source check: _show_voice_status must not hardcode Ctrl+B.""" 611 with open("cli.py") as f: 612 source = f.read() 613 614 lines = source.split("\n") 615 in_method = False 616 for line in lines: 617 if "def _show_voice_status" in line: 618 in_method = True 619 elif in_method and line.strip().startswith("def "): 620 break 621 elif in_method: 622 assert 'Record key: Ctrl+B"' not in line, ( 623 "_show_voice_status hardcodes 'Ctrl+B' — " 624 "should read from config" 625 ) 626 627 def test_show_voice_status_reads_config(self): 628 """Source check: _show_voice_status must use load_config().""" 629 with open("cli.py") as f: 630 source = f.read() 631 632 lines = source.split("\n") 633 in_method = False 634 method_lines = [] 635 for line in lines: 636 if "def _show_voice_status" in line: 637 in_method = True 638 elif in_method and line.strip().startswith("def "): 639 break 640 elif in_method: 641 method_lines.append(line) 642 643 method_source = "\n".join(method_lines) 644 assert "load_config" in method_source or "record_key" in method_source, ( 645 "_show_voice_status should read record_key from config" 646 ) 647 648 def test_tui_voice_status_not_hardcoded(self): 649 """Source check: TUI voice status fragments must not hardcode Ctrl+B.""" 650 with open("cli.py") as f: 651 source = f.read() 652 653 lines = source.split("\n") 654 in_method = False 655 for line in lines: 656 if "def _get_voice_status_fragments" in line: 657 in_method = True 658 elif in_method and line.strip().startswith("def "): 659 break 660 elif in_method: 661 assert 'Ctrl+B to stop' not in line 662 assert 'Ctrl+B to record' not in line 663 assert '" 🎤 Ctrl+B "' not in line 664 665 def test_placeholder_not_hardcoded(self): 666 """Source check: prompt placeholder must not hardcode Ctrl+B.""" 667 with open("cli.py") as f: 668 source = f.read() 669 670 assert 'return "type or Ctrl+B to record"' not in source 671 assert 'return "recording... Ctrl+B to stop, Ctrl+C to cancel"' not in source 672 673 def test_external_editor_binding_falls_back_when_voice_uses_ctrl_g(self): 674 """Source check: Ctrl+G must not be claimed by both voice and editor.""" 675 with open("cli.py") as f: 676 source = f.read() 677 678 assert '_primary_editor_binding = ("c-x", "c-e") if _editor_voice_key == "c-g" else ("c-g",)' in source 679 assert "@kb.add(*_primary_editor_binding, filter=_editor_filter)" in source 680 681 def test_voice_binding_is_eager(self): 682 """Source check: configured voice key must register eagerly to beat default bindings.""" 683 with open("cli.py") as f: 684 source = f.read() 685 686 assert '@kb.add(_voice_key, eager=True)' in source 687 688 689 class TestChatTTSCleanupOnException: 690 """Bug #2: chat() must clean up streaming TTS resources on exception.""" 691 692 def test_chat_has_finally_for_tts_cleanup(self): 693 """AST check: chat() method must have a finally block that cleans up 694 text_queue, stop_event, and tts_thread.""" 695 import ast as _ast 696 697 with open("cli.py") as f: 698 tree = _ast.parse(f.read()) 699 700 for node in _ast.walk(tree): 701 if isinstance(node, _ast.FunctionDef) and node.name == "chat": 702 # Find Try nodes with finally blocks 703 for child in _ast.walk(node): 704 if isinstance(child, _ast.Try) and child.finalbody: 705 finally_text = "\n".join( 706 _ast.dump(n) for n in child.finalbody 707 ) 708 if "text_queue" in finally_text: 709 assert "stop_event" in finally_text, ( 710 "finally must also handle stop_event" 711 ) 712 assert "tts_thread" in finally_text, ( 713 "finally must also handle tts_thread" 714 ) 715 return 716 pytest.fail( 717 "chat() must have a finally block cleaning up " 718 "text_queue/stop_event/tts_thread" 719 ) 720 721 722 class TestBrowserToolSignalHandlerRemoved: 723 """browser_tool.py must NOT register SIGINT/SIGTERM handlers that call 724 sys.exit() — this conflicts with prompt_toolkit's event loop and causes 725 the process to become unkillable during voice mode.""" 726 727 def test_no_signal_handler_registration(self): 728 """Source check: browser_tool.py must not call signal.signal() 729 for SIGINT or SIGTERM.""" 730 with open("tools/browser_tool.py") as f: 731 source = f.read() 732 733 lines = source.split("\n") 734 for i, line in enumerate(lines, 1): 735 stripped = line.strip() 736 # Skip comments 737 if stripped.startswith("#"): 738 continue 739 assert "signal.signal(signal.SIGINT" not in stripped, ( 740 f"browser_tool.py:{i} registers SIGINT handler — " 741 f"use atexit instead to avoid prompt_toolkit conflicts" 742 ) 743 assert "signal.signal(signal.SIGTERM" not in stripped, ( 744 f"browser_tool.py:{i} registers SIGTERM handler — " 745 f"use atexit instead to avoid prompt_toolkit conflicts" 746 ) 747 748 749 class TestKeyHandlerNeverBlocks: 750 """The Ctrl+B key handler runs in prompt_toolkit's event-loop thread. 751 Any blocking call freezes the entire UI. Verify that: 752 1. _voice_start_recording is NOT called directly (must be in daemon thread) 753 2. _voice_processing guard prevents starting while stop/transcribe runs 754 3. _voice_processing is set atomically with _voice_recording in stop_and_transcribe 755 """ 756 757 def test_start_recording_not_called_directly_in_handler(self): 758 """AST check: handle_voice_record must NOT call _voice_start_recording() 759 directly — it must wrap it in a Thread to avoid blocking the UI.""" 760 import ast as _ast 761 762 with open("cli.py") as f: 763 tree = _ast.parse(f.read()) 764 765 for node in _ast.walk(tree): 766 if isinstance(node, _ast.FunctionDef) and node.name == "handle_voice_record": 767 # Collect all direct calls to _voice_start_recording in this function. 768 # They should ONLY appear inside a nested def (the _start_recording wrapper). 769 for child in _ast.iter_child_nodes(node): 770 # Direct statements in the handler body (not nested defs) 771 if isinstance(child, _ast.Expr) and isinstance(child.value, _ast.Call): 772 call_src = _ast.dump(child.value) 773 assert "_voice_start_recording" not in call_src, ( 774 "handle_voice_record calls _voice_start_recording directly " 775 "— must dispatch to a daemon thread" 776 ) 777 break 778 779 def test_processing_guard_in_start_path(self): 780 """Source check: key handler must check _voice_processing before 781 starting a new recording.""" 782 with open("cli.py") as f: 783 source = f.read() 784 785 lines = source.split("\n") 786 in_handler = False 787 in_else = False 788 found_guard = False 789 for line in lines: 790 if "def handle_voice_record" in line: 791 in_handler = True 792 elif in_handler and line.strip().startswith("def ") and "_start_recording" not in line: 793 break 794 elif in_handler and "else:" in line: 795 in_else = True 796 elif in_else and "_voice_processing" in line: 797 found_guard = True 798 break 799 800 assert found_guard, ( 801 "Key handler START path must guard against _voice_processing " 802 "to prevent blocking on AudioRecorder._lock" 803 ) 804 805 def test_processing_set_atomically_with_recording_false(self): 806 """Source check: _voice_stop_and_transcribe must set _voice_processing = True 807 in the same lock block where it sets _voice_recording = False.""" 808 with open("cli.py") as f: 809 source = f.read() 810 811 lines = source.split("\n") 812 in_method = False 813 in_first_lock = False 814 found_recording_false = False 815 found_processing_true = False 816 for line in lines: 817 if "def _voice_stop_and_transcribe" in line: 818 in_method = True 819 elif in_method and "with self._voice_lock:" in line and not in_first_lock: 820 in_first_lock = True 821 elif in_first_lock: 822 stripped = line.strip() 823 if not stripped or stripped.startswith("#"): 824 continue 825 if "_voice_recording = False" in stripped: 826 found_recording_false = True 827 if "_voice_processing = True" in stripped: 828 found_processing_true = True 829 # End of with block (dedent) 830 if stripped and not line.startswith(" ") and not line.startswith("\t\t\t"): 831 break 832 833 assert found_recording_false and found_processing_true, ( 834 "_voice_stop_and_transcribe must set _voice_processing = True " 835 "atomically (same lock block) with _voice_recording = False" 836 ) 837 838 839 # ============================================================================ 840 # Real behavior tests — CLI voice methods via _make_voice_cli() 841 # ============================================================================ 842 843 class TestHandleVoiceCommandReal: 844 """Tests _handle_voice_command routing with real CLI instance.""" 845 846 def _cli(self): 847 cli = _make_voice_cli() 848 cli._enable_voice_mode = MagicMock() 849 cli._disable_voice_mode = MagicMock() 850 cli._toggle_voice_tts = MagicMock() 851 cli._show_voice_status = MagicMock() 852 return cli 853 854 @patch("cli._cprint") 855 def test_on_calls_enable(self, _cp): 856 cli = self._cli() 857 cli._handle_voice_command("/voice on") 858 cli._enable_voice_mode.assert_called_once() 859 860 @patch("cli._cprint") 861 def test_off_calls_disable(self, _cp): 862 cli = self._cli() 863 cli._handle_voice_command("/voice off") 864 cli._disable_voice_mode.assert_called_once() 865 866 @patch("cli._cprint") 867 def test_tts_calls_toggle(self, _cp): 868 cli = self._cli() 869 cli._handle_voice_command("/voice tts") 870 cli._toggle_voice_tts.assert_called_once() 871 872 @patch("cli._cprint") 873 def test_status_calls_show(self, _cp): 874 cli = self._cli() 875 cli._handle_voice_command("/voice status") 876 cli._show_voice_status.assert_called_once() 877 878 @patch("cli._cprint") 879 def test_toggle_off_when_enabled(self, _cp): 880 cli = self._cli() 881 cli._voice_mode = True 882 cli._handle_voice_command("/voice") 883 cli._disable_voice_mode.assert_called_once() 884 885 @patch("cli._cprint") 886 def test_toggle_on_when_disabled(self, _cp): 887 cli = self._cli() 888 cli._voice_mode = False 889 cli._handle_voice_command("/voice") 890 cli._enable_voice_mode.assert_called_once() 891 892 @patch("cli._cprint") 893 def test_unknown_subcommand(self, mock_cp): 894 cli = self._cli() 895 cli._handle_voice_command("/voice foobar") 896 cli._enable_voice_mode.assert_not_called() 897 cli._disable_voice_mode.assert_not_called() 898 # Should print usage via _cprint 899 assert any("Unknown" in str(c) or "unknown" in str(c) 900 for c in mock_cp.call_args_list) 901 902 903 class TestEnableVoiceModeReal: 904 """Tests _enable_voice_mode with real CLI instance.""" 905 906 @patch("cli._cprint") 907 @patch("hermes_cli.config.load_config", return_value={"voice": {}}) 908 @patch("tools.voice_mode.check_voice_requirements", 909 return_value={"available": True, "details": "OK"}) 910 @patch("tools.voice_mode.detect_audio_environment", 911 return_value={"available": True, "warnings": []}) 912 def test_success_sets_voice_mode(self, _env, _req, _cfg, _cp): 913 cli = _make_voice_cli() 914 cli._enable_voice_mode() 915 assert cli._voice_mode is True 916 917 @patch("cli._cprint") 918 def test_already_enabled_noop(self, _cp): 919 cli = _make_voice_cli(_voice_mode=True) 920 cli._enable_voice_mode() 921 assert cli._voice_mode is True 922 923 @patch("cli._cprint") 924 @patch("tools.voice_mode.detect_audio_environment", 925 return_value={"available": False, "warnings": ["SSH session"]}) 926 def test_env_check_fails(self, _env, _cp): 927 cli = _make_voice_cli() 928 cli._enable_voice_mode() 929 assert cli._voice_mode is False 930 931 @patch("cli._cprint") 932 @patch("tools.voice_mode.check_voice_requirements", 933 return_value={"available": False, "details": "Missing", 934 "missing_packages": ["sounddevice"]}) 935 @patch("tools.voice_mode.detect_audio_environment", 936 return_value={"available": True, "warnings": []}) 937 def test_requirements_fail(self, _env, _req, _cp): 938 cli = _make_voice_cli() 939 cli._enable_voice_mode() 940 assert cli._voice_mode is False 941 942 @patch("cli._cprint") 943 @patch("hermes_cli.config.load_config", return_value={"voice": {"auto_tts": True}}) 944 @patch("tools.voice_mode.check_voice_requirements", 945 return_value={"available": True, "details": "OK"}) 946 @patch("tools.voice_mode.detect_audio_environment", 947 return_value={"available": True, "warnings": []}) 948 def test_auto_tts_from_config(self, _env, _req, _cfg, _cp): 949 cli = _make_voice_cli() 950 cli._enable_voice_mode() 951 assert cli._voice_tts is True 952 953 @patch("cli._cprint") 954 @patch("hermes_cli.config.load_config", return_value={"voice": {}}) 955 @patch("tools.voice_mode.check_voice_requirements", 956 return_value={"available": True, "details": "OK"}) 957 @patch("tools.voice_mode.detect_audio_environment", 958 return_value={"available": True, "warnings": []}) 959 def test_no_auto_tts_default(self, _env, _req, _cfg, _cp): 960 cli = _make_voice_cli() 961 cli._enable_voice_mode() 962 assert cli._voice_tts is False 963 964 @patch("cli._cprint") 965 @patch("hermes_cli.config.load_config", side_effect=Exception("broken config")) 966 @patch("tools.voice_mode.check_voice_requirements", 967 return_value={"available": True, "details": "OK"}) 968 @patch("tools.voice_mode.detect_audio_environment", 969 return_value={"available": True, "warnings": []}) 970 def test_config_exception_still_enables(self, _env, _req, _cfg, _cp): 971 cli = _make_voice_cli() 972 cli._enable_voice_mode() 973 assert cli._voice_mode is True 974 975 976 class TestVoiceBeepConfigReal: 977 """Tests the CLI voice beep toggle.""" 978 979 @patch("hermes_cli.config.load_config", return_value={"voice": {}}) 980 def test_beeps_enabled_by_default(self, _cfg): 981 cli = _make_voice_cli() 982 assert cli._voice_beeps_enabled() is True 983 984 @patch("hermes_cli.config.load_config", return_value={"voice": {"beep_enabled": False}}) 985 def test_beeps_can_be_disabled(self, _cfg): 986 cli = _make_voice_cli() 987 assert cli._voice_beeps_enabled() is False 988 989 @patch("cli._cprint") 990 @patch("cli.threading.Thread") 991 @patch("tools.voice_mode.play_beep") 992 @patch("tools.voice_mode.create_audio_recorder") 993 @patch( 994 "tools.voice_mode.check_voice_requirements", 995 return_value={ 996 "available": True, 997 "audio_available": True, 998 "stt_available": True, 999 "details": "OK", 1000 "missing_packages": [], 1001 }, 1002 ) 1003 @patch( 1004 "hermes_cli.config.load_config", 1005 return_value={ 1006 "voice": { 1007 "beep_enabled": False, 1008 "silence_threshold": 200, 1009 "silence_duration": 3.0, 1010 } 1011 }, 1012 ) 1013 def test_start_recording_skips_beep_when_disabled( 1014 self, _cfg, _req, mock_create, mock_beep, mock_thread, _cp 1015 ): 1016 recorder = MagicMock() 1017 recorder.supports_silence_autostop = True 1018 mock_create.return_value = recorder 1019 mock_thread.return_value = MagicMock(start=MagicMock()) 1020 1021 cli = _make_voice_cli() 1022 cli._voice_start_recording() 1023 1024 recorder.start.assert_called_once() 1025 mock_beep.assert_not_called() 1026 1027 1028 class TestDisableVoiceModeReal: 1029 """Tests _disable_voice_mode with real CLI instance.""" 1030 1031 @patch("cli._cprint") 1032 @patch("tools.voice_mode.stop_playback") 1033 def test_all_flags_reset(self, _sp, _cp): 1034 cli = _make_voice_cli(_voice_mode=True, _voice_tts=True, 1035 _voice_continuous=True) 1036 cli._disable_voice_mode() 1037 assert cli._voice_mode is False 1038 assert cli._voice_tts is False 1039 assert cli._voice_continuous is False 1040 1041 @patch("cli._cprint") 1042 @patch("tools.voice_mode.stop_playback") 1043 def test_active_recording_cancelled(self, _sp, _cp): 1044 recorder = MagicMock() 1045 cli = _make_voice_cli(_voice_recording=True, _voice_recorder=recorder) 1046 cli._disable_voice_mode() 1047 recorder.cancel.assert_called_once() 1048 assert cli._voice_recording is False 1049 1050 @patch("cli._cprint") 1051 @patch("tools.voice_mode.stop_playback") 1052 def test_stop_playback_called(self, mock_sp, _cp): 1053 cli = _make_voice_cli() 1054 cli._disable_voice_mode() 1055 mock_sp.assert_called_once() 1056 1057 @patch("cli._cprint") 1058 @patch("tools.voice_mode.stop_playback") 1059 def test_tts_done_event_set(self, _sp, _cp): 1060 cli = _make_voice_cli() 1061 cli._voice_tts_done.clear() 1062 cli._disable_voice_mode() 1063 assert cli._voice_tts_done.is_set() 1064 1065 @patch("cli._cprint") 1066 @patch("tools.voice_mode.stop_playback") 1067 def test_no_recorder_no_crash(self, _sp, _cp): 1068 cli = _make_voice_cli(_voice_recording=True, _voice_recorder=None) 1069 cli._disable_voice_mode() 1070 assert cli._voice_mode is False 1071 1072 @patch("cli._cprint") 1073 @patch("tools.voice_mode.stop_playback", side_effect=RuntimeError("boom")) 1074 def test_stop_playback_exception_swallowed(self, _sp, _cp): 1075 cli = _make_voice_cli(_voice_mode=True) 1076 cli._disable_voice_mode() 1077 assert cli._voice_mode is False 1078 1079 1080 class TestVoiceSpeakResponseReal: 1081 """Tests _voice_speak_response with real CLI instance.""" 1082 1083 def test_async_scheduling_clears_done_before_thread_start(self): 1084 cli = _make_voice_cli(_voice_tts=True) 1085 starts = [] 1086 1087 class FakeThread: 1088 def __init__(self, target=None, args=(), daemon=None): 1089 self.target = target 1090 self.args = args 1091 self.daemon = daemon 1092 1093 def start(self): 1094 starts.append(cli._voice_tts_done.is_set()) 1095 1096 with patch("cli.threading.Thread", FakeThread): 1097 cli._voice_speak_response_async("Hello") 1098 1099 assert starts == [False] 1100 assert not cli._voice_tts_done.is_set() 1101 1102 @patch("cli._cprint") 1103 def test_early_return_when_tts_off(self, _cp): 1104 cli = _make_voice_cli(_voice_tts=False) 1105 with patch("tools.tts_tool.text_to_speech_tool") as mock_tts: 1106 cli._voice_speak_response("Hello") 1107 mock_tts.assert_not_called() 1108 1109 @patch("cli._cprint") 1110 @patch("cli.os.unlink") 1111 @patch("cli.os.path.getsize", return_value=1000) 1112 @patch("cli.os.path.isfile", return_value=True) 1113 @patch("cli.os.makedirs") 1114 @patch("tools.voice_mode.play_audio_file") 1115 @patch("tools.tts_tool.text_to_speech_tool", return_value='{"success": true}') 1116 def test_markdown_stripped(self, mock_tts, _play, _mkd, _isf, _gsz, _unl, _cp): 1117 cli = _make_voice_cli(_voice_tts=True) 1118 cli._voice_speak_response("## Title\n**bold** and `code`") 1119 call_text = mock_tts.call_args.kwargs["text"] 1120 assert "##" not in call_text 1121 assert "**" not in call_text 1122 assert "`" not in call_text 1123 1124 @patch("cli._cprint") 1125 @patch("cli.os.makedirs") 1126 @patch("tools.tts_tool.text_to_speech_tool", return_value='{"success": true}') 1127 def test_code_blocks_removed(self, mock_tts, _mkd, _cp): 1128 cli = _make_voice_cli(_voice_tts=True) 1129 cli._voice_speak_response("```python\nprint('hi')\n```\nSome text") 1130 call_text = mock_tts.call_args.kwargs["text"] 1131 assert "print" not in call_text 1132 assert "```" not in call_text 1133 assert "Some text" in call_text 1134 1135 @patch("cli._cprint") 1136 @patch("cli.os.makedirs") 1137 def test_empty_after_strip_returns_early(self, _mkd, _cp): 1138 cli = _make_voice_cli(_voice_tts=True) 1139 with patch("tools.tts_tool.text_to_speech_tool") as mock_tts: 1140 cli._voice_speak_response("```python\nprint('hi')\n```") 1141 mock_tts.assert_not_called() 1142 1143 @patch("cli._cprint") 1144 @patch("cli.os.makedirs") 1145 @patch("tools.tts_tool.text_to_speech_tool", return_value='{"success": true}') 1146 def test_long_text_truncated(self, mock_tts, _mkd, _cp): 1147 cli = _make_voice_cli(_voice_tts=True) 1148 cli._voice_speak_response("A" * 5000) 1149 call_text = mock_tts.call_args.kwargs["text"] 1150 assert len(call_text) <= 4000 1151 1152 @patch("cli._cprint") 1153 @patch("cli.os.makedirs") 1154 @patch("tools.tts_tool.text_to_speech_tool", side_effect=RuntimeError("tts fail")) 1155 def test_exception_sets_done_event(self, _tts, _mkd, _cp): 1156 cli = _make_voice_cli(_voice_tts=True) 1157 cli._voice_tts_done.clear() 1158 cli._voice_speak_response("Hello") 1159 assert cli._voice_tts_done.is_set() 1160 1161 @patch("cli._cprint") 1162 @patch("cli.os.unlink") 1163 @patch("cli.os.path.getsize", return_value=1000) 1164 @patch("cli.os.path.isfile", return_value=True) 1165 @patch("cli.os.makedirs") 1166 @patch("tools.voice_mode.play_audio_file") 1167 @patch("tools.tts_tool.text_to_speech_tool", return_value='{"success": true}') 1168 def test_play_audio_called(self, _tts, mock_play, _mkd, _isf, _gsz, _unl, _cp): 1169 cli = _make_voice_cli(_voice_tts=True) 1170 cli._voice_speak_response("Hello world") 1171 mock_play.assert_called_once() 1172 1173 1174 class TestVoiceStopAndTranscribeReal: 1175 """Tests _voice_stop_and_transcribe with real CLI instance.""" 1176 1177 @patch("cli._cprint") 1178 def test_guard_not_recording(self, _cp): 1179 cli = _make_voice_cli(_voice_recording=False) 1180 with patch("tools.voice_mode.transcribe_recording") as mock_tr: 1181 cli._voice_stop_and_transcribe() 1182 mock_tr.assert_not_called() 1183 1184 @patch("cli._cprint") 1185 def test_no_recorder_returns_early(self, _cp): 1186 cli = _make_voice_cli(_voice_recording=True, _voice_recorder=None) 1187 with patch("tools.voice_mode.transcribe_recording") as mock_tr: 1188 cli._voice_stop_and_transcribe() 1189 mock_tr.assert_not_called() 1190 assert cli._voice_recording is False 1191 1192 @patch("cli._cprint") 1193 @patch("tools.voice_mode.play_beep") 1194 def test_no_speech_detected(self, _beep, _cp): 1195 recorder = MagicMock() 1196 recorder.stop.return_value = None 1197 cli = _make_voice_cli(_voice_recording=True, _voice_recorder=recorder) 1198 cli._voice_stop_and_transcribe() 1199 assert cli._pending_input.empty() 1200 1201 @patch("cli._cprint") 1202 @patch("hermes_cli.config.load_config", return_value={"voice": {"beep_enabled": False}}) 1203 @patch("tools.voice_mode.play_beep") 1204 def test_no_speech_detected_skips_beep_when_disabled(self, mock_beep, _cfg, _cp): 1205 recorder = MagicMock() 1206 recorder.stop.return_value = None 1207 cli = _make_voice_cli(_voice_recording=True, _voice_recorder=recorder) 1208 cli._voice_stop_and_transcribe() 1209 mock_beep.assert_not_called() 1210 1211 @patch("cli._cprint") 1212 @patch("cli.os.unlink") 1213 @patch("cli.os.path.isfile", return_value=True) 1214 @patch("hermes_cli.config.load_config", return_value={"stt": {}}) 1215 @patch("tools.voice_mode.transcribe_recording", 1216 return_value={"success": True, "transcript": "hello world"}) 1217 @patch("tools.voice_mode.play_beep") 1218 def test_successful_transcription_queues_input( 1219 self, _beep, _tr, _cfg, _isf, _unl, _cp 1220 ): 1221 recorder = MagicMock() 1222 recorder.stop.return_value = "/tmp/test.wav" 1223 cli = _make_voice_cli(_voice_recording=True, _voice_recorder=recorder) 1224 cli._voice_stop_and_transcribe() 1225 assert cli._pending_input.get_nowait() == "hello world" 1226 1227 @patch("cli._cprint") 1228 @patch("cli.os.unlink") 1229 @patch("cli.os.path.isfile", return_value=True) 1230 @patch("hermes_cli.config.load_config", return_value={"stt": {}}) 1231 @patch("tools.voice_mode.transcribe_recording", 1232 return_value={"success": True, "transcript": ""}) 1233 @patch("tools.voice_mode.play_beep") 1234 def test_empty_transcript_not_queued(self, _beep, _tr, _cfg, _isf, _unl, _cp): 1235 recorder = MagicMock() 1236 recorder.stop.return_value = "/tmp/test.wav" 1237 cli = _make_voice_cli(_voice_recording=True, _voice_recorder=recorder) 1238 cli._voice_stop_and_transcribe() 1239 assert cli._pending_input.empty() 1240 1241 @patch("cli._cprint") 1242 @patch("cli.os.unlink") 1243 @patch("cli.os.path.isfile", return_value=True) 1244 @patch("hermes_cli.config.load_config", return_value={"stt": {}}) 1245 @patch("tools.voice_mode.transcribe_recording", 1246 return_value={"success": False, "error": "API timeout"}) 1247 @patch("tools.voice_mode.play_beep") 1248 def test_transcription_failure(self, _beep, _tr, _cfg, _isf, _unl, _cp): 1249 recorder = MagicMock() 1250 recorder.stop.return_value = "/tmp/test.wav" 1251 cli = _make_voice_cli(_voice_recording=True, _voice_recorder=recorder) 1252 cli._voice_stop_and_transcribe() 1253 assert cli._pending_input.empty() 1254 1255 @patch("cli._cprint") 1256 @patch("cli.os.unlink") 1257 @patch("cli.os.path.isfile", return_value=True) 1258 @patch("hermes_cli.config.load_config", return_value={"stt": {}}) 1259 @patch("tools.voice_mode.transcribe_recording", 1260 side_effect=ConnectionError("network")) 1261 @patch("tools.voice_mode.play_beep") 1262 def test_exception_caught(self, _beep, _tr, _cfg, _isf, _unl, _cp): 1263 recorder = MagicMock() 1264 recorder.stop.return_value = "/tmp/test.wav" 1265 cli = _make_voice_cli(_voice_recording=True, _voice_recorder=recorder) 1266 cli._voice_stop_and_transcribe() # Should not raise 1267 1268 @patch("cli._cprint") 1269 @patch("tools.voice_mode.play_beep") 1270 def test_processing_flag_cleared(self, _beep, _cp): 1271 recorder = MagicMock() 1272 recorder.stop.return_value = None 1273 cli = _make_voice_cli(_voice_recording=True, _voice_recorder=recorder) 1274 cli._voice_stop_and_transcribe() 1275 assert cli._voice_processing is False 1276 1277 @patch("cli._cprint") 1278 @patch("tools.voice_mode.play_beep") 1279 def test_continuous_restarts_on_no_speech(self, _beep, _cp): 1280 recorder = MagicMock() 1281 recorder.stop.return_value = None 1282 cli = _make_voice_cli(_voice_recording=True, _voice_recorder=recorder, 1283 _voice_continuous=True) 1284 cli._voice_start_recording = MagicMock() 1285 cli._voice_stop_and_transcribe() 1286 cli._voice_start_recording.assert_called_once() 1287 1288 @patch("cli._cprint") 1289 @patch("cli.os.unlink") 1290 @patch("cli.os.path.isfile", return_value=True) 1291 @patch("hermes_cli.config.load_config", return_value={"stt": {}}) 1292 @patch("tools.voice_mode.transcribe_recording", 1293 return_value={"success": True, "transcript": "hello"}) 1294 @patch("tools.voice_mode.play_beep") 1295 def test_continuous_no_restart_on_success( 1296 self, _beep, _tr, _cfg, _isf, _unl, _cp 1297 ): 1298 recorder = MagicMock() 1299 recorder.stop.return_value = "/tmp/test.wav" 1300 cli = _make_voice_cli(_voice_recording=True, _voice_recorder=recorder, 1301 _voice_continuous=True) 1302 cli._voice_start_recording = MagicMock() 1303 cli._voice_stop_and_transcribe() 1304 cli._voice_start_recording.assert_not_called() 1305 1306 @patch("cli._cprint") 1307 @patch("cli.os.unlink") 1308 @patch("cli.os.path.isfile", return_value=True) 1309 @patch("hermes_cli.config.load_config", return_value={"stt": {"model": "whisper-large-v3"}}) 1310 @patch("tools.voice_mode.transcribe_recording", 1311 return_value={"success": True, "transcript": "hi"}) 1312 @patch("tools.voice_mode.play_beep") 1313 def test_stt_model_from_config(self, _beep, mock_tr, _cfg, _isf, _unl, _cp): 1314 recorder = MagicMock() 1315 recorder.stop.return_value = "/tmp/test.wav" 1316 cli = _make_voice_cli(_voice_recording=True, _voice_recorder=recorder) 1317 cli._voice_stop_and_transcribe() 1318 mock_tr.assert_called_once_with("/tmp/test.wav", model="whisper-large-v3") 1319 1320 1321 # --------------------------------------------------------------------------- 1322 # Bugfix: _refresh_level must read _voice_recording under lock 1323 # --------------------------------------------------------------------------- 1324 1325 1326 class TestRefreshLevelLock: 1327 """Bug: _refresh_level thread read _voice_recording without lock.""" 1328 1329 def test_refresh_stops_when_recording_false(self): 1330 import threading, time 1331 1332 lock = threading.Lock() 1333 recording = True 1334 iterations = 0 1335 1336 def refresh_level(): 1337 nonlocal iterations 1338 while True: 1339 with lock: 1340 still = recording 1341 if not still: 1342 break 1343 iterations += 1 1344 time.sleep(0.01) 1345 1346 t = threading.Thread(target=refresh_level, daemon=True) 1347 t.start() 1348 1349 time.sleep(0.05) 1350 with lock: 1351 recording = False 1352 1353 t.join(timeout=1) 1354 assert not t.is_alive(), "Refresh thread did not stop" 1355 assert iterations > 0, "Refresh thread never ran"