/ tests / tools / test_voice_cli_integration.py
test_voice_cli_integration.py
   1  """Tests for CLI voice mode integration -- command parsing, markdown stripping,
   2  state management, streaming TTS activation, voice message prefix, _vprint."""
   3  
   4  import ast
   5  import os
   6  import queue
   7  import threading
   8  from types import SimpleNamespace
   9  from unittest.mock import MagicMock, patch
  10  
  11  import pytest
  12  
  13  
  14  def _make_voice_cli(**overrides):
  15      """Create a minimal HermesCLI with only voice-related attrs initialized.
  16  
  17      Uses ``__new__()`` to bypass ``__init__`` so no config/env/API setup is
  18      needed.  Only the voice state attributes (from __init__ lines 3749-3758)
  19      are populated.
  20      """
  21      from cli import HermesCLI
  22  
  23      cli = HermesCLI.__new__(HermesCLI)
  24      cli._voice_lock = threading.Lock()
  25      cli._voice_mode = False
  26      cli._voice_tts = False
  27      cli._voice_recorder = None
  28      cli._voice_recording = False
  29      cli._voice_processing = False
  30      cli._voice_continuous = False
  31      cli._voice_tts_done = threading.Event()
  32      cli._voice_tts_done.set()
  33      cli._pending_input = queue.Queue()
  34      cli._app = None
  35      cli._attached_images = []
  36      cli.console = SimpleNamespace(width=80)
  37      for k, v in overrides.items():
  38          setattr(cli, k, v)
  39      return cli
  40  
  41  
  42  # ============================================================================
  43  # Markdown stripping — import real function from tts_tool
  44  # ============================================================================
  45  
  46  from tools.tts_tool import _strip_markdown_for_tts
  47  
  48  
  49  class TestMarkdownStripping:
  50      def test_strips_bold(self):
  51          assert _strip_markdown_for_tts("This is **bold** text") == "This is bold text"
  52  
  53      def test_strips_italic(self):
  54          assert _strip_markdown_for_tts("This is *italic* text") == "This is italic text"
  55  
  56      def test_strips_inline_code(self):
  57          assert _strip_markdown_for_tts("Run `pip install foo`") == "Run pip install foo"
  58  
  59      def test_strips_fenced_code_blocks(self):
  60          text = "Here is code:\n```python\nprint('hello')\n```\nDone."
  61          result = _strip_markdown_for_tts(text)
  62          assert "print" not in result
  63          assert "Done." in result
  64  
  65      def test_strips_headers(self):
  66          assert _strip_markdown_for_tts("## Summary\nSome text") == "Summary\nSome text"
  67  
  68      def test_strips_list_markers(self):
  69          text = "- item one\n- item two\n* item three"
  70          result = _strip_markdown_for_tts(text)
  71          assert "item one" in result
  72          assert "- " not in result
  73          assert "* " not in result
  74  
  75      def test_strips_urls(self):
  76          text = "Visit https://example.com for details"
  77          result = _strip_markdown_for_tts(text)
  78          assert "https://" not in result
  79          assert "Visit" in result
  80  
  81      def test_strips_markdown_links(self):
  82          text = "See [the docs](https://example.com/docs) for info"
  83          result = _strip_markdown_for_tts(text)
  84          assert "the docs" in result
  85          assert "https://" not in result
  86          assert "[" not in result
  87  
  88      def test_strips_horizontal_rules(self):
  89          text = "Part one\n---\nPart two"
  90          result = _strip_markdown_for_tts(text)
  91          assert "---" not in result
  92          assert "Part one" in result
  93          assert "Part two" in result
  94  
  95      def test_empty_after_stripping_returns_empty(self):
  96          text = "```python\nprint('hello')\n```"
  97          result = _strip_markdown_for_tts(text)
  98          assert result == ""
  99  
 100      def test_long_text_not_truncated(self):
 101          """_strip_markdown_for_tts does NOT truncate — that's the caller's job."""
 102          text = "a" * 5000
 103          result = _strip_markdown_for_tts(text)
 104          assert len(result) == 5000
 105  
 106      def test_complex_response(self):
 107          text = (
 108              "## Answer\n\n"
 109              "Here's how to do it:\n\n"
 110              "```python\ndef hello():\n    print('hi')\n```\n\n"
 111              "Run it with `python main.py`. "
 112              "See [docs](https://example.com) for more.\n\n"
 113              "- Step one\n- Step two\n\n"
 114              "---\n\n"
 115              "**Good luck!**"
 116          )
 117          result = _strip_markdown_for_tts(text)
 118          assert "```" not in result
 119          assert "https://" not in result
 120          assert "**" not in result
 121          assert "---" not in result
 122          assert "Answer" in result
 123          assert "Good luck!" in result
 124          assert "docs" in result
 125  
 126  
 127  # ============================================================================
 128  # Voice command parsing
 129  # ============================================================================
 130  
 131  class TestVoiceCommandParsing:
 132      """Test _handle_voice_command logic without full CLI setup."""
 133  
 134      def test_parse_subcommands(self):
 135          """Verify subcommand extraction from /voice commands."""
 136          test_cases = [
 137              ("/voice on", "on"),
 138              ("/voice off", "off"),
 139              ("/voice tts", "tts"),
 140              ("/voice status", "status"),
 141              ("/voice", ""),
 142              ("/voice  ON  ", "on"),
 143          ]
 144          for command, expected in test_cases:
 145              parts = command.strip().split(maxsplit=1)
 146              subcommand = parts[1].lower().strip() if len(parts) > 1 else ""
 147              assert subcommand == expected, f"Failed for {command!r}: got {subcommand!r}"
 148  
 149  
 150  # ============================================================================
 151  # Voice state thread safety
 152  # ============================================================================
 153  
 154  class TestVoiceStateLock:
 155      def test_lock_protects_state(self):
 156          """Verify that concurrent state changes don't corrupt state."""
 157          lock = threading.Lock()
 158          state = {"recording": False, "count": 0}
 159  
 160          def toggle_many(n):
 161              for _ in range(n):
 162                  with lock:
 163                      state["recording"] = not state["recording"]
 164                      state["count"] += 1
 165  
 166          threads = [threading.Thread(target=toggle_many, args=(1000,)) for _ in range(4)]
 167          for t in threads:
 168              t.start()
 169          for t in threads:
 170              t.join()
 171  
 172          assert state["count"] == 4000
 173  
 174  
 175  # ============================================================================
 176  # Streaming TTS lazy import activation (Bug A fix)
 177  # ============================================================================
 178  
 179  class TestStreamingTTSActivation:
 180      """Verify streaming TTS uses lazy imports to check availability."""
 181  
 182      def test_activates_when_elevenlabs_and_sounddevice_available(self):
 183          """use_streaming_tts should be True when provider is elevenlabs
 184          and both lazy imports succeed."""
 185          use_streaming_tts = False
 186          try:
 187              from tools.tts_tool import (
 188                  _load_tts_config as _load_tts_cfg,
 189                  _get_provider as _get_prov,
 190                  _import_elevenlabs,
 191                  _import_sounddevice,
 192              )
 193              assert callable(_import_elevenlabs)
 194              assert callable(_import_sounddevice)
 195          except ImportError:
 196              pytest.skip("tools.tts_tool not available")
 197  
 198          with patch("tools.tts_tool._load_tts_config") as mock_cfg, \
 199               patch("tools.tts_tool._get_provider", return_value="elevenlabs"), \
 200               patch("tools.tts_tool._import_elevenlabs") as mock_el, \
 201               patch("tools.tts_tool._import_sounddevice") as mock_sd:
 202              mock_cfg.return_value = {"provider": "elevenlabs"}
 203              mock_el.return_value = MagicMock()
 204              mock_sd.return_value = MagicMock()
 205  
 206              from tools.tts_tool import (
 207                  _load_tts_config as load_cfg,
 208                  _get_provider as get_prov,
 209                  _import_elevenlabs as import_el,
 210                  _import_sounddevice as import_sd,
 211              )
 212              cfg = load_cfg()
 213              if get_prov(cfg) == "elevenlabs":
 214                  import_el()
 215                  import_sd()
 216                  use_streaming_tts = True
 217  
 218          assert use_streaming_tts is True
 219  
 220      def test_does_not_activate_when_elevenlabs_missing(self):
 221          """use_streaming_tts stays False when elevenlabs import fails."""
 222          use_streaming_tts = False
 223          with patch("tools.tts_tool._load_tts_config", return_value={"provider": "elevenlabs"}), \
 224               patch("tools.tts_tool._get_provider", return_value="elevenlabs"), \
 225               patch("tools.tts_tool._import_elevenlabs", side_effect=ImportError("no elevenlabs")):
 226              try:
 227                  from tools.tts_tool import (
 228                      _load_tts_config as load_cfg,
 229                      _get_provider as get_prov,
 230                      _import_elevenlabs as import_el,
 231                      _import_sounddevice as import_sd,
 232                  )
 233                  cfg = load_cfg()
 234                  if get_prov(cfg) == "elevenlabs":
 235                      import_el()
 236                      import_sd()
 237                      use_streaming_tts = True
 238              except (ImportError, OSError):
 239                  pass
 240  
 241          assert use_streaming_tts is False
 242  
 243      def test_does_not_activate_when_sounddevice_missing(self):
 244          """use_streaming_tts stays False when sounddevice import fails."""
 245          use_streaming_tts = False
 246          with patch("tools.tts_tool._load_tts_config", return_value={"provider": "elevenlabs"}), \
 247               patch("tools.tts_tool._get_provider", return_value="elevenlabs"), \
 248               patch("tools.tts_tool._import_elevenlabs", return_value=MagicMock()), \
 249               patch("tools.tts_tool._import_sounddevice", side_effect=OSError("no PortAudio")):
 250              try:
 251                  from tools.tts_tool import (
 252                      _load_tts_config as load_cfg,
 253                      _get_provider as get_prov,
 254                      _import_elevenlabs as import_el,
 255                      _import_sounddevice as import_sd,
 256                  )
 257                  cfg = load_cfg()
 258                  if get_prov(cfg) == "elevenlabs":
 259                      import_el()
 260                      import_sd()
 261                      use_streaming_tts = True
 262              except (ImportError, OSError):
 263                  pass
 264  
 265          assert use_streaming_tts is False
 266  
 267      def test_does_not_activate_for_non_elevenlabs_provider(self):
 268          """use_streaming_tts stays False when provider is not elevenlabs."""
 269          use_streaming_tts = False
 270          with patch("tools.tts_tool._load_tts_config", return_value={"provider": "edge"}), \
 271               patch("tools.tts_tool._get_provider", return_value="edge"):
 272              try:
 273                  from tools.tts_tool import (
 274                      _load_tts_config as load_cfg,
 275                      _get_provider as get_prov,
 276                      _import_elevenlabs as import_el,
 277                      _import_sounddevice as import_sd,
 278                  )
 279                  cfg = load_cfg()
 280                  if get_prov(cfg) == "elevenlabs":
 281                      import_el()
 282                      import_sd()
 283                      use_streaming_tts = True
 284              except (ImportError, OSError):
 285                  pass
 286  
 287          assert use_streaming_tts is False
 288  
 289      def test_stale_boolean_imports_no_longer_exist(self):
 290          """Confirm _HAS_ELEVENLABS and _HAS_AUDIO are not in tts_tool module."""
 291          import tools.tts_tool as tts_mod
 292          assert not hasattr(tts_mod, "_HAS_ELEVENLABS"), \
 293              "_HAS_ELEVENLABS should not exist -- lazy imports replaced it"
 294          assert not hasattr(tts_mod, "_HAS_AUDIO"), \
 295              "_HAS_AUDIO should not exist -- lazy imports replaced it"
 296  
 297  
 298  # ============================================================================
 299  # Voice mode user message prefix (Bug B fix)
 300  # ============================================================================
 301  
 302  class TestVoiceMessagePrefix:
 303      """Voice mode should inject instruction via user message prefix,
 304      not by modifying the system prompt (which breaks prompt cache)."""
 305  
 306      def test_prefix_added_when_voice_mode_active(self):
 307          """When voice mode is active and message is str, agent_message
 308          should have the voice instruction prefix."""
 309          voice_mode = True
 310          message = "What's the weather like?"
 311  
 312          agent_message = message
 313          if voice_mode and isinstance(message, str):
 314              agent_message = (
 315                  "[Voice input — respond concisely and conversationally, "
 316                  "2-3 sentences max. No code blocks or markdown.] "
 317                  + message
 318              )
 319  
 320          assert agent_message.startswith("[Voice input")
 321          assert "What's the weather like?" in agent_message
 322  
 323      def test_no_prefix_when_voice_mode_inactive(self):
 324          """When voice mode is off, message passes through unchanged."""
 325          voice_mode = False
 326          message = "What's the weather like?"
 327  
 328          agent_message = message
 329          if voice_mode and isinstance(message, str):
 330              agent_message = (
 331                  "[Voice input — respond concisely and conversationally, "
 332                  "2-3 sentences max. No code blocks or markdown.] "
 333                  + message
 334              )
 335  
 336          assert agent_message == message
 337  
 338      def test_no_prefix_for_multimodal_content(self):
 339          """When message is a list (multimodal), no prefix is added."""
 340          voice_mode = True
 341          message = [{"type": "text", "text": "describe this"}, {"type": "image_url"}]
 342  
 343          agent_message = message
 344          if voice_mode and isinstance(message, str):
 345              agent_message = (
 346                  "[Voice input — respond concisely and conversationally, "
 347                  "2-3 sentences max. No code blocks or markdown.] "
 348                  + message
 349              )
 350  
 351          assert agent_message is message
 352  
 353      def test_history_stays_clean(self):
 354          """conversation_history should contain the original message,
 355          not the prefixed version."""
 356          voice_mode = True
 357          message = "Hello there"
 358          conversation_history = []
 359  
 360          conversation_history.append({"role": "user", "content": message})
 361  
 362          agent_message = message
 363          if voice_mode and isinstance(message, str):
 364              agent_message = (
 365                  "[Voice input — respond concisely and conversationally, "
 366                  "2-3 sentences max. No code blocks or markdown.] "
 367                  + message
 368              )
 369  
 370          assert conversation_history[-1]["content"] == "Hello there"
 371          assert agent_message.startswith("[Voice input")
 372          assert agent_message != conversation_history[-1]["content"]
 373  
 374      def test_enable_voice_mode_does_not_modify_system_prompt(self):
 375          """_enable_voice_mode should NOT modify self.system_prompt or
 376          agent.ephemeral_system_prompt -- the system prompt must stay
 377          stable to preserve prompt cache."""
 378          cli = SimpleNamespace(
 379              _voice_mode=False,
 380              _voice_tts=False,
 381              _voice_lock=threading.Lock(),
 382              system_prompt="You are helpful",
 383              agent=SimpleNamespace(ephemeral_system_prompt="You are helpful"),
 384          )
 385  
 386          original_system = cli.system_prompt
 387          original_ephemeral = cli.agent.ephemeral_system_prompt
 388  
 389          cli._voice_mode = True
 390  
 391          assert cli.system_prompt == original_system
 392          assert cli.agent.ephemeral_system_prompt == original_ephemeral
 393  
 394  
 395  # ============================================================================
 396  # _vprint force parameter (Minor fix)
 397  # ============================================================================
 398  
 399  class TestVprintForceParameter:
 400      """_vprint should suppress output during streaming TTS unless force=True."""
 401  
 402      def _make_agent_with_stream(self, stream_active: bool):
 403          """Create a minimal agent-like object with _vprint."""
 404          agent = SimpleNamespace(
 405              _stream_callback=MagicMock() if stream_active else None,
 406          )
 407  
 408          def _vprint(*args, force=False, **kwargs):
 409              if not force and getattr(agent, "_stream_callback", None) is not None:
 410                  return
 411              print(*args, **kwargs)
 412  
 413          agent._vprint = _vprint
 414          return agent
 415  
 416      def test_suppressed_during_streaming(self, capsys):
 417          """Normal _vprint output is suppressed when streaming TTS is active."""
 418          agent = self._make_agent_with_stream(stream_active=True)
 419          agent._vprint("should be hidden")
 420          captured = capsys.readouterr()
 421          assert captured.out == ""
 422  
 423      def test_shown_when_not_streaming(self, capsys):
 424          """Normal _vprint output is shown when streaming is not active."""
 425          agent = self._make_agent_with_stream(stream_active=False)
 426          agent._vprint("should be shown")
 427          captured = capsys.readouterr()
 428          assert "should be shown" in captured.out
 429  
 430      def test_force_shown_during_streaming(self, capsys):
 431          """force=True bypasses the streaming suppression."""
 432          agent = self._make_agent_with_stream(stream_active=True)
 433          agent._vprint("critical error!", force=True)
 434          captured = capsys.readouterr()
 435          assert "critical error!" in captured.out
 436  
 437      def test_force_shown_when_not_streaming(self, capsys):
 438          """force=True works normally when not streaming (no regression)."""
 439          agent = self._make_agent_with_stream(stream_active=False)
 440          agent._vprint("normal message", force=True)
 441          captured = capsys.readouterr()
 442          assert "normal message" in captured.out
 443  
 444      def test_error_messages_use_force_in_run_agent(self):
 445          """Verify that critical error _vprint calls in run_agent.py
 446          include force=True."""
 447          with open("run_agent.py", "r") as f:
 448              source = f.read()
 449  
 450          tree = ast.parse(source)
 451  
 452          forced_error_count = 0
 453          unforced_error_count = 0
 454  
 455          for node in ast.walk(tree):
 456              if not isinstance(node, ast.Call):
 457                  continue
 458              func = node.func
 459              if not (isinstance(func, ast.Attribute) and func.attr == "_vprint"):
 460                  continue
 461              has_fatal = False
 462              for arg in node.args:
 463                  if isinstance(arg, ast.JoinedStr):
 464                      for val in arg.values:
 465                          if isinstance(val, ast.Constant) and isinstance(val.value, str):
 466                              if "\u274c" in val.value:
 467                                  has_fatal = True
 468                                  break
 469  
 470              if not has_fatal:
 471                  continue
 472  
 473              has_force = any(
 474                  kw.arg == "force"
 475                  and isinstance(kw.value, ast.Constant)
 476                  and kw.value.value is True
 477                  for kw in node.keywords
 478              )
 479  
 480              if has_force:
 481                  forced_error_count += 1
 482              else:
 483                  unforced_error_count += 1
 484  
 485          assert forced_error_count > 0, \
 486              "Expected at least one _vprint with force=True for error messages"
 487          assert unforced_error_count == 0, \
 488              f"Found {unforced_error_count} critical error _vprint calls without force=True"
 489  
 490  
 491  # ============================================================================
 492  # Bug fix regression tests
 493  # ============================================================================
 494  
 495  class TestEdgeTTSLazyImport:
 496      """Bug #3: _generate_edge_tts must use lazy import, not bare module name."""
 497  
 498      def test_generate_edge_tts_calls_lazy_import(self):
 499          """AST check: _generate_edge_tts must call _import_edge_tts(), not
 500          reference bare 'edge_tts' module name."""
 501          import ast as _ast
 502  
 503          with open("tools/tts_tool.py") as f:
 504              tree = _ast.parse(f.read())
 505  
 506          for node in _ast.walk(tree):
 507              if isinstance(node, _ast.AsyncFunctionDef) and node.name == "_generate_edge_tts":
 508                  # Collect all Name references (bare identifiers)
 509                  bare_refs = [
 510                      n.id for n in _ast.walk(node)
 511                      if isinstance(n, _ast.Name) and n.id == "edge_tts"
 512                  ]
 513                  assert bare_refs == [], (
 514                      f"_generate_edge_tts uses bare 'edge_tts' name — "
 515                      f"should use _import_edge_tts() lazy helper"
 516                  )
 517  
 518                  # Must have a call to _import_edge_tts
 519                  lazy_calls = [
 520                      n for n in _ast.walk(node)
 521                      if isinstance(n, _ast.Call)
 522                      and isinstance(n.func, _ast.Name)
 523                      and n.func.id == "_import_edge_tts"
 524                  ]
 525                  assert len(lazy_calls) >= 1, (
 526                      "_generate_edge_tts must call _import_edge_tts()"
 527                  )
 528                  break
 529          else:
 530              pytest.fail("_generate_edge_tts not found in tts_tool.py")
 531  
 532  
 533  class TestStreamingTTSOutputStreamCleanup:
 534      """Bug #7: output_stream must be closed in finally block."""
 535  
 536      def test_output_stream_closed_in_finally(self):
 537          """AST check: stream_tts_to_speaker's finally block must close
 538          output_stream even on exception."""
 539          import ast as _ast
 540  
 541          with open("tools/tts_tool.py") as f:
 542              tree = _ast.parse(f.read())
 543  
 544          for node in _ast.walk(tree):
 545              if isinstance(node, _ast.FunctionDef) and node.name == "stream_tts_to_speaker":
 546                  # Find the outermost try that has a finally with tts_done_event.set()
 547                  for child in _ast.walk(node):
 548                      if isinstance(child, _ast.Try) and child.finalbody:
 549                          finally_text = "\n".join(
 550                              _ast.dump(n) for n in child.finalbody
 551                          )
 552                          if "tts_done_event" in finally_text:
 553                              assert "output_stream" in finally_text, (
 554                                  "finally block must close output_stream"
 555                              )
 556                              return
 557                  pytest.fail("No finally block with tts_done_event found")
 558  
 559  
 560  class TestCtrlCResetsContinuousMode:
 561      """Bug #4: Ctrl+C cancel must reset _voice_continuous."""
 562  
 563      def test_ctrl_c_handler_resets_voice_continuous(self):
 564          """Source check: Ctrl+C voice cancel block must set
 565          _voice_continuous = False."""
 566          with open("cli.py") as f:
 567              source = f.read()
 568  
 569          # Find the Ctrl+C handler's voice cancel block
 570          lines = source.split("\n")
 571          in_cancel_block = False
 572          found_continuous_reset = False
 573          for i, line in enumerate(lines):
 574              if "Cancel active voice recording" in line:
 575                  in_cancel_block = True
 576              if in_cancel_block:
 577                  if "_voice_continuous = False" in line:
 578                      found_continuous_reset = True
 579                      break
 580                  # Block ends at next comment section or return
 581                  if "return" in line and in_cancel_block:
 582                      break
 583  
 584          assert found_continuous_reset, (
 585              "Ctrl+C voice cancel block must set _voice_continuous = False"
 586          )
 587  
 588  
 589  class TestDisableVoiceModeStopsTTS:
 590      """Bug #5: _disable_voice_mode must stop active TTS playback."""
 591  
 592      def test_disable_voice_mode_calls_stop_playback(self):
 593          """Source check: _disable_voice_mode must call stop_playback()."""
 594          import inspect
 595          from cli import HermesCLI
 596  
 597          source = inspect.getsource(HermesCLI._disable_voice_mode)
 598          assert "stop_playback" in source, (
 599              "_disable_voice_mode must call stop_playback()"
 600          )
 601          assert "_voice_tts_done.set()" in source, (
 602              "_disable_voice_mode must set _voice_tts_done"
 603          )
 604  
 605  
 606  class TestVoiceStatusUsesConfigKey:
 607      """Bug #8: voice-related UI surfaces must read record key from config."""
 608  
 609      def test_show_voice_status_not_hardcoded(self):
 610          """Source check: _show_voice_status must not hardcode Ctrl+B."""
 611          with open("cli.py") as f:
 612              source = f.read()
 613  
 614          lines = source.split("\n")
 615          in_method = False
 616          for line in lines:
 617              if "def _show_voice_status" in line:
 618                  in_method = True
 619              elif in_method and line.strip().startswith("def "):
 620                  break
 621              elif in_method:
 622                  assert 'Record key: Ctrl+B"' not in line, (
 623                      "_show_voice_status hardcodes 'Ctrl+B' — "
 624                      "should read from config"
 625                  )
 626  
 627      def test_show_voice_status_reads_config(self):
 628          """Source check: _show_voice_status must use load_config()."""
 629          with open("cli.py") as f:
 630              source = f.read()
 631  
 632          lines = source.split("\n")
 633          in_method = False
 634          method_lines = []
 635          for line in lines:
 636              if "def _show_voice_status" in line:
 637                  in_method = True
 638              elif in_method and line.strip().startswith("def "):
 639                  break
 640              elif in_method:
 641                  method_lines.append(line)
 642  
 643          method_source = "\n".join(method_lines)
 644          assert "load_config" in method_source or "record_key" in method_source, (
 645              "_show_voice_status should read record_key from config"
 646          )
 647  
 648      def test_tui_voice_status_not_hardcoded(self):
 649          """Source check: TUI voice status fragments must not hardcode Ctrl+B."""
 650          with open("cli.py") as f:
 651              source = f.read()
 652  
 653          lines = source.split("\n")
 654          in_method = False
 655          for line in lines:
 656              if "def _get_voice_status_fragments" in line:
 657                  in_method = True
 658              elif in_method and line.strip().startswith("def "):
 659                  break
 660              elif in_method:
 661                  assert 'Ctrl+B to stop' not in line
 662                  assert 'Ctrl+B to record' not in line
 663                  assert '" 🎤 Ctrl+B "' not in line
 664  
 665      def test_placeholder_not_hardcoded(self):
 666          """Source check: prompt placeholder must not hardcode Ctrl+B."""
 667          with open("cli.py") as f:
 668              source = f.read()
 669  
 670          assert 'return "type or Ctrl+B to record"' not in source
 671          assert 'return "recording... Ctrl+B to stop, Ctrl+C to cancel"' not in source
 672  
 673      def test_external_editor_binding_falls_back_when_voice_uses_ctrl_g(self):
 674          """Source check: Ctrl+G must not be claimed by both voice and editor."""
 675          with open("cli.py") as f:
 676              source = f.read()
 677  
 678          assert '_primary_editor_binding = ("c-x", "c-e") if _editor_voice_key == "c-g" else ("c-g",)' in source
 679          assert "@kb.add(*_primary_editor_binding, filter=_editor_filter)" in source
 680  
 681      def test_voice_binding_is_eager(self):
 682          """Source check: configured voice key must register eagerly to beat default bindings."""
 683          with open("cli.py") as f:
 684              source = f.read()
 685  
 686          assert '@kb.add(_voice_key, eager=True)' in source
 687  
 688  
 689  class TestChatTTSCleanupOnException:
 690      """Bug #2: chat() must clean up streaming TTS resources on exception."""
 691  
 692      def test_chat_has_finally_for_tts_cleanup(self):
 693          """AST check: chat() method must have a finally block that cleans up
 694          text_queue, stop_event, and tts_thread."""
 695          import ast as _ast
 696  
 697          with open("cli.py") as f:
 698              tree = _ast.parse(f.read())
 699  
 700          for node in _ast.walk(tree):
 701              if isinstance(node, _ast.FunctionDef) and node.name == "chat":
 702                  # Find Try nodes with finally blocks
 703                  for child in _ast.walk(node):
 704                      if isinstance(child, _ast.Try) and child.finalbody:
 705                          finally_text = "\n".join(
 706                              _ast.dump(n) for n in child.finalbody
 707                          )
 708                          if "text_queue" in finally_text:
 709                              assert "stop_event" in finally_text, (
 710                                  "finally must also handle stop_event"
 711                              )
 712                              assert "tts_thread" in finally_text, (
 713                                  "finally must also handle tts_thread"
 714                              )
 715                              return
 716                  pytest.fail(
 717                      "chat() must have a finally block cleaning up "
 718                      "text_queue/stop_event/tts_thread"
 719                  )
 720  
 721  
 722  class TestBrowserToolSignalHandlerRemoved:
 723      """browser_tool.py must NOT register SIGINT/SIGTERM handlers that call
 724      sys.exit() — this conflicts with prompt_toolkit's event loop and causes
 725      the process to become unkillable during voice mode."""
 726  
 727      def test_no_signal_handler_registration(self):
 728          """Source check: browser_tool.py must not call signal.signal()
 729          for SIGINT or SIGTERM."""
 730          with open("tools/browser_tool.py") as f:
 731              source = f.read()
 732  
 733          lines = source.split("\n")
 734          for i, line in enumerate(lines, 1):
 735              stripped = line.strip()
 736              # Skip comments
 737              if stripped.startswith("#"):
 738                  continue
 739              assert "signal.signal(signal.SIGINT" not in stripped, (
 740                  f"browser_tool.py:{i} registers SIGINT handler — "
 741                  f"use atexit instead to avoid prompt_toolkit conflicts"
 742              )
 743              assert "signal.signal(signal.SIGTERM" not in stripped, (
 744                  f"browser_tool.py:{i} registers SIGTERM handler — "
 745                  f"use atexit instead to avoid prompt_toolkit conflicts"
 746              )
 747  
 748  
 749  class TestKeyHandlerNeverBlocks:
 750      """The Ctrl+B key handler runs in prompt_toolkit's event-loop thread.
 751      Any blocking call freezes the entire UI.  Verify that:
 752      1. _voice_start_recording is NOT called directly (must be in daemon thread)
 753      2. _voice_processing guard prevents starting while stop/transcribe runs
 754      3. _voice_processing is set atomically with _voice_recording in stop_and_transcribe
 755      """
 756  
 757      def test_start_recording_not_called_directly_in_handler(self):
 758          """AST check: handle_voice_record must NOT call _voice_start_recording()
 759          directly — it must wrap it in a Thread to avoid blocking the UI."""
 760          import ast as _ast
 761  
 762          with open("cli.py") as f:
 763              tree = _ast.parse(f.read())
 764  
 765          for node in _ast.walk(tree):
 766              if isinstance(node, _ast.FunctionDef) and node.name == "handle_voice_record":
 767                  # Collect all direct calls to _voice_start_recording in this function.
 768                  # They should ONLY appear inside a nested def (the _start_recording wrapper).
 769                  for child in _ast.iter_child_nodes(node):
 770                      # Direct statements in the handler body (not nested defs)
 771                      if isinstance(child, _ast.Expr) and isinstance(child.value, _ast.Call):
 772                          call_src = _ast.dump(child.value)
 773                          assert "_voice_start_recording" not in call_src, (
 774                              "handle_voice_record calls _voice_start_recording directly "
 775                              "— must dispatch to a daemon thread"
 776                          )
 777                  break
 778  
 779      def test_processing_guard_in_start_path(self):
 780          """Source check: key handler must check _voice_processing before
 781          starting a new recording."""
 782          with open("cli.py") as f:
 783              source = f.read()
 784  
 785          lines = source.split("\n")
 786          in_handler = False
 787          in_else = False
 788          found_guard = False
 789          for line in lines:
 790              if "def handle_voice_record" in line:
 791                  in_handler = True
 792              elif in_handler and line.strip().startswith("def ") and "_start_recording" not in line:
 793                  break
 794              elif in_handler and "else:" in line:
 795                  in_else = True
 796              elif in_else and "_voice_processing" in line:
 797                  found_guard = True
 798                  break
 799  
 800          assert found_guard, (
 801              "Key handler START path must guard against _voice_processing "
 802              "to prevent blocking on AudioRecorder._lock"
 803          )
 804  
 805      def test_processing_set_atomically_with_recording_false(self):
 806          """Source check: _voice_stop_and_transcribe must set _voice_processing = True
 807          in the same lock block where it sets _voice_recording = False."""
 808          with open("cli.py") as f:
 809              source = f.read()
 810  
 811          lines = source.split("\n")
 812          in_method = False
 813          in_first_lock = False
 814          found_recording_false = False
 815          found_processing_true = False
 816          for line in lines:
 817              if "def _voice_stop_and_transcribe" in line:
 818                  in_method = True
 819              elif in_method and "with self._voice_lock:" in line and not in_first_lock:
 820                  in_first_lock = True
 821              elif in_first_lock:
 822                  stripped = line.strip()
 823                  if not stripped or stripped.startswith("#"):
 824                      continue
 825                  if "_voice_recording = False" in stripped:
 826                      found_recording_false = True
 827                  if "_voice_processing = True" in stripped:
 828                      found_processing_true = True
 829                  # End of with block (dedent)
 830                  if stripped and not line.startswith("            ") and not line.startswith("\t\t\t"):
 831                      break
 832  
 833          assert found_recording_false and found_processing_true, (
 834              "_voice_stop_and_transcribe must set _voice_processing = True "
 835              "atomically (same lock block) with _voice_recording = False"
 836          )
 837  
 838  
 839  # ============================================================================
 840  # Real behavior tests — CLI voice methods via _make_voice_cli()
 841  # ============================================================================
 842  
 843  class TestHandleVoiceCommandReal:
 844      """Tests _handle_voice_command routing with real CLI instance."""
 845  
 846      def _cli(self):
 847          cli = _make_voice_cli()
 848          cli._enable_voice_mode = MagicMock()
 849          cli._disable_voice_mode = MagicMock()
 850          cli._toggle_voice_tts = MagicMock()
 851          cli._show_voice_status = MagicMock()
 852          return cli
 853  
 854      @patch("cli._cprint")
 855      def test_on_calls_enable(self, _cp):
 856          cli = self._cli()
 857          cli._handle_voice_command("/voice on")
 858          cli._enable_voice_mode.assert_called_once()
 859  
 860      @patch("cli._cprint")
 861      def test_off_calls_disable(self, _cp):
 862          cli = self._cli()
 863          cli._handle_voice_command("/voice off")
 864          cli._disable_voice_mode.assert_called_once()
 865  
 866      @patch("cli._cprint")
 867      def test_tts_calls_toggle(self, _cp):
 868          cli = self._cli()
 869          cli._handle_voice_command("/voice tts")
 870          cli._toggle_voice_tts.assert_called_once()
 871  
 872      @patch("cli._cprint")
 873      def test_status_calls_show(self, _cp):
 874          cli = self._cli()
 875          cli._handle_voice_command("/voice status")
 876          cli._show_voice_status.assert_called_once()
 877  
 878      @patch("cli._cprint")
 879      def test_toggle_off_when_enabled(self, _cp):
 880          cli = self._cli()
 881          cli._voice_mode = True
 882          cli._handle_voice_command("/voice")
 883          cli._disable_voice_mode.assert_called_once()
 884  
 885      @patch("cli._cprint")
 886      def test_toggle_on_when_disabled(self, _cp):
 887          cli = self._cli()
 888          cli._voice_mode = False
 889          cli._handle_voice_command("/voice")
 890          cli._enable_voice_mode.assert_called_once()
 891  
 892      @patch("cli._cprint")
 893      def test_unknown_subcommand(self, mock_cp):
 894          cli = self._cli()
 895          cli._handle_voice_command("/voice foobar")
 896          cli._enable_voice_mode.assert_not_called()
 897          cli._disable_voice_mode.assert_not_called()
 898          # Should print usage via _cprint
 899          assert any("Unknown" in str(c) or "unknown" in str(c)
 900                      for c in mock_cp.call_args_list)
 901  
 902  
 903  class TestEnableVoiceModeReal:
 904      """Tests _enable_voice_mode with real CLI instance."""
 905  
 906      @patch("cli._cprint")
 907      @patch("hermes_cli.config.load_config", return_value={"voice": {}})
 908      @patch("tools.voice_mode.check_voice_requirements",
 909             return_value={"available": True, "details": "OK"})
 910      @patch("tools.voice_mode.detect_audio_environment",
 911             return_value={"available": True, "warnings": []})
 912      def test_success_sets_voice_mode(self, _env, _req, _cfg, _cp):
 913          cli = _make_voice_cli()
 914          cli._enable_voice_mode()
 915          assert cli._voice_mode is True
 916  
 917      @patch("cli._cprint")
 918      def test_already_enabled_noop(self, _cp):
 919          cli = _make_voice_cli(_voice_mode=True)
 920          cli._enable_voice_mode()
 921          assert cli._voice_mode is True
 922  
 923      @patch("cli._cprint")
 924      @patch("tools.voice_mode.detect_audio_environment",
 925             return_value={"available": False, "warnings": ["SSH session"]})
 926      def test_env_check_fails(self, _env, _cp):
 927          cli = _make_voice_cli()
 928          cli._enable_voice_mode()
 929          assert cli._voice_mode is False
 930  
 931      @patch("cli._cprint")
 932      @patch("tools.voice_mode.check_voice_requirements",
 933             return_value={"available": False, "details": "Missing",
 934                           "missing_packages": ["sounddevice"]})
 935      @patch("tools.voice_mode.detect_audio_environment",
 936             return_value={"available": True, "warnings": []})
 937      def test_requirements_fail(self, _env, _req, _cp):
 938          cli = _make_voice_cli()
 939          cli._enable_voice_mode()
 940          assert cli._voice_mode is False
 941  
 942      @patch("cli._cprint")
 943      @patch("hermes_cli.config.load_config", return_value={"voice": {"auto_tts": True}})
 944      @patch("tools.voice_mode.check_voice_requirements",
 945             return_value={"available": True, "details": "OK"})
 946      @patch("tools.voice_mode.detect_audio_environment",
 947             return_value={"available": True, "warnings": []})
 948      def test_auto_tts_from_config(self, _env, _req, _cfg, _cp):
 949          cli = _make_voice_cli()
 950          cli._enable_voice_mode()
 951          assert cli._voice_tts is True
 952  
 953      @patch("cli._cprint")
 954      @patch("hermes_cli.config.load_config", return_value={"voice": {}})
 955      @patch("tools.voice_mode.check_voice_requirements",
 956             return_value={"available": True, "details": "OK"})
 957      @patch("tools.voice_mode.detect_audio_environment",
 958             return_value={"available": True, "warnings": []})
 959      def test_no_auto_tts_default(self, _env, _req, _cfg, _cp):
 960          cli = _make_voice_cli()
 961          cli._enable_voice_mode()
 962          assert cli._voice_tts is False
 963  
 964      @patch("cli._cprint")
 965      @patch("hermes_cli.config.load_config", side_effect=Exception("broken config"))
 966      @patch("tools.voice_mode.check_voice_requirements",
 967             return_value={"available": True, "details": "OK"})
 968      @patch("tools.voice_mode.detect_audio_environment",
 969             return_value={"available": True, "warnings": []})
 970      def test_config_exception_still_enables(self, _env, _req, _cfg, _cp):
 971          cli = _make_voice_cli()
 972          cli._enable_voice_mode()
 973          assert cli._voice_mode is True
 974  
 975  
 976  class TestVoiceBeepConfigReal:
 977      """Tests the CLI voice beep toggle."""
 978  
 979      @patch("hermes_cli.config.load_config", return_value={"voice": {}})
 980      def test_beeps_enabled_by_default(self, _cfg):
 981          cli = _make_voice_cli()
 982          assert cli._voice_beeps_enabled() is True
 983  
 984      @patch("hermes_cli.config.load_config", return_value={"voice": {"beep_enabled": False}})
 985      def test_beeps_can_be_disabled(self, _cfg):
 986          cli = _make_voice_cli()
 987          assert cli._voice_beeps_enabled() is False
 988  
 989      @patch("cli._cprint")
 990      @patch("cli.threading.Thread")
 991      @patch("tools.voice_mode.play_beep")
 992      @patch("tools.voice_mode.create_audio_recorder")
 993      @patch(
 994          "tools.voice_mode.check_voice_requirements",
 995          return_value={
 996              "available": True,
 997              "audio_available": True,
 998              "stt_available": True,
 999              "details": "OK",
1000              "missing_packages": [],
1001          },
1002      )
1003      @patch(
1004          "hermes_cli.config.load_config",
1005          return_value={
1006              "voice": {
1007                  "beep_enabled": False,
1008                  "silence_threshold": 200,
1009                  "silence_duration": 3.0,
1010              }
1011          },
1012      )
1013      def test_start_recording_skips_beep_when_disabled(
1014          self, _cfg, _req, mock_create, mock_beep, mock_thread, _cp
1015      ):
1016          recorder = MagicMock()
1017          recorder.supports_silence_autostop = True
1018          mock_create.return_value = recorder
1019          mock_thread.return_value = MagicMock(start=MagicMock())
1020  
1021          cli = _make_voice_cli()
1022          cli._voice_start_recording()
1023  
1024          recorder.start.assert_called_once()
1025          mock_beep.assert_not_called()
1026  
1027  
@patch("cli._cprint")
class TestDisableVoiceModeReal:
    """Tests _disable_voice_mode with real CLI instance.

    ``cli._cprint`` is patched by the class decorator, so every ``test_*``
    method receives that mock as its last positional argument.
    """

    @patch("tools.voice_mode.stop_playback")
    def test_all_flags_reset(self, _sp, _cp):
        """Disabling clears the mode, TTS and continuous flags."""
        voice_cli = _make_voice_cli(
            _voice_mode=True,
            _voice_tts=True,
            _voice_continuous=True,
        )
        voice_cli._disable_voice_mode()
        for flag in ("_voice_mode", "_voice_tts", "_voice_continuous"):
            assert getattr(voice_cli, flag) is False

    @patch("tools.voice_mode.stop_playback")
    def test_active_recording_cancelled(self, _sp, _cp):
        """An in-progress recording is cancelled on disable."""
        fake_recorder = MagicMock()
        voice_cli = _make_voice_cli(
            _voice_recording=True,
            _voice_recorder=fake_recorder,
        )
        voice_cli._disable_voice_mode()
        fake_recorder.cancel.assert_called_once()
        assert voice_cli._voice_recording is False

    @patch("tools.voice_mode.stop_playback")
    def test_stop_playback_called(self, mock_sp, _cp):
        """Disabling calls stop_playback exactly once."""
        _make_voice_cli()._disable_voice_mode()
        mock_sp.assert_called_once()

    @patch("tools.voice_mode.stop_playback")
    def test_tts_done_event_set(self, _sp, _cp):
        """The TTS-done event is set again after disabling."""
        voice_cli = _make_voice_cli()
        voice_cli._voice_tts_done.clear()
        voice_cli._disable_voice_mode()
        assert voice_cli._voice_tts_done.is_set()

    @patch("tools.voice_mode.stop_playback")
    def test_no_recorder_no_crash(self, _sp, _cp):
        """Recording flag set but no recorder object: disable must not raise."""
        voice_cli = _make_voice_cli(_voice_recording=True, _voice_recorder=None)
        voice_cli._disable_voice_mode()
        assert voice_cli._voice_mode is False

    @patch("tools.voice_mode.stop_playback", side_effect=RuntimeError("boom"))
    def test_stop_playback_exception_swallowed(self, _sp, _cp):
        """A stop_playback failure does not prevent voice mode being turned off."""
        voice_cli = _make_voice_cli(_voice_mode=True)
        voice_cli._disable_voice_mode()
        assert voice_cli._voice_mode is False
