# test_status_command.py
"""Tests for gateway /status behavior and token persistence."""

import asyncio
from datetime import datetime
from pathlib import Path
import time
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock

import pytest

from gateway.config import GatewayConfig, Platform, PlatformConfig
from gateway.platforms.base import MessageEvent
from gateway.session import SessionEntry, SessionSource, build_session_key


def _make_source(platform: Platform = Platform.TELEGRAM) -> SessionSource:
    """Return the fixed DM ``SessionSource`` (user ``u1``, chat ``c1``) for *platform*."""
    return SessionSource(
        platform=platform,
        user_id="u1",
        chat_id="c1",
        user_name="tester",
        chat_type="dm",
    )


def _make_event(text: str, *, platform: Platform = Platform.TELEGRAM) -> MessageEvent:
    """Return a ``MessageEvent`` carrying *text* from the fixed test source."""
    return MessageEvent(
        text=text,
        source=_make_source(platform),
        message_id="m1",
    )


def _make_session_entry(
    platform: Platform = Platform.TELEGRAM, **overrides
) -> SessionEntry:
    """Return the ``sess-1`` DM ``SessionEntry`` every test in this module uses.

    The session key is derived from ``_make_source(platform)`` so it matches
    the key of events built by ``_make_event``.  Extra keyword arguments
    (e.g. ``total_tokens=321``) are passed through to ``SessionEntry``.
    """
    now = datetime.now()
    fields = {
        "session_key": build_session_key(_make_source(platform)),
        "session_id": "sess-1",
        "created_at": now,
        "updated_at": now,
        "platform": platform,
        "chat_type": "dm",
    }
    fields.update(overrides)
    return SessionEntry(**fields)


def _agent_result(
    final_response: str = "ok",
    *,
    last_prompt_tokens: int = 0,
    input_tokens: int = 0,
    output_tokens: int = 0,
) -> dict:
    """Return the result dict the mocked ``_run_agent`` hands back to the runner."""
    return {
        "final_response": final_response,
        "messages": [],
        "tools": [],
        "history_offset": 0,
        "last_prompt_tokens": last_prompt_tokens,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "model": "openai/test-model",
    }


def _patch_agent_runtime(monkeypatch) -> None:
    """Stub the runtime-kwargs resolver and the model context-length lookup."""
    import gateway.run as gateway_run

    monkeypatch.setattr(
        gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}
    )
    monkeypatch.setattr(
        "agent.model_metadata.get_model_context_length",
        lambda *_args, **_kwargs: 100000,
    )


def _make_concrete_adapter():
    """Return a minimal concrete ``BasePlatformAdapter`` bound to Telegram."""
    from gateway.platforms.base import BasePlatformAdapter

    # Concrete subclass to avoid abstract method errors
    class _ConcreteAdapter(BasePlatformAdapter):
        platform = Platform.TELEGRAM

        async def connect(self):
            pass

        async def disconnect(self):
            pass

        async def send(self, chat_id, content, **kwargs):
            pass

        async def get_chat_info(self, chat_id):
            return {}

    return _ConcreteAdapter(
        PlatformConfig(enabled=True, token="***"), Platform.TELEGRAM
    )


