/ tests / cli / test_cli_new_session.py
test_cli_new_session.py
  1  """Regression tests for CLI fresh-session commands."""
  2  
  3  from __future__ import annotations
  4  
  5  import importlib
  6  import os
  7  import sys
  8  from datetime import datetime, timedelta
  9  from unittest.mock import MagicMock, patch
 10  
 11  from hermes_state import SessionDB
 12  from tools.todo_tool import TodoStore
 13  
 14  
 15  class _FakeCompressor:
 16      """Minimal stand-in for ContextCompressor."""
 17  
 18      def __init__(self):
 19          self.last_prompt_tokens = 500
 20          self.last_completion_tokens = 200
 21          self.last_total_tokens = 700
 22          self.compression_count = 3
 23          self._context_probed = True
 24  
 25  
 26  class _FakeAgent:
 27      def __init__(self, session_id: str, session_start):
 28          self.session_id = session_id
 29          self.session_start = session_start
 30          self.model = "anthropic/claude-opus-4.6"
 31          self._last_flushed_db_idx = 7
 32          self._todo_store = TodoStore()
 33          self._todo_store.write(
 34              [{"id": "t1", "content": "unfinished task", "status": "in_progress"}]
 35          )
 36          self.commit_memory_session = MagicMock()
 37          self._invalidate_system_prompt = MagicMock()
 38  
 39          # Token counters (non-zero to verify reset)
 40          self.session_total_tokens = 1000
 41          self.session_input_tokens = 600
 42          self.session_output_tokens = 400
 43          self.session_prompt_tokens = 550
 44          self.session_completion_tokens = 350
 45          self.session_cache_read_tokens = 100
 46          self.session_cache_write_tokens = 50
 47          self.session_reasoning_tokens = 80
 48          self.session_api_calls = 5
 49          self.session_estimated_cost_usd = 0.42
 50          self.session_cost_status = "estimated"
 51          self.session_cost_source = "openrouter"
 52          self.context_compressor = _FakeCompressor()
 53  
 54      def reset_session_state(self):
 55          """Mirror the real AIAgent.reset_session_state()."""
 56          self.session_total_tokens = 0
 57          self.session_input_tokens = 0
 58          self.session_output_tokens = 0
 59          self.session_prompt_tokens = 0
 60          self.session_completion_tokens = 0
 61          self.session_cache_read_tokens = 0
 62          self.session_cache_write_tokens = 0
 63          self.session_reasoning_tokens = 0
 64          self.session_api_calls = 0
 65          self.session_estimated_cost_usd = 0.0
 66          self.session_cost_status = "unknown"
 67          self.session_cost_source = "none"
 68          if hasattr(self, "context_compressor") and self.context_compressor:
 69              self.context_compressor.last_prompt_tokens = 0
 70              self.context_compressor.last_completion_tokens = 0
 71              self.context_compressor.last_total_tokens = 0
 72              self.context_compressor.compression_count = 0
 73              self.context_compressor._context_probed = False
 74  
 75  
 76  def _make_cli(env_overrides=None, config_overrides=None, **kwargs):
 77      """Create a HermesCLI instance with minimal mocking."""
 78      _clean_config = {
 79          "model": {
 80              "default": "anthropic/claude-opus-4.6",
 81              "base_url": "https://openrouter.ai/api/v1",
 82              "provider": "auto",
 83          },
 84          "display": {"compact": False, "tool_progress": "all"},
 85          "agent": {},
 86          "terminal": {"env_type": "local"},
 87      }
 88      if config_overrides:
 89          _clean_config.update(config_overrides)
 90      clean_env = {"LLM_MODEL": "", "HERMES_MAX_ITERATIONS": ""}
 91      if env_overrides:
 92          clean_env.update(env_overrides)
 93      prompt_toolkit_stubs = {
 94          "prompt_toolkit": MagicMock(),
 95          "prompt_toolkit.history": MagicMock(),
 96          "prompt_toolkit.styles": MagicMock(),
 97          "prompt_toolkit.patch_stdout": MagicMock(),
 98          "prompt_toolkit.application": MagicMock(),
 99          "prompt_toolkit.layout": MagicMock(),
100          "prompt_toolkit.layout.processors": MagicMock(),
101          "prompt_toolkit.filters": MagicMock(),
102          "prompt_toolkit.layout.dimension": MagicMock(),
103          "prompt_toolkit.layout.menus": MagicMock(),
104          "prompt_toolkit.widgets": MagicMock(),
105          "prompt_toolkit.key_binding": MagicMock(),
106          "prompt_toolkit.completion": MagicMock(),
107          "prompt_toolkit.formatted_text": MagicMock(),
108          "prompt_toolkit.auto_suggest": MagicMock(),
109      }
110      with patch.dict(sys.modules, prompt_toolkit_stubs), patch.dict(
111          "os.environ", clean_env, clear=False
112      ):
113          import cli as _cli_mod
114  
115          _cli_mod = importlib.reload(_cli_mod)
116          with patch.object(_cli_mod, "get_tool_definitions", return_value=[]), patch.dict(
117              _cli_mod.__dict__, {"CLI_CONFIG": _clean_config}
118          ):
119              return _cli_mod.HermesCLI(**kwargs)
120  
121  
122  def _prepare_cli_with_active_session(tmp_path):
123      cli = _make_cli()
124      cli._session_db = SessionDB(db_path=tmp_path / "state.db")
125      cli._session_db.create_session(session_id=cli.session_id, source="cli", model=cli.model)
126  
127      cli.agent = _FakeAgent(cli.session_id, cli.session_start)
128      cli.conversation_history = [{"role": "user", "content": "hello"}]
129  
130      old_session_start = cli.session_start - timedelta(seconds=1)
131      cli.session_start = old_session_start
132      cli.agent.session_start = old_session_start
133      return cli
134  
135  
136  def test_new_command_creates_real_fresh_session_and_resets_agent_state(tmp_path):
137      cli = _prepare_cli_with_active_session(tmp_path)
138      old_session_id = cli.session_id
139      old_session_start = cli.session_start
140  
141      cli.process_command("/new")
142  
143      assert cli.session_id != old_session_id
144  
145      old_session = cli._session_db.get_session(old_session_id)
146      assert old_session is not None
147      assert old_session["end_reason"] == "new_session"
148  
149      new_session = cli._session_db.get_session(cli.session_id)
150      assert new_session is not None
151  
152      cli._session_db.append_message(cli.session_id, role="user", content="next turn")
153  
154      assert cli.agent.session_id == cli.session_id
155      assert cli.agent._last_flushed_db_idx == 0
156      assert cli.agent._todo_store.read() == []
157      assert cli.session_start > old_session_start
158      assert cli.agent.session_start == cli.session_start
159      cli.agent._invalidate_system_prompt.assert_called_once()
160  
161  
162  def test_reset_command_is_alias_for_new_session(tmp_path):
163      cli = _prepare_cli_with_active_session(tmp_path)
164      old_session_id = cli.session_id
165  
166      cli.process_command("/reset")
167  
168      assert cli.session_id != old_session_id
169      assert cli._session_db.get_session(old_session_id)["end_reason"] == "new_session"
170      assert cli._session_db.get_session(cli.session_id) is not None
171  
172  
173  def test_clear_command_starts_new_session_before_redrawing(tmp_path):
174      cli = _prepare_cli_with_active_session(tmp_path)
175      cli.console = MagicMock()
176      cli.show_banner = MagicMock()
177  
178      old_session_id = cli.session_id
179      cli.process_command("/clear")
180  
181      assert cli.session_id != old_session_id
182      assert cli._session_db.get_session(old_session_id)["end_reason"] == "new_session"
183      assert cli._session_db.get_session(cli.session_id) is not None
184      cli.console.clear.assert_called_once()
185      cli.show_banner.assert_called_once()
186      assert cli.conversation_history == []
187  
188  
189  def test_new_session_resets_token_counters(tmp_path):
190      """Regression test for #2099: /new must zero all token counters."""
191      cli = _prepare_cli_with_active_session(tmp_path)
192  
193      # Verify counters are non-zero before reset
194      agent = cli.agent
195      assert agent.session_total_tokens > 0
196      assert agent.session_api_calls > 0
197      assert agent.context_compressor.compression_count > 0
198  
199      cli.process_command("/new")
200  
201      # All agent token counters must be zero
202      assert agent.session_total_tokens == 0
203      assert agent.session_input_tokens == 0
204      assert agent.session_output_tokens == 0
205      assert agent.session_prompt_tokens == 0
206      assert agent.session_completion_tokens == 0
207      assert agent.session_cache_read_tokens == 0
208      assert agent.session_cache_write_tokens == 0
209      assert agent.session_reasoning_tokens == 0
210      assert agent.session_api_calls == 0
211      assert agent.session_estimated_cost_usd == 0.0
212      assert agent.session_cost_status == "unknown"
213      assert agent.session_cost_source == "none"
214  
215      # Context compressor counters must also be zero
216      comp = agent.context_compressor
217      assert comp.last_prompt_tokens == 0
218      assert comp.last_completion_tokens == 0
219      assert comp.last_total_tokens == 0
220      assert comp.compression_count == 0
221      assert comp._context_probed is False
222  
223  
224  def test_new_session_with_title(capsys):
225      """new_session(title=...) creates a session and sets the title."""
226      cli = _make_cli()
227      cli._session_db = MagicMock()
228      cli.agent = _FakeAgent("old_session_id", datetime.now())
229      cli.conversation_history = []
230  
231      cli.new_session(title="My Test Session")
232  
233      # Assert set_session_title was called with the new session ID and sanitized title
234      cli._session_db.set_session_title.assert_called_once()
235      call_args = cli._session_db.set_session_title.call_args
236      assert call_args[0][0] == cli.session_id
237      assert call_args[0][1] == "My Test Session"
238  
239      captured = capsys.readouterr()
240      assert "My Test Session" in captured.out
241  
242  
243  def test_new_session_with_duplicate_title_surfaces_error(capsys):
244      """new_session(title=...) handles ValueError from a duplicate-title conflict.
245  
246      The session is still created; the title assignment fails; the success banner
247      must not claim the rejected title as the session name.
248      """
249      cli = _make_cli()
250      cli._session_db = MagicMock()
251      cli._session_db.set_session_title.side_effect = ValueError(
252          "Title 'Dup' is already in use by session abc-123"
253      )
254      cli.agent = _FakeAgent("old_session_id", datetime.now())
255      cli.conversation_history = []
256  
257      # Capture warnings printed via cli._cprint. After importlib.reload(),
258      # the method's __globals__ dict is the one from the live module — patch
259      # the exact dict the method will read.
260      warnings: list[str] = []
261      method_globals = cli.new_session.__globals__
262      original = method_globals["_cprint"]
263      method_globals["_cprint"] = lambda msg: warnings.append(msg)
264      try:
265          cli.new_session(title="Dup")
266      finally:
267          method_globals["_cprint"] = original
268  
269      cli._session_db.set_session_title.assert_called_once()
270      joined = "\n".join(warnings)
271      assert "already in use" in joined
272      assert "session started untitled" in joined
273  
274      # The success banner must NOT claim the rejected title as the session name.
275      captured = capsys.readouterr()
276      assert "New session started: Dup" not in captured.out
277      assert "New session started!" in captured.out