/ tests / agent / test_shell_hooks.py
test_shell_hooks.py
  1  """Tests for the shell-hooks subprocess bridge (agent.shell_hooks).
  2  
  3  These tests focus on the pure translation layer — JSON serialisation,
  4  JSON parsing, matcher behaviour, block-schema correctness, and the
  5  subprocess runner's graceful error handling.  Consent prompts are
  6  covered in ``test_shell_hooks_consent.py``.
  7  """
  8  
  9  from __future__ import annotations
 10  
 11  import json
 12  import os
 13  import stat
 14  from pathlib import Path
 15  from typing import Any, Dict
 16  
 17  import pytest
 18  
 19  from agent import shell_hooks
 20  
 21  
 22  # ── helpers ───────────────────────────────────────────────────────────────
 23  
 24  
 25  def _write_script(tmp_path: Path, name: str, body: str) -> Path:
 26      path = tmp_path / name
 27      path.write_text(body)
 28      path.chmod(0o755)
 29      return path
 30  
 31  
 32  def _allowlist_pair(monkeypatch, tmp_path, event: str, command: str) -> None:
 33      monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_home"))
 34      shell_hooks._record_approval(event, command)
 35  
 36  
@pytest.fixture(autouse=True)
def _reset_registration_state():
    """Call ``shell_hooks.reset_for_tests()`` before and after every test.

    The fixture is ``autouse``, so each test in this module is wrapped by
    the two reset calls without requesting it explicitly.
    """
    shell_hooks.reset_for_tests()
    yield
    shell_hooks.reset_for_tests()
 42  
 43  
 44  # ── _parse_response ───────────────────────────────────────────────────────
 45  
 46  
 47  class TestParseResponse:
 48      def test_block_claude_code_style(self):
 49          r = shell_hooks._parse_response(
 50              "pre_tool_call",
 51              '{"decision": "block", "reason": "nope"}',
 52          )
 53          assert r == {"action": "block", "message": "nope"}
 54  
 55      def test_block_canonical_style(self):
 56          r = shell_hooks._parse_response(
 57              "pre_tool_call",
 58              '{"action": "block", "message": "nope"}',
 59          )
 60          assert r == {"action": "block", "message": "nope"}
 61  
 62      def test_block_canonical_wins_over_claude_style(self):
 63          r = shell_hooks._parse_response(
 64              "pre_tool_call",
 65              '{"action": "block", "message": "canonical", '
 66              '"decision": "block", "reason": "claude"}',
 67          )
 68          assert r == {"action": "block", "message": "canonical"}
 69  
 70      def test_empty_stdout_returns_none(self):
 71          assert shell_hooks._parse_response("pre_tool_call", "") is None
 72          assert shell_hooks._parse_response("pre_tool_call", "   ") is None
 73  
 74      def test_invalid_json_returns_none(self):
 75          assert shell_hooks._parse_response("pre_tool_call", "not json") is None
 76  
 77      def test_non_dict_json_returns_none(self):
 78          assert shell_hooks._parse_response("pre_tool_call", "[1, 2]") is None
 79  
 80      def test_non_block_pre_tool_call_returns_none(self):
 81          r = shell_hooks._parse_response("pre_tool_call", '{"decision": "allow"}')
 82          assert r is None
 83  
 84      def test_pre_llm_call_context_passthrough(self):
 85          r = shell_hooks._parse_response(
 86              "pre_llm_call", '{"context": "today is Friday"}',
 87          )
 88          assert r == {"context": "today is Friday"}
 89  
 90      def test_subagent_stop_context_passthrough(self):
 91          r = shell_hooks._parse_response(
 92              "subagent_stop", '{"context": "child role=leaf"}',
 93          )
 94          assert r == {"context": "child role=leaf"}
 95  
 96      def test_pre_llm_call_block_ignored(self):
 97          """Only pre_tool_call honors block directives."""
 98          r = shell_hooks._parse_response(
 99              "pre_llm_call", '{"decision": "block", "reason": "no"}',
100          )
101          assert r is None
102  
103  
104  # ── _serialize_payload ────────────────────────────────────────────────────
105  
106  
class TestSerializePayload:
    """Expectations for the JSON text built by ``shell_hooks._serialize_payload``."""

    def test_basic_pre_tool_call_schema(self):
        """Known keys map to top-level fields; the remaining ones go to ``extra``."""
        kwargs = {
            "tool_name": "terminal",
            "args": {"command": "ls"},
            "session_id": "sess-1",
            "task_id": "t-1",
            "tool_call_id": "c-1",
        }
        serialized = shell_hooks._serialize_payload("pre_tool_call", kwargs)
        decoded = json.loads(serialized)

        assert decoded["hook_event_name"] == "pre_tool_call"
        assert decoded["tool_name"] == "terminal"
        assert decoded["tool_input"] == {"command": "ls"}
        assert decoded["session_id"] == "sess-1"
        assert "cwd" in decoded

        # task_id / tool_call_id end up under extra
        extra = decoded["extra"]
        assert extra["task_id"] == "t-1"
        assert extra["tool_call_id"] == "c-1"

    def test_args_not_dict_becomes_null(self):
        """A non-dict ``args`` value is serialised as a null ``tool_input``."""
        kwargs = {"args": ["not", "a", "dict"]}
        decoded = json.loads(
            shell_hooks._serialize_payload("pre_tool_call", kwargs)
        )
        assert decoded["tool_input"] is None

    def test_parent_session_id_used_when_no_session_id(self):
        """``parent_session_id`` fills ``session_id`` when the latter is absent."""
        kwargs = {"parent_session_id": "p-1"}
        decoded = json.loads(
            shell_hooks._serialize_payload("subagent_stop", kwargs)
        )
        assert decoded["session_id"] == "p-1"

    def test_unserialisable_extras_stringified(self):
        """An object JSON cannot encode appears in ``extra`` as its repr text."""

        class Weird:
            def __repr__(self) -> str:
                return "<weird>"

        kwargs = {"obj": Weird()}
        decoded = json.loads(
            shell_hooks._serialize_payload("on_session_start", kwargs)
        )
        assert decoded["extra"]["obj"] == "<weird>"