1078  
1079  
class TestVoiceSpeakResponseReal:
    """Tests _voice_speak_response with real CLI instance."""

    def test_async_scheduling_clears_done_before_thread_start(self):
        """The done event must already be cleared when the worker thread is
        started, and must stay cleared after scheduling."""
        voice_cli = _make_voice_cli(_voice_tts=True)
        done_state_at_start = []

        class FakeThread:
            """Thread stand-in that records the done-event state on start()
            and never runs its target."""

            def __init__(self, target=None, args=(), daemon=None):
                self.target, self.args, self.daemon = target, args, daemon

            def start(self):
                done_state_at_start.append(voice_cli._voice_tts_done.is_set())

        with patch("cli.threading.Thread", FakeThread):
            voice_cli._voice_speak_response_async("Hello")

        assert done_state_at_start == [False]
        assert not voice_cli._voice_tts_done.is_set()

    @patch("cli._cprint")
    @patch("tools.tts_tool.text_to_speech_tool")
    def test_early_return_when_tts_off(self, mock_tts, _cp):
        """With TTS disabled the TTS tool is never invoked."""
        voice_cli = _make_voice_cli(_voice_tts=False)
        voice_cli._voice_speak_response("Hello")
        mock_tts.assert_not_called()

    @patch("cli._cprint")
    @patch("cli.os.unlink")
    @patch("cli.os.path.getsize", return_value=1000)
    @patch("cli.os.path.isfile", return_value=True)
    @patch("cli.os.makedirs")
    @patch("tools.voice_mode.play_audio_file")
    @patch("tools.tts_tool.text_to_speech_tool", return_value='{"success": true}')
    def test_markdown_stripped(self, mock_tts, _play, _mkd, _isf, _gsz, _unl, _cp):
        """Heading, bold and inline-code markers do not reach the TTS tool."""
        voice_cli = _make_voice_cli(_voice_tts=True)
        voice_cli._voice_speak_response("## Title\n**bold** and `code`")
        spoken = mock_tts.call_args.kwargs["text"]
        for marker in ("##", "**", "`"):
            assert marker not in spoken

    @patch("cli._cprint")
    @patch("cli.os.makedirs")
    @patch("tools.tts_tool.text_to_speech_tool", return_value='{"success": true}')
    def test_code_blocks_removed(self, mock_tts, _mkd, _cp):
        """Fenced code is dropped while the surrounding prose is kept."""
        voice_cli = _make_voice_cli(_voice_tts=True)
        voice_cli._voice_speak_response("```python\nprint('hi')\n```\nSome text")
        spoken = mock_tts.call_args.kwargs["text"]
        assert "print" not in spoken
        assert "```" not in spoken
        assert "Some text" in spoken

    @patch("cli._cprint")
    @patch("cli.os.makedirs")
    @patch("tools.tts_tool.text_to_speech_tool")
    def test_empty_after_strip_returns_early(self, mock_tts, _mkd, _cp):
        """A response that is only a code block results in no TTS call."""
        voice_cli = _make_voice_cli(_voice_tts=True)
        voice_cli._voice_speak_response("```python\nprint('hi')\n```")
        mock_tts.assert_not_called()

    @patch("cli._cprint")
    @patch("cli.os.makedirs")
    @patch("tools.tts_tool.text_to_speech_tool", return_value='{"success": true}')
    def test_long_text_truncated(self, mock_tts, _mkd, _cp):
        """A 5000-character response is cut to at most 4000 characters."""
        voice_cli = _make_voice_cli(_voice_tts=True)
        voice_cli._voice_speak_response("A" * 5000)
        spoken = mock_tts.call_args.kwargs["text"]
        assert len(spoken) <= 4000

    @patch("cli._cprint")
    @patch("cli.os.makedirs")
    @patch("tools.tts_tool.text_to_speech_tool", side_effect=RuntimeError("tts fail"))
    def test_exception_sets_done_event(self, _tts, _mkd, _cp):
        """The done event is set even when the TTS tool raises."""
        voice_cli = _make_voice_cli(_voice_tts=True)
        voice_cli._voice_tts_done.clear()
        voice_cli._voice_speak_response("Hello")
        assert voice_cli._voice_tts_done.is_set()

    @patch("cli._cprint")
    @patch("cli.os.unlink")
    @patch("cli.os.path.getsize", return_value=1000)
    @patch("cli.os.path.isfile", return_value=True)
    @patch("cli.os.makedirs")
    @patch("tools.voice_mode.play_audio_file")
    @patch("tools.tts_tool.text_to_speech_tool", return_value='{"success": true}')
    def test_play_audio_called(self, _tts, mock_play, _mkd, _isf, _gsz, _unl, _cp):
        """A successful TTS result is played exactly once."""
        voice_cli = _make_voice_cli(_voice_tts=True)
        voice_cli._voice_speak_response("Hello world")
        mock_play.assert_called_once()