def _make_runner(session_entry: SessionEntry, *, platform: Platform = Platform.TELEGRAM):
    """Return a ``GatewayRunner`` wired with mocks instead of real services.

    The instance is created with ``object.__new__`` so ``__init__`` never
    runs; every attribute the tests rely on is assigned explicitly below.
    The session store always hands back *session_entry*, and the single
    adapter registered for *platform* is a ``MagicMock`` with an async
    ``send``.
    """
    from gateway.run import GatewayRunner

    runner = object.__new__(GatewayRunner)
    runner.config = GatewayConfig(
        platforms={platform: PlatformConfig(enabled=True, token="***")}
    )
    adapter = MagicMock()
    adapter.send = AsyncMock()
    runner.adapters = {platform: adapter}
    runner._voice_mode = {}
    runner.hooks = SimpleNamespace(emit=AsyncMock(), loaded_hooks=False)
    runner.session_store = MagicMock()
    runner.session_store.get_or_create_session.return_value = session_entry
    runner.session_store.load_transcript.return_value = []
    runner.session_store.has_any_sessions.return_value = True
    runner.session_store.append_to_transcript = MagicMock()
    runner.session_store.rewrite_transcript = MagicMock()
    runner.session_store.update_session = MagicMock()
    runner._running_agents = {}
    runner._session_run_generation = {}
    runner._pending_messages = {}
    runner._pending_approvals = {}
    runner._session_db = MagicMock()
    runner._session_db.get_session_title.return_value = None
    # Default: no DB row → /status reports 0 tokens.  Tests that exercise
    # the populated path override this.
    runner._session_db.get_session.return_value = None
    runner._reasoning_config = None
    runner._provider_routing = {}
    runner._fallback_model = None
    runner._show_reasoning = False
    runner._is_user_authorized = lambda _source: True
    runner._set_session_env = lambda _context: None
    runner._should_send_voice_reply = lambda *_args, **_kwargs: False
    runner._send_voice_reply = AsyncMock()
    runner._capture_gateway_honcho_if_configured = lambda *args, **kwargs: None
    runner._emit_gateway_run_progress = AsyncMock()
    return runner


@pytest.mark.asyncio
async def test_status_command_reports_running_agent_without_interrupt():
    """/status reports a running agent without interrupting it or queueing."""
    session_entry = _make_session_entry(total_tokens=321)
    runner = _make_runner(session_entry)
    # Token total comes from the SQLite SessionDB, not SessionEntry.
    runner._session_db.get_session.return_value = {
        "input_tokens": 200,
        "output_tokens": 121,
        "cache_read_tokens": 0,
        "cache_write_tokens": 0,
        "reasoning_tokens": 0,
    }
    running_agent = MagicMock()
    runner._running_agents[build_session_key(_make_source())] = running_agent

    result = await runner._handle_message(_make_event("/status"))

    assert "**Session ID:** `sess-1`" in result
    assert "**Tokens:** 321" in result
    assert "**Agent Running:** Yes ⚡" in result
    assert "**Title:**" not in result
    running_agent.interrupt.assert_not_called()
    assert runner._pending_messages == {}


@pytest.mark.asyncio
async def test_status_command_includes_session_title_when_present():
    """/status shows a Title line when the session DB returns a title."""
    session_entry = _make_session_entry(total_tokens=321)
    runner = _make_runner(session_entry)
    runner._session_db.get_session_title.return_value = "My titled session"

    result = await runner._handle_message(_make_event("/status"))

    assert "**Session ID:** `sess-1`" in result
    assert "**Title:** My titled session" in result


@pytest.mark.asyncio
async def test_status_command_reads_token_totals_from_session_db():
    """Regression test for #17158: /status must source token totals from the
    SQLite SessionDB (where run_agent.py persists them) and sum all component
    counts, not from SessionEntry (which the agent never writes)."""
    # SessionEntry never gets written to — always 0.
    session_entry = _make_session_entry(total_tokens=0)
    runner = _make_runner(session_entry)
    runner._session_db.get_session.return_value = {
        "input_tokens": 1000,
        "output_tokens": 250,
        "cache_read_tokens": 500,
        "cache_write_tokens": 100,
        "reasoning_tokens": 50,
    }

    result = await runner._handle_message(_make_event("/status"))

    # 1000 + 250 + 500 + 100 + 50 = 1,900
    assert "**Tokens:** 1,900" in result