153  
154  
155  # ── Matcher behaviour ─────────────────────────────────────────────────────
156  
157  
class TestMatcher:
    """Expectations for ``ShellHookSpec.matcher`` and ``matches_tool``."""

    @staticmethod
    def _spec_for(matcher):
        """Build a ``pre_tool_call`` spec running ``echo`` with the given matcher."""
        return shell_hooks.ShellHookSpec(
            event="pre_tool_call", command="echo", matcher=matcher,
        )

    def test_no_matcher_fires_for_any_tool(self):
        """With ``matcher=None`` every tool name matches."""
        hook = self._spec_for(None)
        assert hook.matches_tool("terminal")
        assert hook.matches_tool("write_file")

    def test_single_name_matcher(self):
        """A plain name matches that tool and not another."""
        hook = self._spec_for("terminal")
        assert hook.matches_tool("terminal")
        assert not hook.matches_tool("web_search")

    def test_alternation_matcher(self):
        """``a|b`` matches either alternative and nothing else."""
        hook = self._spec_for("terminal|file")
        assert hook.matches_tool("terminal")
        assert hook.matches_tool("file")
        assert not hook.matches_tool("web")

    def test_invalid_regex_falls_back_to_literal(self):
        """A pattern that is not a valid regex is compared literally."""
        hook = self._spec_for("foo[bar")
        assert hook.matches_tool("foo[bar")
        assert not hook.matches_tool("foo")

    def test_matcher_ignored_when_no_tool_name(self):
        """A spec with a matcher does not match when the tool name is ``None``."""
        hook = self._spec_for("terminal")
        assert not hook.matches_tool(None)

    def test_matcher_leading_whitespace_stripped(self):
        """YAML quirks can introduce leading/trailing whitespace — must
        not silently break the matcher."""
        hook = self._spec_for(" terminal ")
        assert hook.matcher == "terminal"
        assert hook.matches_tool("terminal")

    def test_matcher_trailing_newline_stripped(self):
        """A trailing newline on the matcher does not prevent a match."""
        hook = self._spec_for("terminal\n")
        assert hook.matches_tool("terminal")

    def test_whitespace_only_matcher_becomes_none(self):
        """A matcher that's pure whitespace is treated as 'no matcher'."""
        hook = self._spec_for("   ")
        assert hook.matcher is None
        assert hook.matches_tool("anything")
