"""Tests for the bundled hermes-achievements dashboard plugin.

These target the two behaviors that matter for official integration:

* The 200-session scan cap is removed — the plugin now walks the entire
  session history by default. Lifetime badges (tens of thousands of
  tool calls) were unreachable before this fix on long-running installs.
* First-ever scans run in a background thread so the dashboard request
  path never blocks, even on 8000+ session databases where a cold scan
  takes minutes.

The upstream repo ships its own unittest suite under
``plugins/hermes-achievements/tests/`` covering the achievement engine
internals (tier math, secret-state handling, catalog invariants). These
tests live at the hermes-agent level and focus on the integration
contract: the plugin scans ALL of your sessions, not the first 200.
"""
from __future__ import annotations

import importlib.util
import sys
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

import pytest

# Absolute path to the plugin module under test; loaded fresh per test by
# the ``plugin_api`` fixture below.
PLUGIN_MODULE_PATH = (
    Path(__file__).resolve().parents[2]
    / "plugins"
    / "hermes-achievements"
    / "dashboard"
    / "plugin_api.py"
)


@pytest.fixture
def plugin_api(tmp_path, monkeypatch):
    """Load plugin_api with isolated ~/.hermes so state/snapshot files don't collide.

    We load the module fresh per test because the plugin keeps module-level
    caches (``_SNAPSHOT_CACHE``, ``_SCAN_STATUS``, background thread handle).
    Reloading gives each test a clean world.
    """
    monkeypatch.setattr(Path, "home", lambda: tmp_path)

    spec = importlib.util.spec_from_file_location(
        f"plugin_api_test_{id(tmp_path)}", PLUGIN_MODULE_PATH
    )
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    # Stash monkeypatch so ``_install_fake_session_db`` can use it to
    # swap ``sys.modules['hermes_state']`` with auto-restoration. Without
    # this, a raw ``sys.modules[...] = fake`` assignment would leak the
    # fake into later tests in the same xdist worker — breaking every
    # test that does ``from hermes_state import SessionDB``.
    module._test_monkeypatch = monkeypatch
    yield module


class _FakeSessionDB:
    """Stand-in for hermes_state.SessionDB that records scan calls."""

    def __init__(self, session_count: int):
        self.session_count = session_count
        self.last_limit: Optional[int] = None
        self.last_include_children: Optional[bool] = None
        self.list_calls = 0
        self.messages_calls = 0

    def list_sessions_rich(
        self,
        source: Optional[str] = None,
        exclude_sources: Optional[List[str]] = None,
        limit: int = 20,
        offset: int = 0,
        include_children: bool = False,
        project_compression_tips: bool = True,
    ) -> List[Dict[str, Any]]:
        self.last_limit = limit
        self.last_include_children = include_children
        self.list_calls += 1
        # SQLite semantics: LIMIT -1 = unlimited. Honor that here.
        effective = self.session_count if limit == -1 else min(self.session_count, limit)
        now = int(time.time())
        return [
            {
                "id": f"sess-{i}",
                "title": f"Session {i}",
                "preview": f"preview {i}",
                "started_at": now - (self.session_count - i) * 60,
                "last_active": now - (self.session_count - i) * 60 + 30,
                "source": "cli",
                "model": "test-model",
            }
            for i in range(effective)
        ]

    def get_messages(self, session_id: str) -> List[Dict[str, Any]]:
        self.messages_calls += 1
        return [
            {"role": "user", "content": f"ask {session_id}"},
            {
                "role": "assistant",
                "tool_calls": [{"function": {"name": "terminal"}}],
            },
            {"role": "tool", "tool_name": "terminal", "content": "ok"},
        ]

    def close(self) -> None:
        pass


def _install_fake_session_db(plugin_api, fake_db):
    """Inject a fake SessionDB so ``scan_sessions`` finds it via its local import.

    Uses the monkeypatch stashed on ``plugin_api`` by the fixture, so the
    ``sys.modules['hermes_state']`` swap is auto-restored at test teardown
    and cannot leak into unrelated tests in the same xdist worker.
    """
    fake_module = type(sys)("hermes_state")
    fake_module.SessionDB = lambda: fake_db
    plugin_api._test_monkeypatch.setitem(sys.modules, "hermes_state", fake_module)


