/ tests / gateway / test_status_command.py
test_status_command.py
  1  """Tests for gateway /status behavior and token persistence."""
  2  
  3  from datetime import datetime
  4  import time
  5  from types import SimpleNamespace
  6  from unittest.mock import AsyncMock, MagicMock
  7  
  8  import pytest
  9  
 10  from gateway.config import GatewayConfig, Platform, PlatformConfig
 11  from gateway.platforms.base import MessageEvent
 12  from gateway.session import SessionEntry, SessionSource, build_session_key
 13  
 14  
 15  def _make_source(platform: Platform = Platform.TELEGRAM) -> SessionSource:
 16      return SessionSource(
 17          platform=platform,
 18          user_id="u1",
 19          chat_id="c1",
 20          user_name="tester",
 21          chat_type="dm",
 22      )
 23  
 24  
 25  def _make_event(text: str, *, platform: Platform = Platform.TELEGRAM) -> MessageEvent:
 26      return MessageEvent(
 27          text=text,
 28          source=_make_source(platform),
 29          message_id="m1",
 30      )
 31  
 32  
 33  def _make_runner(session_entry: SessionEntry, *, platform: Platform = Platform.TELEGRAM):
 34      from gateway.run import GatewayRunner
 35  
 36      runner = object.__new__(GatewayRunner)
 37      runner.config = GatewayConfig(
 38          platforms={platform: PlatformConfig(enabled=True, token="***")}
 39      )
 40      adapter = MagicMock()
 41      adapter.send = AsyncMock()
 42      runner.adapters = {platform: adapter}
 43      runner._voice_mode = {}
 44      runner.hooks = SimpleNamespace(emit=AsyncMock(), loaded_hooks=False)
 45      runner.session_store = MagicMock()
 46      runner.session_store.get_or_create_session.return_value = session_entry
 47      runner.session_store.load_transcript.return_value = []
 48      runner.session_store.has_any_sessions.return_value = True
 49      runner.session_store.append_to_transcript = MagicMock()
 50      runner.session_store.rewrite_transcript = MagicMock()
 51      runner.session_store.update_session = MagicMock()
 52      runner._running_agents = {}
 53      runner._session_run_generation = {}
 54      runner._pending_messages = {}
 55      runner._pending_approvals = {}
 56      runner._session_db = MagicMock()
 57      runner._session_db.get_session_title.return_value = None
 58      # Default: no DB row → /status reports 0 tokens.  Tests that exercise
 59      # the populated path override this.
 60      runner._session_db.get_session.return_value = None
 61      runner._reasoning_config = None
 62      runner._provider_routing = {}
 63      runner._fallback_model = None
 64      runner._show_reasoning = False
 65      runner._is_user_authorized = lambda _source: True
 66      runner._set_session_env = lambda _context: None
 67      runner._should_send_voice_reply = lambda *_args, **_kwargs: False
 68      runner._send_voice_reply = AsyncMock()
 69      runner._capture_gateway_honcho_if_configured = lambda *args, **kwargs: None
 70      runner._emit_gateway_run_progress = AsyncMock()
 71      return runner
 72  
 73  
 74  @pytest.mark.asyncio
 75  async def test_status_command_reports_running_agent_without_interrupt(monkeypatch):
 76      session_entry = SessionEntry(
 77          session_key=build_session_key(_make_source()),
 78          session_id="sess-1",
 79          created_at=datetime.now(),
 80          updated_at=datetime.now(),
 81          platform=Platform.TELEGRAM,
 82          chat_type="dm",
 83          total_tokens=321,
 84      )
 85      runner = _make_runner(session_entry)
 86      # Token total comes from the SQLite SessionDB, not SessionEntry.
 87      runner._session_db.get_session.return_value = {
 88          "input_tokens": 200,
 89          "output_tokens": 121,
 90          "cache_read_tokens": 0,
 91          "cache_write_tokens": 0,
 92          "reasoning_tokens": 0,
 93      }
 94      running_agent = MagicMock()
 95      runner._running_agents[build_session_key(_make_source())] = running_agent
 96  
 97      result = await runner._handle_message(_make_event("/status"))
 98  
 99      assert "**Session ID:** `sess-1`" in result