216  
217  
218  # ── End-to-end subprocess behaviour ───────────────────────────────────────
219  
220  
class TestCallbackSubprocess:
    """End-to-end checks that run real scripts through ``_make_callback``.

    Each test writes a small bash script under ``tmp_path``, wraps it in a
    ``ShellHookSpec`` and asserts on what the resulting callback returns
    (or on what the script observed on stdin).
    """

    def test_timeout_returns_none(self, tmp_path):
        """A script that outlives its timeout makes the callback return ``None``."""
        # Script that sleeps forever; we set a 1s timeout.
        script = _write_script(
            tmp_path, "slow.sh",
            "#!/usr/bin/env bash\nsleep 60\n",
        )
        spec = shell_hooks.ShellHookSpec(
            event="post_tool_call", command=str(script), timeout=1,
        )
        cb = shell_hooks._make_callback(spec)
        assert cb(tool_name="terminal") is None

    def test_malformed_json_stdout_returns_none(self, tmp_path):
        """Non-JSON stdout makes the callback return ``None``."""
        script = _write_script(
            tmp_path, "bad_json.sh",
            "#!/usr/bin/env bash\necho 'not json at all'\n",
        )
        spec = shell_hooks.ShellHookSpec(
            event="pre_tool_call", command=str(script),
        )
        cb = shell_hooks._make_callback(spec)
        # Matcher is None so the callback fires for any tool.
        assert cb(tool_name="terminal") is None

    def test_non_zero_exit_with_block_stdout_still_blocks(self, tmp_path):
        """A script that signals failure via exit code AND prints a block
        directive must still block — scripts should be free to mix exit
        codes with parseable output."""
        script = _write_script(
            tmp_path, "exit1_block.sh",
            "#!/usr/bin/env bash\n"
            'printf \'{"decision": "block", "reason": "via exit 1"}\\n\'\n'
            "exit 1\n",
        )
        spec = shell_hooks.ShellHookSpec(
            event="pre_tool_call", command=str(script),
        )
        cb = shell_hooks._make_callback(spec)
        assert cb(tool_name="terminal") == {"action": "block", "message": "via exit 1"}

    def test_block_translation_end_to_end(self, tmp_path):
        """v1 schema-bug regression gate.

        Shell hook returns the Claude-Code-style payload and the bridge
        must translate it to the canonical Hermes block shape so that
        get_pre_tool_call_block_message() surfaces the block.
        """
        script = _write_script(
            tmp_path, "blocker.sh",
            "#!/usr/bin/env bash\n"
            'printf \'{"decision": "block", "reason": "no terminal"}\\n\'\n',
        )
        spec = shell_hooks.ShellHookSpec(
            event="pre_tool_call",
            command=str(script),
            matcher="terminal",
        )
        cb = shell_hooks._make_callback(spec)
        result = cb(tool_name="terminal", args={"command": "rm -rf /"})
        assert result == {"action": "block", "message": "no terminal"}

    def test_block_aggregation_through_plugin_manager(self, tmp_path, monkeypatch):
        """Registering via register_from_config makes
        get_pre_tool_call_block_message surface the block — the real
        end-to-end control flow used by run_agent._invoke_tool."""
        from hermes_cli import plugins

        script = _write_script(
            tmp_path, "block.sh",
            "#!/usr/bin/env bash\n"
            'printf \'{"decision": "block", "reason": "blocked-by-shell"}\\n\'\n',
        )

        monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home"))
        monkeypatch.setenv("HERMES_ACCEPT_HOOKS", "1")

        # Fresh manager.  Installed through monkeypatch so the previous
        # manager is restored on teardown; a plain assignment would leave a
        # manager carrying this test's blocking "terminal" hook in place for
        # every later test in the session.
        monkeypatch.setattr(
            plugins, "_plugin_manager", plugins.PluginManager(), raising=False,
        )

        cfg = {
            "hooks": {
                "pre_tool_call": [
                    {"matcher": "terminal", "command": str(script)},
                ],
            },
        }
        registered = shell_hooks.register_from_config(cfg, accept_hooks=True)
        assert len(registered) == 1

        msg = plugins.get_pre_tool_call_block_message(
            tool_name="terminal",
            args={"command": "rm"},
        )
        assert msg == "blocked-by-shell"

    def test_matcher_regex_filters_callback(self, tmp_path, monkeypatch):
        """A matcher set to 'terminal' must not fire for 'web_search'."""
        import shlex

        calls = tmp_path / "calls.log"
        # Quote the path before splicing it into bash source so a temp
        # directory containing spaces or shell metacharacters still works.
        log_target = shlex.quote(str(calls))
        script = _write_script(
            tmp_path, "log.sh",
            "#!/usr/bin/env bash\n"
            f"echo \"$(cat -)\" >> {log_target}\n"
            "printf '{}\\n'\n",
        )
        spec = shell_hooks.ShellHookSpec(
            event="pre_tool_call",
            command=str(script),
            matcher="terminal",
        )
        cb = shell_hooks._make_callback(spec)
        cb(tool_name="terminal", args={"command": "ls"})
        cb(tool_name="web_search", args={"q": "x"})
        cb(tool_name="file_read", args={"path": "x"})
        assert calls.exists()
        # Only the terminal call wrote to the log
        assert calls.read_text().count("pre_tool_call") == 1

    def test_payload_schema_delivered(self, tmp_path):
        """The JSON the script receives on stdin carries the expected fields."""
        import shlex

        capture = tmp_path / "payload.json"
        # Quoted for the same reason as any path embedded in shell source.
        capture_target = shlex.quote(str(capture))
        script = _write_script(
            tmp_path, "capture.sh",
            "#!/usr/bin/env bash\n"
            f"cat - > {capture_target}\n"
            "printf '{}\\n'\n",
        )
        spec = shell_hooks.ShellHookSpec(
            event="pre_tool_call", command=str(script),
        )
        cb = shell_hooks._make_callback(spec)
        cb(
            tool_name="terminal",
            args={"command": "echo hi"},
            session_id="sess-77",
            task_id="task-77",
        )
        payload = json.loads(capture.read_text())
        assert payload["hook_event_name"] == "pre_tool_call"
        assert payload["tool_name"] == "terminal"
        assert payload["tool_input"] == {"command": "echo hi"}
        assert payload["session_id"] == "sess-77"
        assert "cwd" in payload
        assert payload["extra"]["task_id"] == "task-77"

    def test_pre_llm_call_context_flows_through(self, tmp_path):
        """A ``context`` object printed by a ``pre_llm_call`` hook is returned."""
        script = _write_script(
            tmp_path, "ctx.sh",
            "#!/usr/bin/env bash\n"
            'printf \'{"context": "env-note"}\\n\'\n',
        )
        spec = shell_hooks.ShellHookSpec(
            event="pre_llm_call", command=str(script),
        )
        cb = shell_hooks._make_callback(spec)
        result = cb(
            session_id="s1", user_message="hello",
            conversation_history=[], is_first_turn=True,
            model="gpt-4", platform="cli",
        )
        assert result == {"context": "env-note"}

    def test_shlex_handles_paths_with_spaces(self, tmp_path):
        """A quoted command path containing a space runs without error."""
        dir_with_space = tmp_path / "path with space"
        dir_with_space.mkdir()
        script = _write_script(
            dir_with_space, "ok.sh",
            "#!/usr/bin/env bash\nprintf '{}\\n'\n",
        )
        # Quote the path so shlex keeps it as a single token.
        spec = shell_hooks.ShellHookSpec(
            event="post_tool_call",
            command=f'"{script}"',
        )
        cb = shell_hooks._make_callback(spec)
        # No crash = shlex parsed it correctly.
        assert cb(tool_name="terminal") is None  # empty object parses to None

    def test_missing_binary_logged_not_raised(self, tmp_path):
        """A command pointing at a missing file returns ``None`` instead of raising."""
        spec = shell_hooks.ShellHookSpec(
            event="on_session_start",
            command=str(tmp_path / "does-not-exist"),
        )
        cb = shell_hooks._make_callback(spec)
        # Must not raise — agent loop should continue.
        assert cb(session_id="s") is None

    def test_non_executable_binary_logged_not_raised(self, tmp_path):
        """A script without the executable bit returns ``None`` instead of raising."""
        path = tmp_path / "no-exec"
        path.write_text("#!/usr/bin/env bash\necho hi\n")
        # Intentionally do NOT chmod +x.
        spec = shell_hooks.ShellHookSpec(
            event="on_session_start", command=str(path),
        )
        cb = shell_hooks._make_callback(spec)
        assert cb(session_id="s") is None