1172  
1173  
@patch("cli._cprint")
class TestVoiceStopAndTranscribeReal:
    """Tests _voice_stop_and_transcribe with real CLI instance.

    ``cli._cprint`` is patched by the class decorator, so every ``test_*``
    method receives that mock as its last positional argument (after the
    mocks injected by its own ``@patch`` decorators).
    """

    @staticmethod
    def _recording_cli(stop_result, **overrides):
        """Build a CLI that is mid-recording with a mock recorder.

        ``stop_result`` is what the mock recorder's ``stop()`` returns;
        ``overrides`` are forwarded to ``_make_voice_cli``.
        """
        fake_recorder = MagicMock()
        fake_recorder.stop.return_value = stop_result
        return _make_voice_cli(
            _voice_recording=True,
            _voice_recorder=fake_recorder,
            **overrides,
        )

    @patch("tools.voice_mode.transcribe_recording")
    def test_guard_not_recording(self, mock_tr, _cp):
        """Nothing is transcribed when no recording is in progress."""
        voice_cli = _make_voice_cli(_voice_recording=False)
        voice_cli._voice_stop_and_transcribe()
        mock_tr.assert_not_called()

    @patch("tools.voice_mode.transcribe_recording")
    def test_no_recorder_returns_early(self, mock_tr, _cp):
        """Recording flag set but no recorder: no transcription, flag cleared."""
        voice_cli = _make_voice_cli(_voice_recording=True, _voice_recorder=None)
        voice_cli._voice_stop_and_transcribe()
        mock_tr.assert_not_called()
        assert voice_cli._voice_recording is False

    @patch("tools.voice_mode.play_beep")
    def test_no_speech_detected(self, _beep, _cp):
        """A recorder that returns no file queues no input."""
        voice_cli = self._recording_cli(None)
        voice_cli._voice_stop_and_transcribe()
        assert voice_cli._pending_input.empty()

    @patch(
        "hermes_cli.config.load_config",
        return_value={"voice": {"beep_enabled": False}},
    )
    @patch("tools.voice_mode.play_beep")
    def test_no_speech_detected_skips_beep_when_disabled(self, mock_beep, _cfg, _cp):
        """With beeps disabled, the no-speech path plays no beep."""
        voice_cli = self._recording_cli(None)
        voice_cli._voice_stop_and_transcribe()
        mock_beep.assert_not_called()

    @patch("cli.os.unlink")
    @patch("cli.os.path.isfile", return_value=True)
    @patch("hermes_cli.config.load_config", return_value={"stt": {}})
    @patch(
        "tools.voice_mode.transcribe_recording",
        return_value={"success": True, "transcript": "hello world"},
    )
    @patch("tools.voice_mode.play_beep")
    def test_successful_transcription_queues_input(
        self, _beep, _tr, _cfg, _isf, _unl, _cp
    ):
        """A successful transcript is placed on the pending-input queue."""
        voice_cli = self._recording_cli("/tmp/test.wav")
        voice_cli._voice_stop_and_transcribe()
        assert voice_cli._pending_input.get_nowait() == "hello world"

    @patch("cli.os.unlink")
    @patch("cli.os.path.isfile", return_value=True)
    @patch("hermes_cli.config.load_config", return_value={"stt": {}})
    @patch(
        "tools.voice_mode.transcribe_recording",
        return_value={"success": True, "transcript": ""},
    )
    @patch("tools.voice_mode.play_beep")
    def test_empty_transcript_not_queued(self, _beep, _tr, _cfg, _isf, _unl, _cp):
        """An empty transcript is not queued."""
        voice_cli = self._recording_cli("/tmp/test.wav")
        voice_cli._voice_stop_and_transcribe()
        assert voice_cli._pending_input.empty()

    @patch("cli.os.unlink")
    @patch("cli.os.path.isfile", return_value=True)
    @patch("hermes_cli.config.load_config", return_value={"stt": {}})
    @patch(
        "tools.voice_mode.transcribe_recording",
        return_value={"success": False, "error": "API timeout"},
    )
    @patch("tools.voice_mode.play_beep")
    def test_transcription_failure(self, _beep, _tr, _cfg, _isf, _unl, _cp):
        """A failed transcription result queues nothing."""
        voice_cli = self._recording_cli("/tmp/test.wav")
        voice_cli._voice_stop_and_transcribe()
        assert voice_cli._pending_input.empty()

    @patch("cli.os.unlink")
    @patch("cli.os.path.isfile", return_value=True)
    @patch("hermes_cli.config.load_config", return_value={"stt": {}})
    @patch(
        "tools.voice_mode.transcribe_recording",
        side_effect=ConnectionError("network"),
    )
    @patch("tools.voice_mode.play_beep")
    def test_exception_caught(self, _beep, _tr, _cfg, _isf, _unl, _cp):
        """An exception raised by the transcriber does not propagate."""
        voice_cli = self._recording_cli("/tmp/test.wav")
        voice_cli._voice_stop_and_transcribe()  # Should not raise

    @patch("tools.voice_mode.play_beep")
    def test_processing_flag_cleared(self, _beep, _cp):
        """The processing flag is False once the method returns."""
        voice_cli = self._recording_cli(None)
        voice_cli._voice_stop_and_transcribe()
        assert voice_cli._voice_processing is False

    @patch("tools.voice_mode.play_beep")
    def test_continuous_restarts_on_no_speech(self, _beep, _cp):
        """In continuous mode, a no-speech result starts a new recording."""
        voice_cli = self._recording_cli(None, _voice_continuous=True)
        voice_cli._voice_start_recording = MagicMock()
        voice_cli._voice_stop_and_transcribe()
        voice_cli._voice_start_recording.assert_called_once()

    @patch("cli.os.unlink")
    @patch("cli.os.path.isfile", return_value=True)
    @patch("hermes_cli.config.load_config", return_value={"stt": {}})
    @patch(
        "tools.voice_mode.transcribe_recording",
        return_value={"success": True, "transcript": "hello"},
    )
    @patch("tools.voice_mode.play_beep")
    def test_continuous_no_restart_on_success(
        self, _beep, _tr, _cfg, _isf, _unl, _cp
    ):
        """In continuous mode, a successful transcript does not restart
        recording from within this method."""
        voice_cli = self._recording_cli("/tmp/test.wav", _voice_continuous=True)
        voice_cli._voice_start_recording = MagicMock()
        voice_cli._voice_stop_and_transcribe()
        voice_cli._voice_start_recording.assert_not_called()

    @patch("cli.os.unlink")
    @patch("cli.os.path.isfile", return_value=True)
    @patch(
        "hermes_cli.config.load_config",
        return_value={"stt": {"model": "whisper-large-v3"}},
    )
    @patch(
        "tools.voice_mode.transcribe_recording",
        return_value={"success": True, "transcript": "hi"},
    )
    @patch("tools.voice_mode.play_beep")
    def test_stt_model_from_config(self, _beep, mock_tr, _cfg, _isf, _unl, _cp):
        """The ``stt.model`` config value is passed to the transcriber."""
        voice_cli = self._recording_cli("/tmp/test.wav")
        voice_cli._voice_stop_and_transcribe()
        mock_tr.assert_called_once_with("/tmp/test.wav", model="whisper-large-v3")