def test_scan_sessions_default_scans_all_history_not_first_200(plugin_api):
    """Bug regression: ``scan_sessions()`` used to cap at limit=200.

    A user with 8000+ sessions would only see ~2% of their history in
    achievement totals, making lifetime badges unreachable. The default
    now passes ``LIMIT -1`` (SQLite "unlimited") to ``list_sessions_rich``.
    """
    fake_db = _FakeSessionDB(session_count=500)  # > old 200 cap
    _install_fake_session_db(plugin_api, fake_db)

    result = plugin_api.scan_sessions()

    assert fake_db.last_limit == -1, (
        "scan_sessions() must pass LIMIT=-1 (unlimited) to list_sessions_rich "
        f"by default, got {fake_db.last_limit}"
    )
    assert fake_db.last_include_children is True, (
        "scan_sessions() must include subagent/compression child sessions so "
        "tool calls made in delegated agents still count toward achievements"
    )
    assert len(result["sessions"]) == 500
    assert result["scan_meta"]["sessions_total"] == 500


def test_scan_sessions_explicit_positive_limit_is_honored(plugin_api):
    """Callers can still pass a small limit for smoke tests."""
    fake_db = _FakeSessionDB(session_count=500)
    _install_fake_session_db(plugin_api, fake_db)

    result = plugin_api.scan_sessions(limit=10)

    assert fake_db.last_limit == 10
    assert len(result["sessions"]) == 10


def test_scan_sessions_zero_or_negative_limit_means_unlimited(plugin_api):
    """``limit=0`` and ``limit=-1`` both map to the unlimited path."""
    fake_db = _FakeSessionDB(session_count=300)
    _install_fake_session_db(plugin_api, fake_db)

    plugin_api.scan_sessions(limit=0)
    assert fake_db.last_limit == -1

    plugin_api.scan_sessions(limit=-1)
    assert fake_db.last_limit == -1


def test_evaluate_all_first_run_returns_pending_and_starts_background_scan(plugin_api):
    """First-ever evaluate_all with no cache returns a pending placeholder
    immediately and kicks off a background scan thread. Cold scans on
    large DBs take minutes — blocking the dashboard request path is not
    acceptable.
    """
    fake_db = _FakeSessionDB(session_count=50)
    _install_fake_session_db(plugin_api, fake_db)

    # Wrap _run_scan_and_update_cache so we can release it on demand,
    # simulating a slow cold scan without actually waiting.
    scan_started = threading.Event()
    allow_scan_finish = threading.Event()
    original_run = plugin_api._run_scan_and_update_cache

    def gated_run(*args, **kwargs):
        scan_started.set()
        allow_scan_finish.wait(timeout=5)
        original_run(*args, **kwargs)

    plugin_api._run_scan_and_update_cache = gated_run

    t0 = time.time()
    result = plugin_api.evaluate_all()
    elapsed = time.time() - t0

    # Immediate return — should not block waiting for the scan.
    assert elapsed < 1.0, f"evaluate_all blocked for {elapsed:.2f}s on first run"
    assert result["scan_meta"]["mode"] == "pending"
    assert result["unlocked_count"] == 0
    # Catalog still rendered so UI has something to draw.
    assert result["total_count"] >= 60

    # Background scan is running.
    assert scan_started.wait(timeout=2), "background scan did not start"

    # Let the scan complete, then a second call returns real data.
    allow_scan_finish.set()
    # Wait for thread to finish.
    thread = plugin_api._BACKGROUND_SCAN_THREAD
    assert thread is not None
    thread.join(timeout=5)
    assert not thread.is_alive()

    second = plugin_api.evaluate_all()
    assert second["scan_meta"]["mode"] != "pending"
    assert second["scan_meta"].get("sessions_total") == 50


def test_evaluate_all_stale_cache_serves_stale_and_refreshes_in_background(plugin_api):
    """When the snapshot is on-disk but older than TTL, evaluate_all returns
    the stale data immediately and kicks a background refresh. Users don't
    stare at a loading spinner every time TTL expires.
    """
    fake_db = _FakeSessionDB(session_count=10)
    _install_fake_session_db(plugin_api, fake_db)

    # Seed a stale snapshot on disk.
    stale_generated_at = int(time.time()) - plugin_api.SNAPSHOT_TTL_SECONDS - 60
    stale_payload = {
        "achievements": [],
        "sessions": [],
        "aggregate": {},
        "scan_meta": {"mode": "full", "sessions_total": 1, "sessions_rescanned": 1, "sessions_reused": 0},
        "error": None,
        "unlocked_count": 0,
        "discovered_count": 0,
        "secret_count": 0,
        "total_count": 0,
        "generated_at": stale_generated_at,
    }
    plugin_api.save_snapshot(stale_payload)

    t0 = time.time()
    result = plugin_api.evaluate_all()
    elapsed = time.time() - t0

    assert elapsed < 1.0, f"evaluate_all blocked for {elapsed:.2f}s serving stale data"
    assert result["generated_at"] == stale_generated_at

    # Background scan should be running or have completed.
    thread = plugin_api._BACKGROUND_SCAN_THREAD
    assert thread is not None
    thread.join(timeout=5)

    fresh = plugin_api.evaluate_all()
    assert fresh["generated_at"] >= stale_generated_at