414  
415  
416  # ── config parsing ────────────────────────────────────────────────────────
417  
418  
class TestParseHooksBlock:
    """Expectations for ``shell_hooks._parse_hooks_block`` on config input."""

    def test_valid_entry(self):
        """A well-formed entry yields one spec carrying all four fields."""
        entry = {"matcher": "terminal", "command": "/tmp/hook.sh", "timeout": 30}
        specs = shell_hooks._parse_hooks_block({"pre_tool_call": [entry]})
        assert len(specs) == 1
        spec = specs[0]
        assert spec.event == "pre_tool_call"
        assert spec.matcher == "terminal"
        assert spec.command == "/tmp/hook.sh"
        assert spec.timeout == 30

    def test_unknown_event_skipped(self, caplog):
        """An unrecognised event name produces no specs."""
        block = {
            # typo
            "pre_tools_call": [{"command": "/tmp/hook.sh"}],
        }
        assert shell_hooks._parse_hooks_block(block) == []

    def test_missing_command_skipped(self):
        """An entry without ``command`` produces no specs."""
        block = {"pre_tool_call": [{"matcher": "terminal"}]}
        assert shell_hooks._parse_hooks_block(block) == []

    def test_timeout_clamped_to_max(self):
        """An oversized timeout is reduced to ``MAX_TIMEOUT_SECONDS``."""
        block = {
            "post_tool_call": [{"command": "/tmp/slow.sh", "timeout": 9999}],
        }
        specs = shell_hooks._parse_hooks_block(block)
        assert specs[0].timeout == shell_hooks.MAX_TIMEOUT_SECONDS

    def test_non_int_timeout_defaulted(self):
        """A non-integer timeout falls back to ``DEFAULT_TIMEOUT_SECONDS``."""
        block = {
            "post_tool_call": [{"command": "/tmp/x.sh", "timeout": "thirty"}],
        }
        specs = shell_hooks._parse_hooks_block(block)
        assert specs[0].timeout == shell_hooks.DEFAULT_TIMEOUT_SECONDS

    def test_non_list_event_skipped(self):
        """An event whose value is not a list produces no specs."""
        block = {"pre_tool_call": "not a list"}
        assert shell_hooks._parse_hooks_block(block) == []

    def test_none_hooks_block(self):
        """``None``, a string and an empty list all parse to no specs."""
        for not_a_mapping in (None, "string", []):
            assert shell_hooks._parse_hooks_block(not_a_mapping) == []

    def test_non_tool_event_matcher_warns_and_drops(self, caplog):
        """matcher: is only honored for pre/post_tool_call; must warn
        and drop on other events so the spec reflects runtime."""
        import logging

        cfg = {"pre_llm_call": [{"matcher": "terminal", "command": "/bin/echo"}]}
        with caplog.at_level(logging.WARNING, logger=shell_hooks.logger.name):
            specs = shell_hooks._parse_hooks_block(cfg)

        assert len(specs) == 1
        assert specs[0].matcher is None

        messages = [record.getMessage() for record in caplog.records]
        assert any(
            "only honored for pre_tool_call" in text and "pre_llm_call" in text
            for text in messages
        )
