/ tests / gateway / test_unknown_command.py
test_unknown_command.py
  1  """Tests for gateway warning when an unrecognized /command is dispatched.
  2  
  3  Without this warning, unknown slash commands get forwarded to the LLM as plain
  4  text, which often leads to silent failure (e.g. the model inventing a bogus
  5  delegate_task call instead of telling the user the command doesn't exist).
  6  """
  7  
  8  from datetime import datetime
  9  from types import SimpleNamespace
 10  from unittest.mock import AsyncMock, MagicMock
 11  
 12  import pytest
 13  
 14  from gateway.config import GatewayConfig, Platform, PlatformConfig
 15  from gateway.platforms.base import MessageEvent
 16  from gateway.session import SessionEntry, SessionSource, build_session_key
 17  
 18  
 19  def _make_source() -> SessionSource:
 20      return SessionSource(
 21          platform=Platform.TELEGRAM,
 22          user_id="u1",
 23          chat_id="c1",
 24          user_name="tester",
 25          chat_type="dm",
 26      )
 27  
 28  
 29  def _make_event(text: str) -> MessageEvent:
 30      return MessageEvent(text=text, source=_make_source(), message_id="m1")
 31  
 32  
 33  def _make_runner():
 34      from gateway.run import GatewayRunner
 35  
 36      runner = object.__new__(GatewayRunner)
 37      runner.config = GatewayConfig(
 38          platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="***")}
 39      )
 40      adapter = MagicMock()
 41      adapter.send = AsyncMock()
 42      runner.adapters = {Platform.TELEGRAM: adapter}
 43      runner._voice_mode = {}
 44      runner.hooks = SimpleNamespace(
 45          emit=AsyncMock(),
 46          emit_collect=AsyncMock(return_value=[]),
 47          loaded_hooks=False,
 48      )
 49  
 50      session_entry = SessionEntry(
 51          session_key=build_session_key(_make_source()),
 52          session_id="sess-1",
 53          created_at=datetime.now(),
 54          updated_at=datetime.now(),
 55          platform=Platform.TELEGRAM,
 56          chat_type="dm",
 57      )
 58      runner.session_store = MagicMock()
 59      runner.session_store.get_or_create_session.return_value = session_entry
 60      runner.session_store.load_transcript.return_value = []
 61      runner.session_store.has_any_sessions.return_value = True
 62      runner.session_store.append_to_transcript = MagicMock()
 63      runner.session_store.rewrite_transcript = MagicMock()
 64      runner.session_store.update_session = MagicMock()
 65      runner._running_agents = {}
 66      runner._pending_messages = {}
 67      runner._pending_approvals = {}
 68      runner._session_db = None
 69      runner._reasoning_config = None
 70      runner._provider_routing = {}
 71      runner._fallback_model = None
 72      runner._show_reasoning = False
 73      runner._is_user_authorized = lambda _source: True
 74      runner._set_session_env = lambda _context: None
 75      runner._should_send_voice_reply = lambda *_args, **_kwargs: False
 76      runner._send_voice_reply = AsyncMock()
 77      runner._capture_gateway_honcho_if_configured = lambda *args, **kwargs: None
 78      runner._emit_gateway_run_progress = AsyncMock()
 79      return runner
 80  
 81  
 82  @pytest.mark.asyncio
 83  async def test_unknown_slash_command_returns_guidance(monkeypatch):
 84      """A genuinely unknown /foobar should return user-facing guidance, not
 85      silently drop through to the LLM."""
 86      import gateway.run as gateway_run
 87  
 88      runner = _make_runner()
 89      # If the LLM were called, this would fail: the guard must short-circuit
 90      # before _run_agent is invoked.
 91      runner._run_agent = AsyncMock(
 92          side_effect=AssertionError(
 93              "unknown slash command leaked through to the agent"
 94          )
 95      )
 96  
 97      monkeypatch.setattr(
 98          gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}
 99      )