@pytest.mark.asyncio
async def test_status_command_tokens_zero_when_session_db_row_missing():
    """When the SessionDB has no row for the current session yet (fresh
    session, no agent calls), /status reports 0 without raising."""
    # total_tokens on the entry should be ignored.
    session_entry = _make_session_entry(total_tokens=999)
    runner = _make_runner(session_entry)
    runner._session_db.get_session.return_value = None

    result = await runner._handle_message(_make_event("/status"))

    assert "**Tokens:** 0" in result


@pytest.mark.asyncio
async def test_agents_command_reports_active_agents_and_processes(monkeypatch):
    """/agents lists the running agent and background processes, no interrupt."""
    session_key = build_session_key(_make_source())
    session_entry = _make_session_entry(total_tokens=0)
    runner = _make_runner(session_entry)
    running_agent = SimpleNamespace(
        session_id="sess-running",
        model="openrouter/test-model",
        interrupt=MagicMock(),
        get_activity_summary=lambda: {"seconds_since_activity": 0},
    )
    runner._running_agents[session_key] = running_agent
    runner._running_agents_ts = {session_key: time.time() - 8}
    runner._background_tasks = set()

    class _FakeRegistry:
        def list_sessions(self):
            return [
                {
                    "session_id": "proc-1",
                    "status": "running",
                    "uptime_seconds": 17,
                    "command": "sleep 30",
                }
            ]

    monkeypatch.setattr("tools.process_registry.process_registry", _FakeRegistry())

    result = await runner._handle_message(_make_event("/agents"))

    assert "**Active agents:** 1" in result
    assert "**Running background processes:** 1" in result
    assert "proc-1" in result
    running_agent.interrupt.assert_not_called()


@pytest.mark.asyncio
async def test_tasks_alias_routes_to_agents_command(monkeypatch):
    """/tasks produces the same "Active Agents & Tasks" report as /agents."""
    session_entry = _make_session_entry(total_tokens=0)
    runner = _make_runner(session_entry)
    runner._background_tasks = set()

    class _FakeRegistry:
        def list_sessions(self):
            return []

    monkeypatch.setattr("tools.process_registry.process_registry", _FakeRegistry())

    result = await runner._handle_message(_make_event("/tasks"))

    assert "Active Agents & Tasks" in result


@pytest.mark.asyncio
async def test_handle_message_persists_agent_token_counts(monkeypatch):
    """After an agent run, only ``last_prompt_tokens`` is written to the store."""
    session_entry = _make_session_entry()
    runner = _make_runner(session_entry)
    runner.session_store.load_transcript.return_value = [
        {"role": "user", "content": "earlier"}
    ]
    runner._run_agent = AsyncMock(
        return_value=_agent_result(
            last_prompt_tokens=80, input_tokens=120, output_tokens=45
        )
    )
    _patch_agent_runtime(monkeypatch)

    result = await runner._handle_message(_make_event("hello"))

    assert result == "ok"
    runner.session_store.update_session.assert_called_once_with(
        session_entry.session_key,
        last_prompt_tokens=80,
    )


@pytest.mark.asyncio
async def test_first_run_slack_home_channel_onboarding_uses_parent_command(monkeypatch):
    """First-run onboarding on Slack advertises ``/hermes sethome``."""
    session_entry = _make_session_entry(Platform.SLACK)
    runner = _make_runner(session_entry, platform=Platform.SLACK)
    runner.session_store.load_transcript.return_value = []
    runner.session_store.has_any_sessions.return_value = False
    runner._run_agent = AsyncMock(return_value=_agent_result())

    monkeypatch.delenv("SLACK_HOME_CHANNEL", raising=False)
    _patch_agent_runtime(monkeypatch)

    result = await runner._handle_message(_make_event("hello", platform=Platform.SLACK))

    assert result == "ok"
    runner.adapters[Platform.SLACK].send.assert_awaited_once()
    onboarding = runner.adapters[Platform.SLACK].send.await_args.args[1]
    assert "/hermes sethome" in onboarding
    assert "Type /sethome" not in onboarding