486  
487  
488  # ── Idempotent registration ───────────────────────────────────────────────
489  
490  
class TestIdempotentRegistration:
    """Checks that ``register_from_config`` de-duplicates repeated hooks."""

    def test_double_call_registers_once(self, tmp_path, monkeypatch):
        """A second call with the same config registers nothing new."""
        from hermes_cli import plugins

        script = _write_script(tmp_path, "h.sh",
                               "#!/usr/bin/env bash\nprintf '{}\\n'\n")
        monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home"))
        monkeypatch.setenv("HERMES_ACCEPT_HOOKS", "1")

        # Install the fresh manager through monkeypatch so the previous one
        # is restored on teardown instead of leaking this test's hooks into
        # the rest of the session.
        monkeypatch.setattr(
            plugins, "_plugin_manager", plugins.PluginManager(), raising=False,
        )

        cfg = {"hooks": {"on_session_start": [{"command": str(script)}]}}

        first = shell_hooks.register_from_config(cfg, accept_hooks=True)
        second = shell_hooks.register_from_config(cfg, accept_hooks=True)
        assert len(first) == 1
        assert second == []
        # Only one callback on the manager
        mgr = plugins.get_plugin_manager()
        assert len(mgr._hooks.get("on_session_start", [])) == 1

    def test_same_command_different_matcher_registers_both(
        self, tmp_path, monkeypatch,
    ):
        """Same script used for different matchers under one event must
        register both callbacks — dedupe keys on (event, matcher, command)."""
        from hermes_cli import plugins

        script = _write_script(tmp_path, "h.sh",
                               "#!/usr/bin/env bash\nprintf '{}\\n'\n")
        monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home"))
        monkeypatch.setenv("HERMES_ACCEPT_HOOKS", "1")

        # Restored automatically on teardown (see the test above).
        monkeypatch.setattr(
            plugins, "_plugin_manager", plugins.PluginManager(), raising=False,
        )

        cfg = {
            "hooks": {
                "pre_tool_call": [
                    {"matcher": "terminal", "command": str(script)},
                    {"matcher": "web_search", "command": str(script)},
                ],
            },
        }

        registered = shell_hooks.register_from_config(cfg, accept_hooks=True)
        assert len(registered) == 2
        mgr = plugins.get_plugin_manager()
        assert len(mgr._hooks.get("pre_tool_call", [])) == 2
