test_cli_new_session.py
1 """Regression tests for CLI fresh-session commands.""" 2 3 from __future__ import annotations 4 5 import importlib 6 import os 7 import sys 8 from datetime import datetime, timedelta 9 from unittest.mock import MagicMock, patch 10 11 from hermes_state import SessionDB 12 from tools.todo_tool import TodoStore 13 14 15 class _FakeCompressor: 16 """Minimal stand-in for ContextCompressor.""" 17 18 def __init__(self): 19 self.last_prompt_tokens = 500 20 self.last_completion_tokens = 200 21 self.last_total_tokens = 700 22 self.compression_count = 3 23 self._context_probed = True 24 25 26 class _FakeAgent: 27 def __init__(self, session_id: str, session_start): 28 self.session_id = session_id 29 self.session_start = session_start 30 self.model = "anthropic/claude-opus-4.6" 31 self._last_flushed_db_idx = 7 32 self._todo_store = TodoStore() 33 self._todo_store.write( 34 [{"id": "t1", "content": "unfinished task", "status": "in_progress"}] 35 ) 36 self.commit_memory_session = MagicMock() 37 self._invalidate_system_prompt = MagicMock() 38 39 # Token counters (non-zero to verify reset) 40 self.session_total_tokens = 1000 41 self.session_input_tokens = 600 42 self.session_output_tokens = 400 43 self.session_prompt_tokens = 550 44 self.session_completion_tokens = 350 45 self.session_cache_read_tokens = 100 46 self.session_cache_write_tokens = 50 47 self.session_reasoning_tokens = 80 48 self.session_api_calls = 5 49 self.session_estimated_cost_usd = 0.42 50 self.session_cost_status = "estimated" 51 self.session_cost_source = "openrouter" 52 self.context_compressor = _FakeCompressor() 53 54 def reset_session_state(self): 55 """Mirror the real AIAgent.reset_session_state().""" 56 self.session_total_tokens = 0 57 self.session_input_tokens = 0 58 self.session_output_tokens = 0 59 self.session_prompt_tokens = 0 60 self.session_completion_tokens = 0 61 self.session_cache_read_tokens = 0 62 self.session_cache_write_tokens = 0 63 self.session_reasoning_tokens = 0 64 self.session_api_calls = 0 65 self.session_estimated_cost_usd = 0.0 66 self.session_cost_status = "unknown" 67 self.session_cost_source = "none" 68 if hasattr(self, "context_compressor") and self.context_compressor: 69 self.context_compressor.last_prompt_tokens = 0 70 self.context_compressor.last_completion_tokens = 0 71 self.context_compressor.last_total_tokens = 0 72 self.context_compressor.compression_count = 0 73 self.context_compressor._context_probed = False 74 75 76 def _make_cli(env_overrides=None, config_overrides=None, **kwargs): 77 """Create a HermesCLI instance with minimal mocking.""" 78 _clean_config = { 79 "model": { 80 "default": "anthropic/claude-opus-4.6", 81 "base_url": "https://openrouter.ai/api/v1", 82 "provider": "auto", 83 }, 84 "display": {"compact": False, "tool_progress": "all"}, 85 "agent": {}, 86 "terminal": {"env_type": "local"}, 87 } 88 if config_overrides: 89 _clean_config.update(config_overrides) 90 clean_env = {"LLM_MODEL": "", "HERMES_MAX_ITERATIONS": ""} 91 if env_overrides: 92 clean_env.update(env_overrides) 93 prompt_toolkit_stubs = { 94 "prompt_toolkit": MagicMock(), 95 "prompt_toolkit.history": MagicMock(), 96 "prompt_toolkit.styles": MagicMock(), 97 "prompt_toolkit.patch_stdout": MagicMock(), 98 "prompt_toolkit.application": MagicMock(), 99 "prompt_toolkit.layout": MagicMock(), 100 "prompt_toolkit.layout.processors": MagicMock(), 101 "prompt_toolkit.filters": MagicMock(), 102 "prompt_toolkit.layout.dimension": MagicMock(), 103 "prompt_toolkit.layout.menus": MagicMock(), 104 "prompt_toolkit.widgets": MagicMock(), 105 "prompt_toolkit.key_binding": MagicMock(), 106 "prompt_toolkit.completion": MagicMock(), 107 "prompt_toolkit.formatted_text": MagicMock(), 108 "prompt_toolkit.auto_suggest": MagicMock(), 109 } 110 with patch.dict(sys.modules, prompt_toolkit_stubs), patch.dict( 111 "os.environ", clean_env, clear=False 112 ): 113 import cli as _cli_mod 114 115 _cli_mod = importlib.reload(_cli_mod) 116 with patch.object(_cli_mod, "get_tool_definitions", return_value=[]), patch.dict( 117 _cli_mod.__dict__, {"CLI_CONFIG": _clean_config} 118 ): 119 return _cli_mod.HermesCLI(**kwargs) 120 121 122 def _prepare_cli_with_active_session(tmp_path): 123 cli = _make_cli() 124 cli._session_db = SessionDB(db_path=tmp_path / "state.db") 125 cli._session_db.create_session(session_id=cli.session_id, source="cli", model=cli.model) 126 127 cli.agent = _FakeAgent(cli.session_id, cli.session_start) 128 cli.conversation_history = [{"role": "user", "content": "hello"}] 129 130 old_session_start = cli.session_start - timedelta(seconds=1) 131 cli.session_start = old_session_start 132 cli.agent.session_start = old_session_start 133 return cli 134 135 136 def test_new_command_creates_real_fresh_session_and_resets_agent_state(tmp_path): 137 cli = _prepare_cli_with_active_session(tmp_path) 138 old_session_id = cli.session_id 139 old_session_start = cli.session_start 140 141 cli.process_command("/new") 142 143 assert cli.session_id != old_session_id 144 145 old_session = cli._session_db.get_session(old_session_id) 146 assert old_session is not None 147 assert old_session["end_reason"] == "new_session" 148 149 new_session = cli._session_db.get_session(cli.session_id) 150 assert new_session is not None 151 152 cli._session_db.append_message(cli.session_id, role="user", content="next turn") 153 154 assert cli.agent.session_id == cli.session_id 155 assert cli.agent._last_flushed_db_idx == 0 156 assert cli.agent._todo_store.read() == [] 157 assert cli.session_start > old_session_start 158 assert cli.agent.session_start == cli.session_start 159 cli.agent._invalidate_system_prompt.assert_called_once() 160 161 162 def test_reset_command_is_alias_for_new_session(tmp_path): 163 cli = _prepare_cli_with_active_session(tmp_path) 164 old_session_id = cli.session_id 165 166 cli.process_command("/reset") 167 168 assert cli.session_id != old_session_id 169 assert cli._session_db.get_session(old_session_id)["end_reason"] == "new_session" 170 assert cli._session_db.get_session(cli.session_id) is not None 171 172 173 def test_clear_command_starts_new_session_before_redrawing(tmp_path): 174 cli = _prepare_cli_with_active_session(tmp_path) 175 cli.console = MagicMock() 176 cli.show_banner = MagicMock() 177 178 old_session_id = cli.session_id 179 cli.process_command("/clear") 180 181 assert cli.session_id != old_session_id 182 assert cli._session_db.get_session(old_session_id)["end_reason"] == "new_session" 183 assert cli._session_db.get_session(cli.session_id) is not None 184 cli.console.clear.assert_called_once() 185 cli.show_banner.assert_called_once() 186 assert cli.conversation_history == [] 187 188 189 def test_new_session_resets_token_counters(tmp_path): 190 """Regression test for #2099: /new must zero all token counters.""" 191 cli = _prepare_cli_with_active_session(tmp_path) 192 193 # Verify counters are non-zero before reset 194 agent = cli.agent 195 assert agent.session_total_tokens > 0 196 assert agent.session_api_calls > 0 197 assert agent.context_compressor.compression_count > 0 198 199 cli.process_command("/new") 200 201 # All agent token counters must be zero 202 assert agent.session_total_tokens == 0 203 assert agent.session_input_tokens == 0 204 assert agent.session_output_tokens == 0 205 assert agent.session_prompt_tokens == 0 206 assert agent.session_completion_tokens == 0 207 assert agent.session_cache_read_tokens == 0 208 assert agent.session_cache_write_tokens == 0 209 assert agent.session_reasoning_tokens == 0 210 assert agent.session_api_calls == 0 211 assert agent.session_estimated_cost_usd == 0.0 212 assert agent.session_cost_status == "unknown" 213 assert agent.session_cost_source == "none" 214 215 # Context compressor counters must also be zero 216 comp = agent.context_compressor 217 assert comp.last_prompt_tokens == 0 218 assert comp.last_completion_tokens == 0 219 assert comp.last_total_tokens == 0 220 assert comp.compression_count == 0 221 assert comp._context_probed is False 222 223 224 def test_new_session_with_title(capsys): 225 """new_session(title=...) creates a session and sets the title.""" 226 cli = _make_cli() 227 cli._session_db = MagicMock() 228 cli.agent = _FakeAgent("old_session_id", datetime.now()) 229 cli.conversation_history = [] 230 231 cli.new_session(title="My Test Session") 232 233 # Assert set_session_title was called with the new session ID and sanitized title 234 cli._session_db.set_session_title.assert_called_once() 235 call_args = cli._session_db.set_session_title.call_args 236 assert call_args[0][0] == cli.session_id 237 assert call_args[0][1] == "My Test Session" 238 239 captured = capsys.readouterr() 240 assert "My Test Session" in captured.out 241 242 243 def test_new_session_with_duplicate_title_surfaces_error(capsys): 244 """new_session(title=...) handles ValueError from a duplicate-title conflict. 245 246 The session is still created; the title assignment fails; the success banner 247 must not claim the rejected title as the session name. 248 """ 249 cli = _make_cli() 250 cli._session_db = MagicMock() 251 cli._session_db.set_session_title.side_effect = ValueError( 252 "Title 'Dup' is already in use by session abc-123" 253 ) 254 cli.agent = _FakeAgent("old_session_id", datetime.now()) 255 cli.conversation_history = [] 256 257 # Capture warnings printed via cli._cprint. After importlib.reload(), 258 # the method's __globals__ dict is the one from the live module — patch 259 # the exact dict the method will read. 260 warnings: list[str] = [] 261 method_globals = cli.new_session.__globals__ 262 original = method_globals["_cprint"] 263 method_globals["_cprint"] = lambda msg: warnings.append(msg) 264 try: 265 cli.new_session(title="Dup") 266 finally: 267 method_globals["_cprint"] = original 268 269 cli._session_db.set_session_title.assert_called_once() 270 joined = "\n".join(warnings) 271 assert "already in use" in joined 272 assert "session started untitled" in joined 273 274 # The success banner must NOT claim the rejected title as the session name. 275 captured = capsys.readouterr() 276 assert "New session started: Dup" not in captured.out 277 assert "New session started!" in captured.out