/ tests / gateway / test_command_bypass_active_session.py
test_command_bypass_active_session.py
  1  """Regression tests: slash commands must bypass the base adapter's active-session guard.
  2  
  3  When an agent is running, the base adapter's Level 1 guard in
  4  handle_message() intercepts all incoming messages and queues them as
  5  pending.  Certain commands (/stop, /new, /reset, /approve, /deny,
  6  /status) must bypass this guard and be dispatched directly to the gateway
  7  runner — otherwise they are queued as user text and either:
  8    - leak into the conversation as agent input (/stop, /new), or
  9    - deadlock (/approve, /deny — agent blocks on Event.wait)
 10  
 11  These tests verify that the bypass works at the adapter level and that
 12  the safety net in _run_agent discards leaked command text.
 13  """
 14  
 15  import asyncio
 16  from unittest.mock import AsyncMock, MagicMock
 17  
 18  import pytest
 19  
 20  from gateway.config import Platform, PlatformConfig
 21  from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageType
 22  from gateway.session import SessionSource, build_session_key
 23  
 24  
 25  # ---------------------------------------------------------------------------
 26  # Helpers
 27  # ---------------------------------------------------------------------------
 28  
 29  
 30  class _StubAdapter(BasePlatformAdapter):
 31      """Concrete adapter with abstract methods stubbed out."""
 32  
 33      async def connect(self):
 34          pass
 35  
 36      async def disconnect(self):
 37          pass
 38  
 39      async def send(self, chat_id, text, **kwargs):
 40          pass
 41  
 42      async def get_chat_info(self, chat_id):
 43          return {}
 44  
 45  
 46  def _make_adapter():
 47      """Create a minimal adapter for testing the active-session guard."""
 48      config = PlatformConfig(enabled=True, token="test-token")
 49      adapter = _StubAdapter(config, Platform.TELEGRAM)
 50      adapter.sent_responses = []
 51  
 52      async def _mock_handler(event):
 53          cmd = event.get_command()
 54          return f"handled:{cmd}" if cmd else f"handled:text:{event.text}"
 55  
 56      adapter._message_handler = _mock_handler
 57  
 58      async def _mock_send_retry(chat_id, content, **kwargs):
 59          adapter.sent_responses.append(content)
 60  
 61      adapter._send_with_retry = _mock_send_retry
 62      return adapter
 63  
 64  
 65  def _make_event(text="/stop", chat_id="12345"):
 66      source = SessionSource(
 67          platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm"
 68      )
 69      return MessageEvent(text=text, message_type=MessageType.TEXT, source=source)
 70  
 71  
 72  def _session_key(chat_id="12345"):
 73      source = SessionSource(
 74          platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm"
 75      )
 76      return build_session_key(source)
 77  
 78  
 79  # ---------------------------------------------------------------------------
 80  # Tests: commands bypass Level 1 when session is active
 81  # ---------------------------------------------------------------------------
 82  
 83  
 84  class TestCommandBypassActiveSession:
 85      """Commands that must bypass the active-session guard."""
 86  
 87      @pytest.mark.asyncio
 88      async def test_stop_bypasses_guard(self):
 89          """/stop must be dispatched directly, not queued."""
 90          adapter = _make_adapter()
 91          sk = _session_key()
 92          adapter._active_sessions[sk] = asyncio.Event()
 93  
 94          await adapter.handle_message(_make_event("/stop"))
 95  
 96          assert sk not in adapter._pending_messages, (
 97              "/stop was queued as a pending message instead of being dispatched"
 98          )
 99          assert any("handled:stop" in r for r in adapter.sent_responses), (
100              "/stop response was not sent back to the user"
101          )
102  
103      @pytest.mark.asyncio
104      async def test_new_bypasses_guard(self):
105          """/new must be dispatched directly, not queued."""
106          adapter = _make_adapter()
107          sk = _session_key()
108          adapter._active_sessions[sk] = asyncio.Event()
109  
110          await adapter.handle_message(_make_event("/new"))
111  
112          assert sk not in adapter._pending_messages
113          assert any("handled:new" in r for r in adapter.sent_responses)
114  
115      @pytest.mark.asyncio
116      async def test_reset_bypasses_guard(self):
117          """/reset (alias for /new) must be dispatched directly."""
118          adapter = _make_adapter()
119          sk = _session_key()
120          adapter._active_sessions[sk] = asyncio.Event()
121  
122          await adapter.handle_message(_make_event("/reset"))
123  
124          assert sk not in adapter._pending_messages
125          assert any("handled:reset" in r for r in adapter.sent_responses)
126  
127      @pytest.mark.asyncio
128      async def test_approve_bypasses_guard(self):
129          """/approve must bypass (deadlock prevention)."""
130          adapter = _make_adapter()
131          sk = _session_key()
132          adapter._active_sessions[sk] = asyncio.Event()
133  
134          await adapter.handle_message(_make_event("/approve"))
135  
136          assert sk not in adapter._pending_messages
137          assert any("handled:approve" in r for r in adapter.sent_responses)
138  
139      @pytest.mark.asyncio
140      async def test_deny_bypasses_guard(self):
141          """/deny must bypass (deadlock prevention)."""
142          adapter = _make_adapter()
143          sk = _session_key()
144          adapter._active_sessions[sk] = asyncio.Event()
145  
146          await adapter.handle_message(_make_event("/deny"))
147  
148          assert sk not in adapter._pending_messages
149          assert any("handled:deny" in r for r in adapter.sent_responses)
150  
151      @pytest.mark.asyncio
152      async def test_status_bypasses_guard(self):
153          """/status must bypass so it returns a system response."""
154          adapter = _make_adapter()
155          sk = _session_key()
156          adapter._active_sessions[sk] = asyncio.Event()
157  
158          await adapter.handle_message(_make_event("/status"))
159  
160          assert sk not in adapter._pending_messages
161          assert any("handled:status" in r for r in adapter.sent_responses)
162  
163      @pytest.mark.asyncio
164      async def test_agents_bypasses_guard(self):
165          """/agents must bypass so active-task queries don't interrupt runs."""
166          adapter = _make_adapter()
167          sk = _session_key()
168          adapter._active_sessions[sk] = asyncio.Event()
169  
170          await adapter.handle_message(_make_event("/agents"))
171  
172          assert sk not in adapter._pending_messages
173          assert any("handled:agents" in r for r in adapter.sent_responses)
174  
175      @pytest.mark.asyncio
176      async def test_tasks_alias_bypasses_guard(self):
177          """/tasks alias must bypass active-session guard too."""
178          adapter = _make_adapter()
179          sk = _session_key()
180          adapter._active_sessions[sk] = asyncio.Event()
181  
182          await adapter.handle_message(_make_event("/tasks"))
183  
184          assert sk not in adapter._pending_messages
185          assert any("handled:tasks" in r for r in adapter.sent_responses)
186  
187      @pytest.mark.asyncio
188      async def test_background_bypasses_guard(self):
189          """/background must bypass so it spawns a parallel task, not an interrupt."""
190          adapter = _make_adapter()
191          sk = _session_key()
192          adapter._active_sessions[sk] = asyncio.Event()
193  
194          await adapter.handle_message(_make_event("/background summarize HN"))
195  
196          assert sk not in adapter._pending_messages, (
197              "/background was queued as a pending message instead of being dispatched"
198          )
199          assert any("handled:background" in r for r in adapter.sent_responses), (
200              "/background response was not sent back to the user"
201          )
202  
203      @pytest.mark.asyncio
204      async def test_steer_bypasses_guard(self):
205          """/steer must bypass the Level-1 active-session guard so it reaches
206          the gateway runner's /steer handler and injects into the running
207          agent instead of being queued as user text for the next turn.
208          """
209          adapter = _make_adapter()
210          sk = _session_key()
211          adapter._active_sessions[sk] = asyncio.Event()
212  
213          await adapter.handle_message(_make_event("/steer also check auth.log"))
214  
215          assert sk not in adapter._pending_messages, (
216              "/steer was queued as a pending message instead of being dispatched"
217          )
218          assert any("handled:steer" in r for r in adapter.sent_responses), (
219              "/steer response was not sent back to the user"
220          )
221  
222      @pytest.mark.asyncio
223      async def test_help_bypasses_guard(self):
224          """/help must bypass so it is not silently dropped as pending slash text."""
225          adapter = _make_adapter()
226          sk = _session_key()
227          adapter._active_sessions[sk] = asyncio.Event()
228  
229          await adapter.handle_message(_make_event("/help"))
230  
231          assert sk not in adapter._pending_messages, (
232              "/help was queued as a pending message instead of being dispatched"
233          )
234          assert any("handled:help" in r for r in adapter.sent_responses), (
235              "/help response was not sent back to the user"
236          )
237  
238      @pytest.mark.asyncio
239      async def test_update_bypasses_guard(self):
240          """/update must bypass so it is not discarded by the pending-command safety net."""
241          adapter = _make_adapter()
242          sk = _session_key()
243          adapter._active_sessions[sk] = asyncio.Event()
244  
245          await adapter.handle_message(_make_event("/update"))
246  
247          assert sk not in adapter._pending_messages, (
248              "/update was queued as a pending message instead of being dispatched"
249          )
250          assert any("handled:update" in r for r in adapter.sent_responses), (
251              "/update response was not sent back to the user"
252          )
253  
254      @pytest.mark.asyncio
255      async def test_queue_bypasses_guard(self):
256          """/queue must bypass so it can queue without interrupting."""
257          adapter = _make_adapter()
258          sk = _session_key()
259          adapter._active_sessions[sk] = asyncio.Event()
260  
261          await adapter.handle_message(_make_event("/queue follow up"))
262  
263          assert sk not in adapter._pending_messages, (
264              "/queue was queued as a pending message instead of being dispatched"
265          )
266          assert any("handled:queue" in r for r in adapter.sent_responses), (
267              "/queue response was not sent back to the user"
268          )
269  
270  
271  # ---------------------------------------------------------------------------
272  # Tests: non-bypass-set commands (no dedicated Level-2 handler) also bypass
273  # instead of interrupting + being discarded.  Regression for the Discord
274  # ghost-slash-command bug where /model, /reasoning, /voice, /insights, /title,
275  # /resume, /retry, /undo, /compress, /usage, /reload-mcp,
276  # /sethome, /reset silently interrupted the running agent.
277  # ---------------------------------------------------------------------------
278  
279  
class TestAllResolvableCommandsBypassGuard:
    """Every recognized slash command must bypass the Level-1 active-session
    guard. Without this, commands the user fires mid-run interrupt the agent
    AND get silently discarded by the slash-command safety net (zero-char
    response)."""

    @pytest.mark.parametrize(
        "command_text,canonical",
        [
            ("/model claude-sonnet-4", "model"),
            ("/model", "model"),
            ("/reasoning high", "reasoning"),
            ("/personality default", "personality"),
            ("/voice on", "voice"),
            ("/insights 7", "insights"),
            ("/title my session", "title"),
            ("/resume yesterday", "resume"),
            ("/retry", "retry"),
            ("/undo", "undo"),
            ("/compress", "compress"),
            ("/usage", "usage"),
            ("/reload-mcp", "reload-mcp"),
            ("/sethome", "sethome"),
        ],
    )
    @pytest.mark.asyncio
    async def test_command_bypasses_guard(self, command_text, canonical):
        """Any resolvable slash command bypasses instead of being queued.

        ``command_text`` is the raw message sent while the session is active;
        ``canonical`` is the command name the stub handler is expected to
        echo back as ``handled:<canonical>``.
        """
        adapter = _make_adapter()
        sk = _session_key()
        adapter._active_sessions[sk] = asyncio.Event()

        await adapter.handle_message(_make_event(command_text))

        assert sk not in adapter._pending_messages, (
            f"{command_text} was queued as pending — it should bypass the guard"
        )
        assert len(adapter.sent_responses) > 0, (
            f"{command_text} produced no response — it should be dispatched, "
            "not silently discarded"
        )
        # A non-empty response list alone does not prove the right command
        # reached the handler; pin it to the expected command name, the same
        # way the per-command tests check for "handled:<cmd>".
        assert any(
            f"handled:{canonical}" in r for r in adapter.sent_responses
        ), (
            f"{command_text} was not dispatched to the handler as /{canonical}"
        )

    def test_should_bypass_returns_true_for_every_registered_command(self):
        """Spot-check: the commands previously-broken on Discord all bypass."""
        from hermes_cli.commands import should_bypass_active_session

        for cmd in (
            "model", "reasoning", "personality", "voice", "insights", "title",
            "resume", "retry", "undo", "compress", "usage",
            "reload-mcp", "sethome", "reset",
        ):
            assert should_bypass_active_session(cmd) is True, (
                f"/{cmd} must bypass the active-session guard"
            )

    def test_should_bypass_returns_false_for_unknown(self):
        """Unknown words don't bypass — they get queued as user text."""
        from hermes_cli.commands import should_bypass_active_session

        assert should_bypass_active_session("foobar") is False
        assert should_bypass_active_session(None) is False
        assert should_bypass_active_session("") is False
        # A file path split on whitespace: '/path/to/file.py' -> 'path/to/file.py'
        assert should_bypass_active_session("path/to/file.py") is False