100      assert "**Tokens:** 321" in result
101      assert "**Agent Running:** Yes ⚡" in result
102      assert "**Title:**" not in result
103      running_agent.interrupt.assert_not_called()
104      assert runner._pending_messages == {}
105  
106  
107  @pytest.mark.asyncio
108  async def test_status_command_includes_session_title_when_present():
109      session_entry = SessionEntry(
110          session_key=build_session_key(_make_source()),
111          session_id="sess-1",
112          created_at=datetime.now(),
113          updated_at=datetime.now(),
114          platform=Platform.TELEGRAM,
115          chat_type="dm",
116          total_tokens=321,
117      )
118      runner = _make_runner(session_entry)
119      runner._session_db.get_session_title.return_value = "My titled session"
120  
121      result = await runner._handle_message(_make_event("/status"))
122  
123      assert "**Session ID:** `sess-1`" in result
124      assert "**Title:** My titled session" in result
125  
126  
127  @pytest.mark.asyncio
128  async def test_status_command_reads_token_totals_from_session_db():
129      """Regression test for #17158: /status must source token totals from the
130      SQLite SessionDB (where run_agent.py persists them) and sum all component
131      counts, not from SessionEntry (which the agent never writes)."""
132      session_entry = SessionEntry(
133          session_key=build_session_key(_make_source()),
134          session_id="sess-1",
135          created_at=datetime.now(),
136          updated_at=datetime.now(),
137          platform=Platform.TELEGRAM,
138          chat_type="dm",
139          total_tokens=0,  # SessionEntry never gets written to — always 0.
140      )
141      runner = _make_runner(session_entry)
142      runner._session_db.get_session.return_value = {
143          "input_tokens": 1000,
144          "output_tokens": 250,
145          "cache_read_tokens": 500,
146          "cache_write_tokens": 100,
147          "reasoning_tokens": 50,
148      }
149  
150      result = await runner._handle_message(_make_event("/status"))
151  
152      # 1000 + 250 + 500 + 100 + 50 = 1,900
153      assert "**Tokens:** 1,900" in result
154  
155  
156  @pytest.mark.asyncio
157  async def test_status_command_tokens_zero_when_session_db_row_missing():
158      """When the SessionDB has no row for the current session yet (fresh
159      session, no agent calls), /status reports 0 without raising."""
160      session_entry = SessionEntry(
161          session_key=build_session_key(_make_source()),
162          session_id="sess-1",
163          created_at=datetime.now(),
164          updated_at=datetime.now(),
165          platform=Platform.TELEGRAM,
166          chat_type="dm",
167          total_tokens=999,  # This should be ignored.
168      )
169      runner = _make_runner(session_entry)
170      runner._session_db.get_session.return_value = None
171  
172      result = await runner._handle_message(_make_event("/status"))
173  
174      assert "**Tokens:** 0" in result
175  
176  
177  @pytest.mark.asyncio
178  async def test_agents_command_reports_active_agents_and_processes(monkeypatch):
179      session_key = build_session_key(_make_source())
180      session_entry = SessionEntry(
181          session_key=session_key,
182          session_id="sess-1",
183          created_at=datetime.now(),
184          updated_at=datetime.now(),
185          platform=Platform.TELEGRAM,
186          chat_type="dm",
187          total_tokens=0,
188      )
189      runner = _make_runner(session_entry)
190      running_agent = SimpleNamespace(
191          session_id="sess-running",
192          model="openrouter/test-model",
193          interrupt=MagicMock(),
194          get_activity_summary=lambda: {"seconds_since_activity": 0},
195      )
196      runner._running_agents[session_key] = running_agent
197      runner._running_agents_ts = {session_key: time.time() - 8}
198      runner._background_tasks = set()
199  
200      class _FakeRegistry:
201          def list_sessions(self):
202              return [
203                  {
204                      "session_id": "proc-1",
205                      "status": "running",
206                      "uptime_seconds": 17,
207                      "command": "sleep 30",
208                  }
209              ]
210  
211      monkeypatch.setattr("tools.process_registry.process_registry", _FakeRegistry())
212  
213      result = await runner._handle_message(_make_event("/agents"))
214  
215      assert "**Active agents:** 1" in result
216      assert "**Running background processes:** 1" in result
217      assert "proc-1" in result
218      running_agent.interrupt.assert_not_called()
219  
220  
221  @pytest.mark.asyncio
222  async def test_tasks_alias_routes_to_agents_command(monkeypatch):
223      session_entry = SessionEntry(
224          session_key=build_session_key(_make_source()),
225          session_id="sess-1",
226          created_at=datetime.now(),
227          updated_at=datetime.now(),
228          platform=Platform.TELEGRAM,
229          chat_type="dm",
230          total_tokens=0,
231      )
232      runner = _make_runner(session_entry)
233      runner._background_tasks = set()
234  
235      class _FakeRegistry:
236          def list_sessions(self):
237              return []
238  
239      monkeypatch.setattr("tools.process_registry.process_registry", _FakeRegistry())
240  
241      result = await runner._handle_message(_make_event("/tasks"))
242  
243      assert "Active Agents & Tasks" in result
244  
245  
246  @pytest.mark.asyncio
247  async def test_handle_message_persists_agent_token_counts(monkeypatch):
248      import gateway.run as gateway_run
249  
250      session_entry = SessionEntry(
251          session_key=build_session_key(_make_source()),
252          session_id="sess-1",
253          created_at=datetime.now(),
254          updated_at=datetime.now(),
255          platform=Platform.TELEGRAM,
256          chat_type="dm",
257      )
258      runner = _make_runner(session_entry)
259      runner.session_store.load_transcript.return_value = [{"role": "user", "content": "earlier"}]
260      runner._run_agent = AsyncMock(
261          return_value={
262              "final_response": "ok",
263              "messages": [],
264              "tools": [],
265              "history_offset": 0,
266              "last_prompt_tokens": 80,
267              "input_tokens": 120,
268              "output_tokens": 45,
269              "model": "openai/test-model",
270          }
271      )
272  
273      monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"})
274      monkeypatch.setattr(
275          "agent.model_metadata.get_model_context_length",
276          lambda *_args, **_kwargs: 100000,
277      )
278  
279      result = await runner._handle_message(_make_event("hello"))
280  
281      assert result == "ok"
282      runner.session_store.update_session.assert_called_once_with(
283          session_entry.session_key,
284          last_prompt_tokens=80,
285      )
286  
287  
288  @pytest.mark.asyncio
289  async def test_first_run_slack_home_channel_onboarding_uses_parent_command(monkeypatch):
290      import gateway.run as gateway_run
291  
292      session_entry = SessionEntry(
293          session_key=build_session_key(_make_source(Platform.SLACK)),
294          session_id="sess-1",
295          created_at=datetime.now(),
296          updated_at=datetime.now(),
297          platform=Platform.SLACK,
298          chat_type="dm",
299      )
300      runner = _make_runner(session_entry, platform=Platform.SLACK)
301      runner.session_store.load_transcript.return_value = []
302      runner.session_store.has_any_sessions.return_value = False
303      runner._run_agent = AsyncMock(
304          return_value={
305              "final_response": "ok",
306              "messages": [],
307              "tools": [],
308              "history_offset": 0,
309              "last_prompt_tokens": 0,
310              "input_tokens": 0,
311              "output_tokens": 0,
312              "model": "openai/test-model",
313          }
314      )
315  
316      monkeypatch.delenv("SLACK_HOME_CHANNEL", raising=False)
317      monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"})
318      monkeypatch.setattr(
319          "agent.model_metadata.get_model_context_length",
320          lambda *_args, **_kwargs: 100000,
321      )
322  
323      result = await runner._handle_message(_make_event("hello", platform=Platform.SLACK))
324  
325      assert result == "ok"
326      runner.adapters[Platform.SLACK].send.assert_awaited_once()
327      onboarding = runner.adapters[Platform.SLACK].send.await_args.args[1]
328      assert "/hermes sethome" in onboarding
329      assert "Type /sethome" not in onboarding
330  
331  
332  @pytest.mark.asyncio
333  async def test_first_run_non_slack_home_channel_onboarding_keeps_direct_command(monkeypatch):
334      import gateway.run as gateway_run
335  
336      session_entry = SessionEntry(
337          session_key=build_session_key(_make_source(Platform.TELEGRAM)),
338          session_id="sess-1",
339          created_at=datetime.now(),
340          updated_at=datetime.now(),
341          platform=Platform.TELEGRAM,
342          chat_type="dm",
343      )
344      runner = _make_runner(session_entry, platform=Platform.TELEGRAM)
345      runner.session_store.load_transcript.return_value = []
346      runner.session_store.has_any_sessions.return_value = False
347      runner._run_agent = AsyncMock(
348          return_value={
349              "final_response": "ok",
350              "messages": [],
351              "tools": [],
352              "history_offset": 0,
353              "last_prompt_tokens": 0,
354              "input_tokens": 0,
355              "output_tokens": 0,
356              "model": "openai/test-model",
357          }
358      )
359  
360      monkeypatch.delenv("TELEGRAM_HOME_CHANNEL", raising=False)
361      monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"})
362      monkeypatch.setattr(
363          "agent.model_metadata.get_model_context_length",
364          lambda *_args, **_kwargs: 100000,
365      )
366  
367      result = await runner._handle_message(_make_event("hello", platform=Platform.TELEGRAM))
368  
369      assert result == "ok"
370      runner.adapters[Platform.TELEGRAM].send.assert_awaited_once()
371      onboarding = runner.adapters[Platform.TELEGRAM].send.await_args.args[1]
372      assert "Type /sethome" in onboarding
373  
374  
375  @pytest.mark.asyncio
376  async def test_handle_message_discards_stale_result_after_session_invalidation(monkeypatch):
377      import gateway.run as gateway_run
378  
379      session_entry = SessionEntry(
380          session_key=build_session_key(_make_source()),
381          session_id="sess-1",
382          created_at=datetime.now(),
383          updated_at=datetime.now(),
384          platform=Platform.TELEGRAM,
385          chat_type="dm",
386      )
387      runner = _make_runner(session_entry)
388      runner.session_store.load_transcript.return_value = [{"role": "user", "content": "earlier"}]
389      session_key = session_entry.session_key
390      runner.adapters[Platform.TELEGRAM]._post_delivery_callbacks = {session_key: object()}
391  
392      async def _stale_result(**kwargs):
393          runner._invalidate_session_run_generation(kwargs["session_key"], reason="test_stale_result")
394          return {
395              "final_response": "late reply",
396              "messages": [],
397              "tools": [],
398              "history_offset": 0,
399              "last_prompt_tokens": 80,
400              "input_tokens": 120,
401              "output_tokens": 45,
402              "model": "openai/test-model",
403          }
404  
405      runner._run_agent = AsyncMock(side_effect=_stale_result)
406  
407      monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"})
408      monkeypatch.setattr(
409          "agent.model_metadata.get_model_context_length",
410          lambda *_args, **_kwargs: 100000,
411      )
412  
413      result = await runner._handle_message(_make_event("hello"))
414  
415      assert result is None
416      runner.session_store.append_to_transcript.assert_not_called()
417      runner.session_store.update_session.assert_not_called()
418      assert session_key not in runner.adapters[Platform.TELEGRAM]._post_delivery_callbacks
419  
420  
421  @pytest.mark.asyncio
422  async def test_handle_message_stale_result_keeps_newer_generation_callback(monkeypatch):
423      import gateway.run as gateway_run
424  
425      class _Adapter:
426          def __init__(self):
427              self._post_delivery_callbacks = {}
428  
429          async def send(self, *args, **kwargs):
430              return None
431  
432          def pop_post_delivery_callback(self, session_key, *, generation=None):
433              entry = self._post_delivery_callbacks.get(session_key)
434              if entry is None:
435                  return None
436              if isinstance(entry, tuple):
437                  entry_generation, callback = entry
438                  if generation is not None and entry_generation != generation:
439                      return None
440                  self._post_delivery_callbacks.pop(session_key, None)
441                  return callback
442              if generation is not None:
443                  return None
444              return self._post_delivery_callbacks.pop(session_key, None)
445  
446      session_entry = SessionEntry(
447          session_key=build_session_key(_make_source()),
448          session_id="sess-1",
449          created_at=datetime.now(),
450          updated_at=datetime.now(),
451          platform=Platform.TELEGRAM,
452          chat_type="dm",
453      )
454      runner = _make_runner(session_entry)
455      runner.session_store.load_transcript.return_value = [{"role": "user", "content": "earlier"}]
456      session_key = session_entry.session_key
457      adapter = _Adapter()
458      runner.adapters[Platform.TELEGRAM] = adapter
459  
460      async def _stale_result(**kwargs):
461          # Simulate a newer run claiming the callback slot before the stale run unwinds.
462          runner._session_run_generation[session_key] = 2
463          adapter._post_delivery_callbacks[session_key] = (2, lambda: None)
464          return {
465              "final_response": "late reply",
466              "messages": [],
467              "tools": [],
468              "history_offset": 0,
469              "last_prompt_tokens": 80,
470              "input_tokens": 120,
471              "output_tokens": 45,
472              "model": "openai/test-model",
473          }
474  
475      runner._run_agent = AsyncMock(side_effect=_stale_result)
476  
477      monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"})
478      monkeypatch.setattr(
479          "agent.model_metadata.get_model_context_length",
480          lambda *_args, **_kwargs: 100000,
481      )
482  
483      result = await runner._handle_message(_make_event("hello"))
484  
485      assert result is None
486      assert session_key in adapter._post_delivery_callbacks
487      assert adapter._post_delivery_callbacks[session_key][0] == 2
488  
489  
490  
491  @pytest.mark.asyncio
492  async def test_status_command_bypasses_active_session_guard():
493      """When an agent is running, /status must be dispatched immediately via
494      base.handle_message — not queued or treated as an interrupt (#5046)."""
495      import asyncio
496      from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageType
497      from gateway.session import build_session_key
498      from gateway.config import Platform, PlatformConfig, GatewayConfig
499  
500      source = _make_source()
501      session_key = build_session_key(source)
502  
503      handler_called_with = []
504  
505      async def fake_handler(event):
506          handler_called_with.append(event)
507          return "📊 **Hermes Gateway Status**\n**Agent Running:** Yes ⚡"
508  
509      # Concrete subclass to avoid abstract method errors
510      class _ConcreteAdapter(BasePlatformAdapter):
511          platform = Platform.TELEGRAM
512  
513          async def connect(self): pass
514          async def disconnect(self): pass
515          async def send(self, chat_id, content, **kwargs): pass
516          async def get_chat_info(self, chat_id): return {}
517  
518      platform_config = PlatformConfig(enabled=True, token="***")
519      adapter = _ConcreteAdapter(platform_config, Platform.TELEGRAM)
520      adapter.set_message_handler(fake_handler)
521  
522      sent = []
523  
524      async def fake_send_with_retry(chat_id, content, reply_to=None, metadata=None):
525          sent.append(content)
526  
527      adapter._send_with_retry = fake_send_with_retry
528  
529      # Simulate an active session
530      interrupt_event = asyncio.Event()
531      adapter._active_sessions[session_key] = interrupt_event
532  
533      event = MessageEvent(
534          text="/status",
535          source=source,
536          message_id="m1",
537          message_type=MessageType.COMMAND,
538      )
539      await adapter.handle_message(event)
540  
541      assert handler_called_with, "/status handler was never called (event was queued or dropped)"
542      assert sent, "/status response was never sent"
543      assert "Agent Running" in sent[0]
544      assert not interrupt_event.is_set(), "/status incorrectly triggered an agent interrupt"
545      assert session_key not in adapter._pending_messages, "/status was incorrectly queued"
546  
547  
548  @pytest.mark.asyncio
549  async def test_profile_command_reports_custom_root_profile(monkeypatch, tmp_path):
550      """Gateway /profile detects custom-root profiles (not under ~/.hermes)."""
551      from pathlib import Path
552  
553      session_entry = SessionEntry(
554          session_key=build_session_key(_make_source()),
555          session_id="sess-1",
556          created_at=datetime.now(),
557          updated_at=datetime.now(),
558          platform=Platform.TELEGRAM,
559          chat_type="dm",
560      )
561      runner = _make_runner(session_entry)
562      profile_home = tmp_path / "profiles" / "coder"
563  
564      monkeypatch.setenv("HERMES_HOME", str(profile_home))
565      monkeypatch.setattr(Path, "home", lambda: tmp_path / "unrelated-home")
566  
567      result = await runner._handle_profile_command(_make_event("/profile"))
568  
569      assert "**Profile:** `coder`" in result
570      assert f"**Home:** `{profile_home}`" in result
571  
572  
573  @pytest.mark.asyncio
574  async def test_post_delivery_callback_generation_snapshot_happens_after_bind():
575      """Regression: the callback_generation snapshot in _process_message_background
576      must happen AFTER the handler runs, not before.
577  
578      _hermes_run_generation is set on the interrupt event by
579      GatewayRunner._bind_adapter_run_generation during _handle_message_with_agent.
580      The earlier snapshot-at-task-start always captured None, which bypassed the
581      generation-ownership check in pop_post_delivery_callback and let stale runs
582      fire a fresher run's callbacks.
583      """
584      import asyncio
585      from gateway.platforms.base import BasePlatformAdapter
586  
587      source = _make_source()
588      session_key = build_session_key(source)
589      fired = []
590  
591      class _ConcreteAdapter(BasePlatformAdapter):
592          platform = Platform.TELEGRAM
593  
594          async def connect(self): pass
595          async def disconnect(self): pass
596          async def send(self, chat_id, content, **kwargs): pass
597          async def get_chat_info(self, chat_id): return {}
598  
599      adapter = _ConcreteAdapter(
600          PlatformConfig(enabled=True, token="***"), Platform.TELEGRAM
601      )
602  
603      async def fake_handler(event):
604          # Simulate what _bind_adapter_run_generation does mid-run.
605          interrupt_event = adapter._active_sessions.get(session_key)
606          setattr(interrupt_event, "_hermes_run_generation", 1)
607          # Stale run registers its callback at generation=1.
608          adapter.register_post_delivery_callback(
609              session_key,
610              lambda: fired.append("older"),
611              generation=1,
612          )
613          # A fresher run overwrites with generation=2 (different dict entry).
614          adapter.register_post_delivery_callback(
615              session_key,
616              lambda: fired.append("newer"),
617              generation=2,
618          )
619          return None
620  
621      adapter.set_message_handler(fake_handler)
622      event = MessageEvent(text="hello", source=source, message_id="m1")
623  
624      await adapter.handle_message(event)
625      tasks = list(adapter._background_tasks)
626      assert tasks, "expected background task to be created"
627      await asyncio.gather(*tasks)
628  
629      # The stale run (generation=1) must NOT fire the fresher run's callback
630      # (generation=2). With the pre-fix code, callback_generation was snapshotted
631      # as None before the handler ran, bypassing the ownership check and firing
632      # "newer" anyway.
633      assert fired == []
634      assert session_key in adapter._post_delivery_callbacks
635      assert adapter._post_delivery_callbacks[session_key][0] == 2