1319  
1320  
1321  # ---------------------------------------------------------------------------
1322  # Bugfix: _refresh_level must read _voice_recording under lock
1323  # ---------------------------------------------------------------------------
1324  
1325  
class TestRefreshLevelLock:
    """Bug: _refresh_level thread read _voice_recording without lock."""

    def test_refresh_stops_when_recording_false(self):
        """A polling loop that reads the recording flag under a lock must
        exit once the flag is cleared under that same lock.

        This exercises a local model of the refresh loop, not the CLI
        itself.  An Event handshake (rather than a fixed sleep) guarantees
        the worker completed at least one iteration before the flag is
        cleared, so the result does not depend on thread scheduling speed.
        """
        import threading
        import time

        lock = threading.Lock()
        first_pass_done = threading.Event()
        recording = True
        iterations = 0

        def refresh_level():
            nonlocal iterations
            while True:
                # Read the shared flag only while holding the lock.
                with lock:
                    still = recording
                if not still:
                    break
                iterations += 1
                first_pass_done.set()
                time.sleep(0.01)

        t = threading.Thread(target=refresh_level, daemon=True)
        t.start()

        # Block until the worker has demonstrably looped once (bounded wait).
        assert first_pass_done.wait(timeout=1), "Refresh thread never ran"
        with lock:
            recording = False

        t.join(timeout=1)
        assert not t.is_alive(), "Refresh thread did not stop"
        assert iterations > 0, "Refresh thread never ran"