344  
345  
346  # ---------------------------------------------------------------------------
347  # Tests: non-bypass messages still get queued
348  # ---------------------------------------------------------------------------
349  
350  
class TestNonBypassStillQueued:
    """Ordinary text and unrecognized commands are queued, never dispatched."""

    @pytest.mark.asyncio
    async def test_regular_text_queued(self):
        """Plain text arriving mid-run ends up in the pending queue."""
        session_key = None
        stub = _make_adapter()
        session_key = _session_key()
        stub._active_sessions[session_key] = asyncio.Event()
        incoming = _make_event("hello world")

        await stub.handle_message(incoming)

        assert session_key in stub._pending_messages, (
            "Regular text was not queued — it should be pending"
        )
        assert not stub.sent_responses, (
            "Regular text should not produce a direct response"
        )

    @pytest.mark.asyncio
    async def test_unknown_command_queued(self):
        """An unrecognized /command is queued rather than dispatched."""
        stub = _make_adapter()
        session_key = _session_key()
        stub._active_sessions[session_key] = asyncio.Event()
        incoming = _make_event("/foobar")

        await stub.handle_message(incoming)

        assert session_key in stub._pending_messages
        assert not stub.sent_responses

    @pytest.mark.asyncio
    async def test_file_path_not_treated_as_command(self):
        """Text shaped like '/path/to/file' does not bypass the guard."""
        stub = _make_adapter()
        session_key = _session_key()
        stub._active_sessions[session_key] = asyncio.Event()
        incoming = _make_event("/path/to/file.py")

        await stub.handle_message(incoming)

        assert session_key in stub._pending_messages
        assert not stub.sent_responses