@pytest.mark.asyncio
async def test_first_run_non_slack_home_channel_onboarding_keeps_direct_command(monkeypatch):
    """First-run onboarding on Telegram keeps the direct ``/sethome`` hint."""
    session_entry = _make_session_entry(Platform.TELEGRAM)
    runner = _make_runner(session_entry, platform=Platform.TELEGRAM)
    runner.session_store.load_transcript.return_value = []
    runner.session_store.has_any_sessions.return_value = False
    runner._run_agent = AsyncMock(return_value=_agent_result())

    monkeypatch.delenv("TELEGRAM_HOME_CHANNEL", raising=False)
    _patch_agent_runtime(monkeypatch)

    result = await runner._handle_message(_make_event("hello", platform=Platform.TELEGRAM))

    assert result == "ok"
    runner.adapters[Platform.TELEGRAM].send.assert_awaited_once()
    onboarding = runner.adapters[Platform.TELEGRAM].send.await_args.args[1]
    assert "Type /sethome" in onboarding


@pytest.mark.asyncio
async def test_handle_message_discards_stale_result_after_session_invalidation(monkeypatch):
    """A result whose run generation was invalidated mid-run is dropped."""
    session_entry = _make_session_entry()
    runner = _make_runner(session_entry)
    runner.session_store.load_transcript.return_value = [
        {"role": "user", "content": "earlier"}
    ]
    session_key = session_entry.session_key
    runner.adapters[Platform.TELEGRAM]._post_delivery_callbacks = {session_key: object()}

    async def _stale_result(**kwargs):
        # Invalidate the generation while the agent is still "running".
        runner._invalidate_session_run_generation(
            kwargs["session_key"], reason="test_stale_result"
        )
        return _agent_result(
            "late reply", last_prompt_tokens=80, input_tokens=120, output_tokens=45
        )

    runner._run_agent = AsyncMock(side_effect=_stale_result)
    _patch_agent_runtime(monkeypatch)

    result = await runner._handle_message(_make_event("hello"))

    assert result is None
    runner.session_store.append_to_transcript.assert_not_called()
    runner.session_store.update_session.assert_not_called()
    assert session_key not in runner.adapters[Platform.TELEGRAM]._post_delivery_callbacks


@pytest.mark.asyncio
async def test_handle_message_stale_result_keeps_newer_generation_callback(monkeypatch):
    """A stale run must not remove a callback registered by a newer generation."""

    class _Adapter:
        def __init__(self):
            self._post_delivery_callbacks = {}

        async def send(self, *args, **kwargs):
            return None

        def pop_post_delivery_callback(self, session_key, *, generation=None):
            entry = self._post_delivery_callbacks.get(session_key)
            if entry is None:
                return None
            if isinstance(entry, tuple):
                entry_generation, callback = entry
                if generation is not None and entry_generation != generation:
                    return None
                self._post_delivery_callbacks.pop(session_key, None)
                return callback
            if generation is not None:
                return None
            return self._post_delivery_callbacks.pop(session_key, None)

    session_entry = _make_session_entry()
    runner = _make_runner(session_entry)
    runner.session_store.load_transcript.return_value = [
        {"role": "user", "content": "earlier"}
    ]
    session_key = session_entry.session_key
    adapter = _Adapter()
    runner.adapters[Platform.TELEGRAM] = adapter

    async def _stale_result(**kwargs):
        # Simulate a newer run claiming the callback slot before the stale run unwinds.
        runner._session_run_generation[session_key] = 2
        adapter._post_delivery_callbacks[session_key] = (2, lambda: None)
        return _agent_result(
            "late reply", last_prompt_tokens=80, input_tokens=120, output_tokens=45
        )

    runner._run_agent = AsyncMock(side_effect=_stale_result)
    _patch_agent_runtime(monkeypatch)

    result = await runner._handle_message(_make_event("hello"))

    assert result is None
    assert session_key in adapter._post_delivery_callbacks
    assert adapter._post_delivery_callbacks[session_key][0] == 2