100  
101      result = await runner._handle_message(_make_event("/definitely-not-a-command"))
102  
103      assert result is not None
104      assert "Unknown command" in result
105      assert "/definitely-not-a-command" in result
106      assert "/commands" in result
107      runner._run_agent.assert_not_called()
108  
109  
110  @pytest.mark.asyncio
111  async def test_unknown_slash_command_underscored_form_also_guarded(monkeypatch):
112      """Telegram may send /foo_bar — same guard must trigger for underscored
113      commands that normalize to unknown hyphenated names."""
114      import gateway.run as gateway_run
115  
116      runner = _make_runner()
117      runner._run_agent = AsyncMock(
118          side_effect=AssertionError(
119              "unknown slash command leaked through to the agent"
120          )
121      )
122  
123      monkeypatch.setattr(
124          gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}
125      )
126  
127      result = await runner._handle_message(_make_event("/made_up_thing"))
128  
129      assert result is not None
130      assert "Unknown command" in result
131      assert "/made_up_thing" in result
132      runner._run_agent.assert_not_called()
133  
134  
135  @pytest.mark.asyncio
136  async def test_known_slash_command_not_flagged_as_unknown(monkeypatch):
137      """A real built-in like /status must NOT hit the unknown-command guard."""
138      runner = _make_runner()
139      # Make _handle_status_command exist via the normal path by running a real
140      # dispatch. If the guard fires, the return string will mention "Unknown".
141      runner._running_agents[build_session_key(_make_source())] = MagicMock()
142  
143      result = await runner._handle_message(_make_event("/status"))
144  
145      assert result is not None
146      assert "Unknown command" not in result
147  
148  
149  @pytest.mark.asyncio
150  async def test_underscored_alias_for_hyphenated_builtin_not_flagged(monkeypatch):
151      """Telegram autocomplete sends /reload_mcp for the /reload-mcp built-in.
152      That must NOT be flagged as unknown."""
153      import gateway.run as gateway_run
154  
155      runner = _make_runner()
156      # Prevent real MCP work; we only care that the unknown guard doesn't fire.
157      async def _noop_reload(*_a, **_kw):
158          return "mcp reloaded"
159  
160      runner._handle_reload_mcp_command = _noop_reload  # type: ignore[attr-defined]
161  
162      monkeypatch.setattr(
163          gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}
164      )
165  
166      result = await runner._handle_message(_make_event("/reload_mcp"))
167  
168      # Whatever /reload_mcp returns, it must not be the unknown-command guard.
169      if result is not None:
170          assert "Unknown command" not in result
171  
172  
173  # ------------------------------------------------------------------
174  # command:<name> decision hook — deny / handled / rewrite
175  # ------------------------------------------------------------------
176  
177  @pytest.mark.asyncio
178  async def test_command_hook_can_deny_before_dispatch(monkeypatch):
179      """A handler returning {"decision": "deny"} blocks a slash command early."""
180      import gateway.run as gateway_run
181  
182      runner = _make_runner()
183      runner._run_agent = AsyncMock(
184          side_effect=AssertionError("denied slash command leaked to the agent")
185      )
186      runner._handle_status_command = AsyncMock(
187          side_effect=AssertionError("denied slash command reached its handler")
188      )
189      runner.hooks.emit_collect = AsyncMock(
190          return_value=[{"decision": "deny", "message": "Blocked by ACL"}]
191      )
192  
193      monkeypatch.setattr(
194          gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}
195      )
196  
197      result = await runner._handle_message(_make_event("/status"))
198  
199      assert result == "Blocked by ACL"
200      runner._run_agent.assert_not_called()
201      # The emit_collect call should use the canonical command name.
202      call_args = runner.hooks.emit_collect.await_args
203      assert call_args.args[0] == "command:status"
204  
205  
206  @pytest.mark.asyncio
207  async def test_command_hook_deny_without_message_uses_default(monkeypatch):
208      """A deny decision with no message falls back to a generic blocked string."""
209      import gateway.run as gateway_run
210  
211      runner = _make_runner()
212      runner._handle_status_command = AsyncMock(
213          side_effect=AssertionError("denied slash command reached its handler")
214      )
215      runner.hooks.emit_collect = AsyncMock(return_value=[{"decision": "deny"}])
216  
217      monkeypatch.setattr(
218          gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}
219      )
220  
221      result = await runner._handle_message(_make_event("/status"))
222  
223      assert result is not None
224      assert "blocked" in result.lower()
225  
226  
227  @pytest.mark.asyncio
228  async def test_command_hook_can_mark_command_as_handled(monkeypatch):
229      """A handled decision short-circuits dispatch cleanly with a custom reply."""
230      import gateway.run as gateway_run
231  
232      runner = _make_runner()
233      runner._handle_status_command = AsyncMock(
234          side_effect=AssertionError("handled slash command reached its handler")
235      )
236      runner.hooks.emit_collect = AsyncMock(
237          return_value=[{"decision": "handled", "message": "Already handled upstream"}]
238      )
239  
240      monkeypatch.setattr(
241          gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}
242      )
243  
244      result = await runner._handle_message(_make_event("/status"))
245  
246      assert result == "Already handled upstream"
247  
248  
249  @pytest.mark.asyncio
250  async def test_command_hook_allow_decision_is_passthrough(monkeypatch):
251      """A handler returning {"decision": "allow"} must NOT prevent normal dispatch."""
252      import gateway.run as gateway_run
253  
254      runner = _make_runner()
255      runner._handle_status_command = AsyncMock(return_value="status: ok")
256      runner.hooks.emit_collect = AsyncMock(
257          return_value=[{"decision": "allow"}]
258      )
259  
260      monkeypatch.setattr(
261          gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}
262      )
263  
264      result = await runner._handle_message(_make_event("/status"))
265  
266      assert result == "status: ok"
267      runner._handle_status_command.assert_awaited_once()
268  
269  
270  @pytest.mark.asyncio
271  async def test_command_hook_non_dict_return_values_ignored(monkeypatch):
272      """Hook return values that aren't dicts must not break dispatch."""
273      import gateway.run as gateway_run
274  
275      runner = _make_runner()
276      runner._handle_status_command = AsyncMock(return_value="status: ok")
277      runner.hooks.emit_collect = AsyncMock(
278          return_value=["some string", 42, None, {}]
279      )
280  
281      monkeypatch.setattr(
282          gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}
283      )
284  
285      result = await runner._handle_message(_make_event("/status"))
286  
287      assert result == "status: ok"
288  
289  
290  @pytest.mark.asyncio
291  async def test_command_hook_fires_for_plugin_registered_command(monkeypatch):
292      """Plugin-registered slash commands should also trigger command:<name> hooks."""
293      import gateway.run as gateway_run
294  
295      runner = _make_runner()
296      runner._run_agent = AsyncMock(
297          side_effect=AssertionError("plugin command leaked to the agent")
298      )
299      runner.hooks.emit_collect = AsyncMock(
300          return_value=[{"decision": "handled", "message": "intercepted"}]
301      )
302  
303      monkeypatch.setattr(
304          gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}
305      )
306      # Stub plugin command lookup so is_gateway_known_command() recognizes /metricas.
307      from hermes_cli import plugins as _plugins_mod
308  
309      monkeypatch.setattr(
310          _plugins_mod,
311          "get_plugin_commands",
312          lambda: {"metricas": {"description": "Metrics", "args_hint": "dias:7"}},
313      )
314  
315      result = await runner._handle_message(_make_event("/metricas dias:7"))
316  
317      assert result == "intercepted"
318      # Hook event name uses the plugin command as canonical.
319      call_args = runner.hooks.emit_collect.await_args
320      assert call_args.args[0] == "command:metricas"
321      # Args are passed through in both "args" and "raw_args" keys.
322      ctx = call_args.args[1]
323      assert ctx["raw_args"] == "dias:7"
324  
325  
326  @pytest.mark.asyncio
327  async def test_command_hook_rewrite_routes_to_plugin(monkeypatch):
328      """A rewrite decision should re-resolve the command and route to the new one."""
329      import gateway.run as gateway_run
330  
331      runner = _make_runner()
332      runner._run_agent = AsyncMock(
333          side_effect=AssertionError("rewritten command leaked to the agent")
334      )
335  
336      call_log = []
337  
338      async def _emit_collect(event_type, ctx):
339          call_log.append(event_type)
340          if event_type == "command:status":
341              return [
342                  {
343                      "decision": "rewrite",
344                      "command_name": "metricas",
345                      "raw_args": "dias:7",
346                  }
347              ]
348          return []
349  
350      runner.hooks.emit_collect = AsyncMock(side_effect=_emit_collect)
351  
352      monkeypatch.setattr(
353          gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}
354      )
355      from hermes_cli import plugins as _plugins_mod
356  
357      monkeypatch.setattr(
358          _plugins_mod,
359          "get_plugin_commands",
360          lambda: {"metricas": {"description": "Metrics", "args_hint": "dias:7"}},
361      )
362      monkeypatch.setattr(
363          _plugins_mod,
364          "get_plugin_command_handler",
365          lambda name: (lambda args: f"metrics {args}") if name == "metricas" else None,
366      )
367  
368      result = await runner._handle_message(_make_event("/status"))
369  
370      assert result == "metrics dias:7"
371      # First emit_collect fires on the original command; after rewrite the
372      # dispatcher does NOT re-fire for the new command (one decision per turn).
373      assert call_log == ["command:status"]