393  
394  
395  # ---------------------------------------------------------------------------
396  # Tests: no active session — commands go through normally
397  # ---------------------------------------------------------------------------
398  
399  
class TestNoActiveSessionNormalDispatch:
    """With no agent running, messages take the normal background-task path."""

    @pytest.mark.asyncio
    async def test_stop_when_no_session_active(self):
        """/stop with no active session goes through the normal path
        (the Level 2 handler will return 'No active task')."""
        stub = _make_adapter()
        session_key = _session_key()

        # Precondition: nothing has been registered as an active session.
        assert session_key not in stub._active_sessions

        stop_event = _make_event("/stop")
        await stub.handle_message(stop_event)

        # The pending queue is only used while a session is active, so the
        # key must be absent after taking the normal path.
        assert session_key not in stub._pending_messages
418  
419  
420  # ---------------------------------------------------------------------------
421  # Tests: safety net in _run_agent discards command text from pending queue
422  # ---------------------------------------------------------------------------
423  
424  
class TestPendingCommandSafetyNet:
    """The safety net in gateway/run.py _run_agent has to drop command text
    that leaks into the pending queue via the interrupt_message fallback."""

    def test_stop_command_detected(self):
        """resolve_command recognizes 'stop', which lets the safety net
        discard it."""
        from hermes_cli.commands import resolve_command

        resolved = resolve_command("stop")
        assert resolved is not None
        assert resolved.name == "stop"

    def test_new_command_detected(self):
        """resolve_command recognizes 'new'."""
        from hermes_cli.commands import resolve_command

        resolved = resolve_command("new")
        assert resolved is not None
        assert resolved.name == "new"

    def test_reset_alias_detected(self):
        """'reset' resolves to the command named 'new'."""
        from hermes_cli.commands import resolve_command

        resolved = resolve_command("reset")
        assert resolved is not None
        assert resolved.name == "new"  # alias

    def test_unknown_command_not_detected(self):
        """An unregistered word resolves to nothing."""
        from hermes_cli.commands import resolve_command

        resolved = resolve_command("foobar")
        assert resolved is None

    def test_file_path_not_detected_as_command(self):
        """'/path/to/file' must not resolve as a command."""
        from hermes_cli.commands import resolve_command

        # The safety net takes the first whitespace-delimited word with the
        # leading '/' stripped, which for '/path/to/file' is 'path/to/file'.
        resolved = resolve_command("path/to/file")
        assert resolved is None
461  
462  
463  # ---------------------------------------------------------------------------
464  # Tests: bypass with @botname suffix (Telegram-style)
465  # ---------------------------------------------------------------------------
466  
467  
class TestBypassWithBotnameSuffix:
    """Telegram appends @botname to commands; the bypass has to survive that."""

    @pytest.mark.asyncio
    async def test_stop_with_botname(self):
        """/stop@MyHermesBot skips the guard."""
        stub = _make_adapter()
        session_key = _session_key()
        stub._active_sessions[session_key] = asyncio.Event()
        incoming = _make_event("/stop@MyHermesBot")

        await stub.handle_message(incoming)

        assert session_key not in stub._pending_messages, (
            "/stop@MyHermesBot was queued instead of bypassing"
        )
        assert any("handled:stop" in reply for reply in stub.sent_responses)

    @pytest.mark.asyncio
    async def test_new_with_botname(self):
        """/new@MyHermesBot skips the guard."""
        stub = _make_adapter()
        session_key = _session_key()
        stub._active_sessions[session_key] = asyncio.Event()
        incoming = _make_event("/new@MyHermesBot")

        await stub.handle_message(incoming)

        assert session_key not in stub._pending_messages
        assert any("handled:new" in reply for reply in stub.sent_responses)