539  
540  
541  # ── Allowlist concurrency ─────────────────────────────────────────────────
542  
543  
class TestAllowlistConcurrency:
    """Regression tests for the Codex#1 finding: simultaneous
    _record_approval() calls used to collide on a fixed tmp path and
    silently lose entries under read-modify-write races.

    The class also holds the checks for ``script_is_executable`` and
    ``_command_script_path``.
    """

    def test_parallel_record_approval_does_not_lose_entries(
        self, tmp_path, monkeypatch,
    ):
        """32 threads recording distinct approvals must all be persisted."""
        import threading
        import time

        monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home"))

        N = 32
        barrier = threading.Barrier(N)
        errors: list = []

        def worker(i: int) -> None:
            try:
                # Release every thread at once to maximise contention.
                barrier.wait(timeout=5)
                shell_hooks._record_approval(
                    "on_session_start", f"/bin/hook-{i}.sh",
                )
            except Exception as exc:  # pragma: no cover
                errors.append(exc)

        # Daemon threads plus a bounded join: a locking regression then
        # fails this test instead of hanging the whole run.
        threads = [
            threading.Thread(target=worker, args=(i,), daemon=True)
            for i in range(N)
        ]
        for t in threads:
            t.start()
        deadline = time.monotonic() + 30.0
        for t in threads:
            t.join(timeout=max(0.0, deadline - time.monotonic()))

        stuck = [t.name for t in threads if t.is_alive()]
        assert not stuck, f"workers did not finish: {stuck}"
        assert not errors, f"worker errors: {errors}"

        data = shell_hooks.load_allowlist()
        commands = {e["command"] for e in data["approvals"]}
        assert commands == {f"/bin/hook-{i}.sh" for i in range(N)}, (
            f"expected all {N} entries, got {len(commands)}"
        )

    def test_non_posix_fallback_does_not_self_deadlock(
        self, tmp_path, monkeypatch,
    ):
        """Regression: on platforms without fcntl, the fallback lock must
        be separate from _registered_lock.  register_from_config holds
        _registered_lock while calling _record_approval (via the consent
        prompt path), so a shared non-reentrant lock would self-deadlock."""
        import threading

        monkeypatch.setattr(shell_hooks, "fcntl", None)
        monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home"))

        completed = threading.Event()
        errors: list = []

        def target() -> None:
            try:
                with shell_hooks._registered_lock:
                    shell_hooks._record_approval(
                        "on_session_start", "/bin/x.sh",
                    )
            except Exception as exc:  # pragma: no cover
                errors.append(exc)
            finally:
                # Signal completion on every exit path so the main thread
                # only times out on a genuine deadlock.
                completed.set()

        # Daemon so a deadlocked worker cannot keep the interpreter alive.
        t = threading.Thread(target=target, daemon=True)
        t.start()
        if not completed.wait(timeout=3.0):
            pytest.fail(
                "non-POSIX fallback self-deadlocked — "
                "_locked_update_approvals must not reuse _registered_lock",
            )
        t.join(timeout=1.0)
        assert not errors, f"errors: {errors}"
        assert shell_hooks._is_allowlisted(
            "on_session_start", "/bin/x.sh",
        )

    def test_save_allowlist_failure_logs_actionable_warning(
        self, tmp_path, monkeypatch, caplog,
    ):
        """Persistence failures must log the path, errno, and
        re-prompt consequence so "hermes keeps asking" is debuggable."""
        import logging

        monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home"))

        def failing_mkstemp(*args, **kwargs):
            # Simulate a full disk (errno 28) at temp-file creation.
            raise OSError(28, "No space")

        monkeypatch.setattr(shell_hooks.tempfile, "mkstemp", failing_mkstemp)
        with caplog.at_level(logging.WARNING, logger=shell_hooks.logger.name):
            shell_hooks.save_allowlist({"approvals": []})
        msg = next(
            (r.getMessage() for r in caplog.records
             if "Failed to persist" in r.getMessage()), "",
        )
        assert "shell-hooks-allowlist.json" in msg
        assert "No space" in msg
        assert "re-prompt" in msg

    def test_script_is_executable_handles_interpreter_prefix(self, tmp_path):
        """For ``python3 hook.py`` and similar the interpreter reads
        the script, so X_OK on the script itself is not required —
        only R_OK.  Bare invocations still require X_OK."""
        script = tmp_path / "hook.py"
        script.write_text("print()\n")  # readable, NOT executable

        # Interpreter prefix: R_OK is enough.
        assert shell_hooks.script_is_executable(f"python3 {script}")
        assert shell_hooks.script_is_executable(f"/usr/bin/env python3 {script}")

        # Bare invocation on the same non-X_OK file: not runnable.
        assert not shell_hooks.script_is_executable(str(script))

        # Flip +x; bare invocation is now runnable too.
        script.chmod(0o755)
        assert shell_hooks.script_is_executable(str(script))

    def test_command_script_path_resolution(self):
        """Regression: ``_command_script_path`` used to return the first
        shlex token, which picked the interpreter (``python3``, ``bash``,
        ``/usr/bin/env``) instead of the actual script for any
        interpreter-prefixed command.  That broke
        ``hermes hooks doctor``'s executability check and silently
        disabled mtime drift detection for such hooks."""
        cases = [
            # bare path
            ("/path/hook.sh", "/path/hook.sh"),
            ("/bin/echo hi", "/bin/echo"),
            ("~/hook.sh", "~/hook.sh"),
            ("hook.sh", "hook.sh"),
            # interpreter prefix
            ("python3 /path/hook.py", "/path/hook.py"),
            ("bash /path/hook.sh", "/path/hook.sh"),
            ("bash ~/hook.sh", "~/hook.sh"),
            ("python3 -u /path/hook.py", "/path/hook.py"),
            ("nice -n 10 /path/hook.sh", "/path/hook.sh"),
            # /usr/bin/env shebang form — must find the *script*, not env
            ("/usr/bin/env python3 /path/hook.py", "/path/hook.py"),
            ("/usr/bin/env bash /path/hook.sh", "/path/hook.sh"),
            # no path-like tokens → fallback to first token
            ("my-binary --verbose", "my-binary"),
            ("python3 -c 'print(1)'", "python3"),
            # unparseable (unbalanced quotes) → return command as-is
            ("python3 'unterminated", "python3 'unterminated"),
            # empty
            ("", ""),
        ]
        for command, expected in cases:
            got = shell_hooks._command_script_path(command)
            assert got == expected, f"{command!r} -> {got!r}, expected {expected!r}"

    def test_save_allowlist_uses_unique_tmp_paths(self, tmp_path, monkeypatch):
        """Two save_allowlist calls in flight must use distinct tmp files
        so the loser's os.replace does not ENOENT on the winner's sweep."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path / "home"))
        p = shell_hooks.allowlist_path()
        p.parent.mkdir(parents=True, exist_ok=True)

        tmp_paths_seen: list = []
        real_mkstemp = shell_hooks.tempfile.mkstemp

        def spying_mkstemp(*args, **kwargs):
            # Delegate to the real mkstemp and remember the path it chose.
            fd, path = real_mkstemp(*args, **kwargs)
            tmp_paths_seen.append(path)
            return fd, path

        monkeypatch.setattr(shell_hooks.tempfile, "mkstemp", spying_mkstemp)

        shell_hooks.save_allowlist({"approvals": [{"event": "a", "command": "x"}]})
        shell_hooks.save_allowlist({"approvals": [{"event": "b", "command": "y"}]})

        assert len(tmp_paths_seen) == 2
        assert tmp_paths_seen[0] != tmp_paths_seen[1]
715          assert len(tmp_paths_seen) == 2
716          assert tmp_paths_seen[0] != tmp_paths_seen[1]