# test_quick_commands.py
"""Tests for user-defined quick commands that bypass the agent loop."""
import asyncio
import subprocess
from unittest.mock import MagicMock, patch, AsyncMock

from rich.text import Text
import pytest


# ── CLI tests ──────────────────────────────────────────────────────────────

class TestCLIQuickCommands:
    """Test quick command dispatch in HermesCLI.process_command."""

    @staticmethod
    def _printed_plain(call_arg):
        """Return the plain-text form of an argument passed to ``console.print``.

        A rich ``Text`` is reduced to its ``plain`` attribute; anything else is
        passed through ``str``. Every assertion on printed output goes through
        this helper so the tests do not depend on whether the CLI prints a
        bare string or a ``Text``.
        """
        if isinstance(call_arg, Text):
            return call_arg.plain
        return str(call_arg)

    def _make_cli(self, quick_commands):
        """Build a ``HermesCLI`` via ``__new__`` (skipping ``__init__``).

        Only the attributes set below are populated; ``console`` is a
        ``MagicMock`` so tests can inspect what was printed.
        """
        from cli import HermesCLI
        cli = HermesCLI.__new__(HermesCLI)
        cli.config = {"quick_commands": quick_commands}
        cli.console = MagicMock()
        cli.agent = None
        cli.conversation_history = []
        # session_id is accessed by the fallback skill/fuzzy-match path in
        # process_command; without it, tests that exercise `/alias args`
        # can trip an AttributeError when cross-test state leaks a skill
        # command matching the alias target.
        cli.session_id = "test-session"
        return cli

    def test_exec_command_runs_and_prints_output(self):
        """An exec quick command returns True and prints its stdout once."""
        cli = self._make_cli({"dn": {"type": "exec", "command": "echo daily-note"}})
        result = cli.process_command("/dn")
        assert result is True
        cli.console.print.assert_called_once()
        printed = self._printed_plain(cli.console.print.call_args[0][0])
        assert printed == "daily-note"

    def test_exec_command_uses_chat_console_when_tui_is_live(self):
        """With ``_app`` set, output goes to ``ChatConsole``, not ``cli.console``."""
        cli = self._make_cli({"dn": {"type": "exec", "command": "echo daily-note"}})
        cli._app = object()
        live_console = MagicMock()

        with patch("cli.ChatConsole", return_value=live_console):
            result = cli.process_command("/dn")

        assert result is True
        live_console.print.assert_called_once()
        printed = self._printed_plain(live_console.print.call_args[0][0])
        assert printed == "daily-note"
        cli.console.print.assert_not_called()

    def test_exec_command_stderr_shown_on_no_stdout(self):
        """A command that writes only to stderr still produces one print call."""
        cli = self._make_cli({"err": {"type": "exec", "command": "echo error >&2"}})
        result = cli.process_command("/err")
        assert result is True
        # stderr fallback — should print something
        cli.console.print.assert_called_once()

    def test_exec_command_no_output_shows_fallback(self):
        """A command with no output prints a "no output" message."""
        cli = self._make_cli({"empty": {"type": "exec", "command": "true"}})
        cli.process_command("/empty")
        cli.console.print.assert_called_once()
        printed = self._printed_plain(cli.console.print.call_args[0][0])
        assert "no output" in printed.lower()

    def test_alias_command_routes_to_target(self):
        """Alias quick commands rewrite to the target command."""
        cli = self._make_cli({"shortcut": {"type": "alias", "target": "/help"}})
        with patch.object(cli, "process_command", wraps=cli.process_command) as spy:
            cli.process_command("/shortcut")
            # Should recursively call process_command with /help
            spy.assert_any_call("/help")

    def test_alias_command_passes_args(self):
        """Alias quick commands forward user arguments to the target."""
        cli = self._make_cli({"sc": {"type": "alias", "target": "/context"}})
        with patch.object(cli, "process_command", wraps=cli.process_command) as spy:
            cli.process_command("/sc some args")
            spy.assert_any_call("/context some args")

    def test_alias_no_target_shows_error(self):
        """An alias with an empty target prints a "no target defined" error."""
        cli = self._make_cli({"broken": {"type": "alias", "target": ""}})
        cli.process_command("/broken")
        cli.console.print.assert_called_once()
        printed = self._printed_plain(cli.console.print.call_args[0][0])
        assert "no target defined" in printed.lower()

    def test_unsupported_type_shows_error(self):
        """A quick command of type "prompt" prints an "unsupported type" error."""
        cli = self._make_cli({"bad": {"type": "prompt", "command": "echo hi"}})
        cli.process_command("/bad")
        cli.console.print.assert_called_once()
        printed = self._printed_plain(cli.console.print.call_args[0][0])
        assert "unsupported type" in printed.lower()

    def test_missing_command_field_shows_error(self):
        """An exec entry without "command" prints a "no command defined" error."""
        cli = self._make_cli({"oops": {"type": "exec"}})
        cli.process_command("/oops")
        cli.console.print.assert_called_once()
        printed = self._printed_plain(cli.console.print.call_args[0][0])
        assert "no command defined" in printed.lower()

    def test_quick_command_takes_priority_over_skill_commands(self):
        """Quick commands must be checked before skill slash commands."""
        cli = self._make_cli({"mygif": {"type": "exec", "command": "echo overridden"}})
        with patch("cli._skill_commands", {"/mygif": {"name": "gif-search"}}):
            cli.process_command("/mygif")
        cli.console.print.assert_called_once()
        printed = self._printed_plain(cli.console.print.call_args[0][0])
        assert printed == "overridden"

    def test_unknown_command_still_shows_error(self):
        """With no quick commands configured, an unknown command is reported via ``_cprint``."""
        cli = self._make_cli({})
        with patch("cli._cprint") as mock_cprint:
            cli.process_command("/nonexistent")
        mock_cprint.assert_called()
        printed = " ".join(str(c) for c in mock_cprint.call_args_list)
        assert "unknown command" in printed.lower()

    def test_timeout_shows_error(self):
        """A ``subprocess.TimeoutExpired`` from ``subprocess.run`` prints "timed out"."""
        cli = self._make_cli({"slow": {"type": "exec", "command": "sleep 100"}})
        # subprocess.run is patched to raise, so no real sleep is executed.
        with patch("subprocess.run", side_effect=subprocess.TimeoutExpired("sleep", 30)):
            cli.process_command("/slow")
        cli.console.print.assert_called_once()
        printed = self._printed_plain(cli.console.print.call_args[0][0])
        assert "timed out" in printed.lower()