@pytest.mark.asyncio
async def test_status_command_bypasses_active_session_guard():
    """When an agent is running, /status must be dispatched immediately via
    base.handle_message — not queued or treated as an interrupt (#5046)."""
    from gateway.platforms.base import MessageType

    source = _make_source()
    session_key = build_session_key(source)

    handler_called_with = []

    async def fake_handler(event):
        handler_called_with.append(event)
        return "📊 **Hermes Gateway Status**\n**Agent Running:** Yes ⚡"

    adapter = _make_concrete_adapter()
    adapter.set_message_handler(fake_handler)

    sent = []

    async def fake_send_with_retry(chat_id, content, reply_to=None, metadata=None):
        sent.append(content)

    adapter._send_with_retry = fake_send_with_retry

    # Simulate an active session
    interrupt_event = asyncio.Event()
    adapter._active_sessions[session_key] = interrupt_event

    event = MessageEvent(
        text="/status",
        source=source,
        message_id="m1",
        message_type=MessageType.COMMAND,
    )
    await adapter.handle_message(event)

    assert handler_called_with, "/status handler was never called (event was queued or dropped)"
    assert sent, "/status response was never sent"
    assert "Agent Running" in sent[0]
    assert not interrupt_event.is_set(), "/status incorrectly triggered an agent interrupt"
    assert session_key not in adapter._pending_messages, "/status was incorrectly queued"


@pytest.mark.asyncio
async def test_profile_command_reports_custom_root_profile(monkeypatch, tmp_path):
    """Gateway /profile detects custom-root profiles (not under ~/.hermes)."""
    session_entry = _make_session_entry()
    runner = _make_runner(session_entry)
    profile_home = tmp_path / "profiles" / "coder"

    monkeypatch.setenv("HERMES_HOME", str(profile_home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path / "unrelated-home")

    result = await runner._handle_profile_command(_make_event("/profile"))

    assert "**Profile:** `coder`" in result
    assert f"**Home:** `{profile_home}`" in result


@pytest.mark.asyncio
async def test_post_delivery_callback_generation_snapshot_happens_after_bind():
    """Regression: the callback_generation snapshot in _process_message_background
    must happen AFTER the handler runs, not before.

    _hermes_run_generation is set on the interrupt event by
    GatewayRunner._bind_adapter_run_generation during _handle_message_with_agent.
    The earlier snapshot-at-task-start always captured None, which bypassed the
    generation-ownership check in pop_post_delivery_callback and let stale runs
    fire a fresher run's callbacks.
    """
    source = _make_source()
    session_key = build_session_key(source)
    fired = []

    adapter = _make_concrete_adapter()

    async def fake_handler(event):
        # Simulate what _bind_adapter_run_generation does mid-run.
        interrupt_event = adapter._active_sessions.get(session_key)
        setattr(interrupt_event, "_hermes_run_generation", 1)
        # Stale run registers its callback at generation=1.
        adapter.register_post_delivery_callback(
            session_key,
            lambda: fired.append("older"),
            generation=1,
        )
        # A fresher run overwrites with generation=2 (different dict entry).
        adapter.register_post_delivery_callback(
            session_key,
            lambda: fired.append("newer"),
            generation=2,
        )
        return None

    adapter.set_message_handler(fake_handler)
    event = MessageEvent(text="hello", source=source, message_id="m1")

    await adapter.handle_message(event)
    tasks = list(adapter._background_tasks)
    assert tasks, "expected background task to be created"
    await asyncio.gather(*tasks)

    # The stale run (generation=1) must NOT fire the fresher run's callback
    # (generation=2).  With the pre-fix code, callback_generation was snapshotted
    # as None before the handler ran, bypassing the ownership check and firing
    # "newer" anyway.
    assert fired == []
    assert session_key in adapter._post_delivery_callbacks
    assert adapter._post_delivery_callbacks[session_key][0] == 2