def test_evaluate_all_force_runs_synchronously(plugin_api):
    """Manual /rescan (force=True) blocks the caller — users clicking
    the rescan button expect up-to-date data when the call returns.
    """
    fake_db = _FakeSessionDB(session_count=25)
    _install_fake_session_db(plugin_api, fake_db)

    result = plugin_api.evaluate_all(force=True)

    # Synchronous — snapshot is fresh on return.
    assert result["scan_meta"].get("sessions_total") == 25
    assert result["scan_meta"]["mode"] in ("full", "incremental")


def test_start_background_scan_is_idempotent_while_running(plugin_api):
    """Multiple concurrent dashboard requests must not spawn duplicate scans."""
    fake_db = _FakeSessionDB(session_count=5)
    _install_fake_session_db(plugin_api, fake_db)

    release = threading.Event()
    original_run = plugin_api._run_scan_and_update_cache

    def gated_run(*args, **kwargs):
        release.wait(timeout=5)
        original_run(*args, **kwargs)

    plugin_api._run_scan_and_update_cache = gated_run

    plugin_api._start_background_scan()
    first_thread = plugin_api._BACKGROUND_SCAN_THREAD
    assert first_thread is not None and first_thread.is_alive()

    plugin_api._start_background_scan()
    plugin_api._start_background_scan()

    assert plugin_api._BACKGROUND_SCAN_THREAD is first_thread

    release.set()
    first_thread.join(timeout=5)


def test_background_scan_publishes_partial_snapshots(plugin_api):
    """The background scanner publishes intermediate snapshots to the cache
    every ~N sessions. Each dashboard refresh during a long cold scan sees
    more badges unlocked instead of staring at zeros for minutes and then
    having everything pop at the end.
    """
    fake_db = _FakeSessionDB(session_count=750)
    _install_fake_session_db(plugin_api, fake_db)

    # Record every partial snapshot the scanner publishes.
    partial_snapshots: List[Dict[str, Any]] = []
    original_compute_from_scan = plugin_api._compute_from_scan

    def recording_compute(scan, *, is_partial=False):
        result = original_compute_from_scan(scan, is_partial=is_partial)
        if is_partial:
            partial_snapshots.append(result)
        return result

    plugin_api._compute_from_scan = recording_compute

    # scan 750 sessions with progress_every=250 → expect 2 intermediate
    # publications (at 250 and 500; the final 750 call goes through the
    # finished, non-partial path).
    plugin_api._run_scan_and_update_cache(publish_partial_snapshots=True)

    assert len(partial_snapshots) >= 2, (
        f"expected at least 2 partial publications on a 750-session scan with "
        f"progress_every=250, got {len(partial_snapshots)}"
    )
    # Partial snapshots should report growing session counts.
    counts = [p["scan_meta"].get("sessions_scanned_so_far") for p in partial_snapshots]
    assert counts == sorted(counts), f"partial session counts not monotonic: {counts}"
    assert counts[0] < 750 and counts[-1] < 750, (
        f"partial counts should be less than the final total; got {counts}"
    )
    # Every partial reports the expected end-state total so the UI can
    # show an accurate progress bar.
    for p in partial_snapshots:
        assert p["scan_meta"].get("sessions_expected_total") == 750

    # Final snapshot in cache is the real (non-partial) one.
    final = plugin_api._SNAPSHOT_CACHE
    assert final is not None
    assert final["scan_meta"].get("mode") != "in_progress"
    assert final["scan_meta"].get("sessions_total") == 750


def test_partial_snapshots_do_not_persist_unlock_timestamps(plugin_api):
    """Intermediate snapshots must not write to state.json — an unlock
    that appears at 30% scan progress could disappear when a later session
    rebalances the aggregate. Only the final snapshot records ``unlocked_at``.
    """
    fake_db = _FakeSessionDB(session_count=10)
    _install_fake_session_db(plugin_api, fake_db)

    # Seed empty state, then invoke partial compute directly.
    plugin_api.save_state({"unlocks": {}})
    partial_scan = {
        "sessions": [{"session_id": "x", "tool_call_count": 99999, "tool_names": set()}],
        "aggregate": {"max_tool_calls_in_session": 99999, "total_tool_calls": 99999},
        "scan_meta": {"mode": "in_progress"},
    }
    result = plugin_api._compute_from_scan(partial_scan, is_partial=True)

    # Some achievements should evaluate as unlocked in this aggregate...
    assert any(a["unlocked"] for a in result["achievements"])

    # ...but state.json on disk stays empty (no timestamps were recorded).
    persisted = plugin_api.load_state()
    assert persisted.get("unlocks", {}) == {}, (
        "partial scans must not record unlock timestamps — a later session "
        "could change whether the badge deserves to be unlocked yet"
    )