# ── Gateway tests ──────────────────────────────────────────────────────────

class TestGatewayQuickCommands:
    """Test quick command dispatch in GatewayRunner._handle_message."""

    def _make_event(self, command, args=""):
        """Return a ``MagicMock`` event for ``/<command> <args>`` from a fixed test source."""
        event = MagicMock()
        event.get_command.return_value = command
        event.get_command_args.return_value = args
        event.text = f"/{command} {args}".strip()
        event.source = MagicMock()
        event.source.user_id = "test_user"
        event.source.user_name = "Test User"
        event.source.platform.value = "telegram"
        event.source.chat_type = "dm"
        event.source.chat_id = "123"
        return event

    def _make_runner(self, config):
        """Build a ``GatewayRunner`` via ``__new__`` (skipping ``__init__``).

        ``config`` is assigned to ``runner.config`` unchanged, so it may be a
        plain dict or a ``GatewayConfig``. Authorization is stubbed to always
        return True.
        """
        from gateway.run import GatewayRunner
        runner = GatewayRunner.__new__(GatewayRunner)
        runner.config = config
        runner._running_agents = {}
        runner._pending_messages = {}
        runner._is_user_authorized = MagicMock(return_value=True)
        return runner

    @pytest.mark.asyncio
    async def test_exec_command_returns_output(self):
        """An exec quick command's stdout is returned from ``_handle_message``."""
        runner = self._make_runner(
            {"quick_commands": {"limits": {"type": "exec", "command": "echo ok"}}}
        )

        event = self._make_event("limits")
        result = await runner._handle_message(event)
        assert result == "ok"

    @pytest.mark.asyncio
    async def test_unsupported_type_returns_error(self):
        """A quick command of type "prompt" yields an "unsupported type" reply."""
        runner = self._make_runner(
            {"quick_commands": {"bad": {"type": "prompt", "command": "echo hi"}}}
        )

        event = self._make_event("bad")
        result = await runner._handle_message(event)
        assert result is not None
        assert "unsupported type" in result.lower()

    @pytest.mark.asyncio
    async def test_timeout_returns_error(self):
        """An ``asyncio.TimeoutError`` from ``asyncio.wait_for`` yields a "timed out" reply."""
        runner = self._make_runner(
            {"quick_commands": {"slow": {"type": "exec", "command": "sleep 100"}}}
        )

        event = self._make_event("slow")
        # wait_for is patched to raise immediately instead of waiting.
        with patch("asyncio.wait_for", side_effect=asyncio.TimeoutError):
            result = await runner._handle_message(event)
        assert result is not None
        assert "timed out" in result.lower()

    @pytest.mark.asyncio
    async def test_gateway_config_object_supports_quick_commands(self):
        """Quick commands also work when ``runner.config`` is a ``GatewayConfig``."""
        from gateway.config import GatewayConfig

        runner = self._make_runner(
            GatewayConfig(
                quick_commands={"limits": {"type": "exec", "command": "echo ok"}}
            )
        )

        event = self._make_event("limits")
        result = await runner._handle_message(event)
        assert result == "ok"