/ agent / shell_hooks.py
shell_hooks.py
  1  """
  2  Shell-script hooks bridge.
  3  
  4  Reads the ``hooks:`` block from ``cli-config.yaml``, prompts the user for
  5  consent on first use of each ``(event, command)`` pair, and registers
  6  callbacks on the existing plugin hook manager so every existing
  7  ``invoke_hook()`` site dispatches to the configured shell scripts — with
  8  zero changes to call sites.
  9  
 10  Design notes
 11  ------------
 12  * Python plugins and shell hooks compose naturally: both flow through
 13    :func:`hermes_cli.plugins.invoke_hook` and its aggregators.  Python
 14    plugins are registered first (via ``discover_and_load()``) so their
 15    block decisions win ties over shell-hook blocks.
 16  * Subprocess execution uses ``shlex.split(os.path.expanduser(command))``
 17    with ``shell=False`` — no shell injection footguns.  Users that need
 18    pipes/redirection wrap their logic in a script.
 19  * First-use consent is gated by the allowlist under
 20    ``~/.hermes/shell-hooks-allowlist.json``.  Non-TTY callers must pass
 21    ``accept_hooks=True`` (resolved from ``--accept-hooks``,
 22    ``HERMES_ACCEPT_HOOKS``, or ``hooks_auto_accept: true`` in config)
 23    for registration to succeed without a prompt.
 24  * Registration is idempotent — safe to invoke from both the CLI entry
 25    point (``hermes_cli/main.py``) and the gateway entry point
 26    (``gateway/run.py``).
 27  
 28  Wire protocol
 29  -------------
 30  **stdin** (JSON, piped to the script)::
 31  
 32      {
 33          "hook_event_name": "pre_tool_call",
 34          "tool_name":       "terminal",
 35          "tool_input":      {"command": "rm -rf /"},
 36          "session_id":      "sess_abc123",
 37          "cwd":             "/home/user/project",
 38          "extra":           {...}   # event-specific kwargs
 39      }
 40  
 41  **stdout** (JSON, optional — anything else is ignored)::
 42  
 43      # Block a pre_tool_call (either shape accepted; normalised internally):
 44      {"decision": "block", "reason":  "Forbidden command"}   # Claude-Code-style
 45      {"action":   "block", "message": "Forbidden command"}   # Hermes-canonical
 46  
 47      # Inject context for pre_llm_call:
 48      {"context": "Today is Friday"}
 49  
 50      # Silent no-op:
 51      <empty or any non-matching JSON object>
 52  """
 53  
 54  from __future__ import annotations
 55  
 56  import difflib
 57  import json
 58  import logging
 59  import os
 60  import re
 61  import shlex
 62  import subprocess
 63  import sys
 64  import tempfile
 65  import threading
 66  import time
 67  from contextlib import contextmanager
 68  from dataclasses import dataclass, field
 69  from datetime import datetime, timezone
 70  from pathlib import Path
 71  from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple
 72  
 73  try:
 74      import fcntl  # POSIX only; Windows falls back to best-effort without flock.
 75  except ImportError:  # pragma: no cover
 76      fcntl = None  # type: ignore[assignment]
 77  
 78  from hermes_constants import get_hermes_home
 79  from utils import atomic_replace
 80  
# Module-level logger used for every diagnostic emitted by this bridge.
logger = logging.getLogger(__name__)

# Subprocess timeout (seconds) applied when a hook entry omits ``timeout``
# or supplies an unusable value.
DEFAULT_TIMEOUT_SECONDS = 60
# Upper bound (seconds); larger configured timeouts are clamped to this.
MAX_TIMEOUT_SECONDS = 300
# File name of the consent allowlist; ``allowlist_path()`` joins it onto
# the Hermes home directory.
ALLOWLIST_FILENAME = "shell-hooks-allowlist.json"

# (event, matcher, command) triples that have been wired to the plugin
# manager in the current process.  Matcher is part of the key because
# the same script can legitimately register for different matchers under
# the same event (e.g. one entry per tool the user wants to gate).
# Second registration attempts for the exact same triple become no-ops
# so the CLI and gateway can both call register_from_config() safely.
_registered: Set[Tuple[str, Optional[str], str]] = set()
# Guards every read and mutation of ``_registered``.
_registered_lock = threading.Lock()

# Intra-process lock for allowlist read-modify-write on platforms that
# lack ``fcntl`` (non-POSIX).  Deliberately separate from
# ``_registered_lock``: ``threading.Lock`` is non-reentrant, so sharing one
# lock would self-deadlock if an allowlist write were ever triggered while
# the registration lock is held.  (``register_from_config`` runs the
# prompt/record step outside ``_registered_lock``, so the two locks are
# never nested there.)  POSIX callers use the sibling ``.lock`` file via
# ``fcntl.flock`` and bypass this.
_allowlist_write_lock = threading.Lock()
103  
104  
@dataclass
class ShellHookSpec:
    """A single validated ``hooks:`` entry.

    Holds the event the hook is bound to, the command line to run, an
    optional tool-name matcher (a regex, pre-compiled at construction
    time) and the subprocess timeout in seconds.
    """

    event: str
    command: str
    matcher: Optional[str] = None
    timeout: int = DEFAULT_TIMEOUT_SECONDS
    compiled_matcher: Optional[re.Pattern] = field(default=None, repr=False)

    def __post_init__(self) -> None:
        """Normalise ``matcher`` and pre-compile it when it is usable."""
        raw = self.matcher
        if isinstance(raw, str):
            # YAML folding can leave stray whitespace around the value; a
            # matcher of " terminal" would never match "terminal", so trim
            # it and collapse a blank matcher to "no matcher".
            self.matcher = raw.strip() or None

        pattern = self.matcher
        if not pattern:
            return

        try:
            compiled = re.compile(pattern)
        except re.error as exc:
            logger.warning(
                "shell hook matcher %r is invalid (%s) — treating as "
                "literal equality", pattern, exc,
            )
            compiled = None
        self.compiled_matcher = compiled

    def matches_tool(self, tool_name: Optional[str]) -> bool:
        """Return whether this hook applies to ``tool_name``.

        No matcher means "every tool".  With a matcher, a missing tool
        name never matches; otherwise the compiled regex must match the
        whole name, or — when the regex failed to compile — the name must
        equal the matcher text exactly.
        """
        if not self.matcher:
            return True
        if tool_name is None:
            return False
        regex = self.compiled_matcher
        if regex is None:
            # Only reachable when compilation failed (already warned about).
            return tool_name == self.matcher
        return regex.fullmatch(tool_name) is not None
142  
143  
144  # ---------------------------------------------------------------------------
145  # Public API
146  # ---------------------------------------------------------------------------
147  
def register_from_config(
    cfg: Optional[Dict[str, Any]],
    *,
    accept_hooks: bool = False,
) -> List[ShellHookSpec]:
    """Register every configured shell hook on the plugin manager.

    ``cfg`` is the full parsed config dict (``hermes_cli.config.load_config``
    output).  The ``hooks:`` key is read out of it.  Missing, empty, or
    non-dict ``hooks`` is treated as zero configured hooks.

    ``accept_hooks=True`` skips the TTY consent prompt — the caller is
    promising that the user has opted in via a flag, env var, or config
    setting.  ``HERMES_ACCEPT_HOOKS=1`` and ``hooks_auto_accept: true`` are
    also honored inside this function so either CLI or gateway call sites
    pick them up.

    Two different keys are in play:

    * idempotence is tracked per ``(event, matcher, command)`` triple in
      the module-level ``_registered`` set;
    * consent is looked up and recorded per ``(event, command)`` pair, so
      one approval covers every matcher variant of the same command.
      Conversely, a pair that is declined is asked about again for each
      further entry that shares it within the same call.

    Returns the list of :class:`ShellHookSpec` entries that ended up wired
    up on the plugin manager.  Skipped entries (unknown events, malformed,
    not allowlisted, already registered) are logged but not returned.
    """
    if not isinstance(cfg, dict):
        return []

    # Fold the explicit argument, the env var and the config flag into one bool.
    effective_accept = _resolve_effective_accept(cfg, accept_hooks)

    specs = _parse_hooks_block(cfg.get("hooks"))
    if not specs:
        return []

    registered: List[ShellHookSpec] = []

    # Import lazily — avoids circular imports at module-load time.
    from hermes_cli.plugins import get_plugin_manager

    manager = get_plugin_manager()

    # Idempotence + allowlist read happen under the lock; the TTY
    # prompt runs outside so other threads aren't parked on a blocking
    # input().  Mutation re-takes the lock with a defensive idempotence
    # re-check in case two callers ever race through the prompt.
    for spec in specs:
        key = (spec.event, spec.matcher, spec.command)
        with _registered_lock:
            if key in _registered:
                # Already wired in this process — nothing to do.
                continue
            already_allowlisted = _is_allowlisted(spec.event, spec.command)

        if not already_allowlisted:
            # Not holding _registered_lock here: this may block on input()
            # and writes the allowlist file.
            if not _prompt_and_record(
                spec.event, spec.command, accept_hooks=effective_accept,
            ):
                logger.warning(
                    "shell hook for %s (%s) not allowlisted — skipped. "
                    "Use --accept-hooks / HERMES_ACCEPT_HOOKS=1 / "
                    "hooks_auto_accept: true, or approve at the TTY "
                    "prompt next run.",
                    spec.event, spec.command,
                )
                continue

        with _registered_lock:
            if key in _registered:
                # Another caller registered the same triple while we were
                # outside the lock.
                continue
            # Append the subprocess-backed callback to the manager's
            # per-event callback list, creating the list if needed.
            manager._hooks.setdefault(spec.event, []).append(_make_callback(spec))
            _registered.add(key)
            registered.append(spec)
            logger.info(
                "shell hook registered: %s -> %s (matcher=%s, timeout=%ds)",
                spec.event, spec.command, spec.matcher, spec.timeout,
            )

    return registered
221  
222  
def iter_configured_hooks(cfg: Optional[Dict[str, Any]]) -> List[ShellHookSpec]:
    """Parse the ``hooks:`` block of ``cfg`` without registering anything.

    Returns the parsed ``ShellHookSpec`` entries, or an empty list when
    ``cfg`` is not a dict.  Used by ``hermes hooks list`` and ``doctor``.
    """
    return _parse_hooks_block(cfg.get("hooks")) if isinstance(cfg, dict) else []
229  
230  
def reset_for_tests() -> None:
    """Clear the idempotence set.  Test-only helper.

    Only the module-level ``_registered`` set is emptied (under
    ``_registered_lock``); callbacks that were already appended to the
    plugin manager are not touched by this function.
    """
    with _registered_lock:
        _registered.clear()
235  
236  
237  # ---------------------------------------------------------------------------
238  # Config parsing
239  # ---------------------------------------------------------------------------
240  
def _parse_hooks_block(hooks_cfg: Any) -> List[ShellHookSpec]:
    """Flatten the ``hooks:`` mapping into a list of ``ShellHookSpec``.

    ``hooks_cfg`` maps event names to lists of hook definitions.  Anything
    malformed is logged and skipped rather than raised, because a broken
    hook entry must not crash the agent.  A non-dict ``hooks_cfg`` yields
    an empty list.
    """
    from hermes_cli.plugins import VALID_HOOKS

    if not isinstance(hooks_cfg, dict):
        return []

    parsed: List[ShellHookSpec] = []

    for event_name, entries in hooks_cfg.items():
        if event_name not in VALID_HOOKS:
            # Offer the closest valid event name when there is one,
            # otherwise list every valid event.
            close = difflib.get_close_matches(
                str(event_name), VALID_HOOKS, n=1, cutoff=0.6,
            )
            if not close:
                logger.warning(
                    "unknown hook event %r in hooks: config (valid: %s)",
                    event_name, ", ".join(sorted(VALID_HOOKS)),
                )
            else:
                logger.warning(
                    "unknown hook event %r in hooks: config — did you mean %r?",
                    event_name, close[0],
                )
            continue

        # An event key with no value is simply "no hooks for this event".
        if entries is None:
            continue

        if not isinstance(entries, list):
            logger.warning(
                "hooks.%s must be a list of hook definitions; got %s",
                event_name, type(entries).__name__,
            )
            continue

        candidates = (
            _parse_single_entry(event_name, position, item)
            for position, item in enumerate(entries)
        )
        parsed.extend(spec for spec in candidates if spec is not None)

    return parsed
287  
288  
def _parse_single_entry(
    event: str, index: int, raw: Any,
) -> Optional[ShellHookSpec]:
    """Validate one ``hooks.<event>[<index>]`` entry and build its spec.

    Args:
        event: Name of the hook event the entry is listed under.
        index: Position of the entry in that event's list (used only in
            log messages).
        raw: The entry as parsed from config; expected to be a mapping
            with a non-empty string ``command`` and optional ``matcher``
            and ``timeout`` keys.

    Returns:
        A :class:`ShellHookSpec`, or ``None`` when the entry is not a
        mapping or has no usable ``command``.  Every other problem is
        logged and repaired with a default instead of being raised:

        * a non-string ``matcher`` is dropped;
        * a ``matcher`` on an event other than ``pre_tool_call`` /
          ``post_tool_call`` is dropped;
        * an unparsable or ``< 1`` ``timeout`` becomes
          ``DEFAULT_TIMEOUT_SECONDS``;
        * a ``timeout`` above ``MAX_TIMEOUT_SECONDS`` is clamped.
    """
    if not isinstance(raw, dict):
        logger.warning(
            "hooks.%s[%d] must be a mapping with a 'command' key; got %s",
            event, index, type(raw).__name__,
        )
        return None

    command = raw.get("command")
    if not isinstance(command, str) or not command.strip():
        logger.warning(
            "hooks.%s[%d] is missing a non-empty 'command' field",
            event, index,
        )
        return None

    matcher = raw.get("matcher")
    if matcher is not None and not isinstance(matcher, str):
        logger.warning(
            "hooks.%s[%d].matcher must be a string regex; ignoring",
            event, index,
        )
        matcher = None

    if matcher is not None and event not in ("pre_tool_call", "post_tool_call"):
        logger.warning(
            "hooks.%s[%d].matcher=%r will be ignored at runtime — the "
            "matcher field is only honored for pre_tool_call / "
            "post_tool_call.  The hook will fire on every %s event.",
            event, index, matcher, event,
        )
        matcher = None

    timeout_raw = raw.get("timeout", DEFAULT_TIMEOUT_SECONDS)
    try:
        timeout = int(timeout_raw)
    except (TypeError, ValueError, OverflowError):
        # OverflowError is what int() raises for an infinite float, which
        # is exactly what YAML ``timeout: .inf`` parses to.  It must be
        # handled here so that config parsing never raises.
        logger.warning(
            "hooks.%s[%d].timeout must be an int (got %r); using default %ds",
            event, index, timeout_raw, DEFAULT_TIMEOUT_SECONDS,
        )
        timeout = DEFAULT_TIMEOUT_SECONDS

    if timeout < 1:
        logger.warning(
            "hooks.%s[%d].timeout must be >=1; using default %ds",
            event, index, DEFAULT_TIMEOUT_SECONDS,
        )
        timeout = DEFAULT_TIMEOUT_SECONDS

    if timeout > MAX_TIMEOUT_SECONDS:
        logger.warning(
            "hooks.%s[%d].timeout=%ds exceeds max %ds; clamping",
            event, index, timeout, MAX_TIMEOUT_SECONDS,
        )
        timeout = MAX_TIMEOUT_SECONDS

    return ShellHookSpec(
        event=event,
        command=command.strip(),
        matcher=matcher,
        timeout=timeout,
    )
354  
355  
356  # ---------------------------------------------------------------------------
357  # Subprocess callback
358  # ---------------------------------------------------------------------------
359  
# Hook kwargs that ``_serialize_payload`` consumes itself when building the
# top-level stdin fields (``tool_name``, ``tool_input``, ``session_id``);
# every other kwarg is forwarded to the script under ``"extra"``.
_TOP_LEVEL_PAYLOAD_KEYS = {"tool_name", "args", "session_id", "parent_session_id"}
361  
362  
def _spawn(spec: ShellHookSpec, stdin_json: str) -> Dict[str, Any]:
    """Run ``spec.command`` as a subprocess with ``stdin_json`` on stdin.

    The command is tokenised with ``shlex.split`` and executed with
    ``shell=False``.  A leading ``~`` / ``~user`` is expanded on *each*
    token after splitting, so interpreter-prefixed commands such as
    ``python3 ~/hooks/check.py`` resolve the same script path that
    :func:`script_mtime_iso` and :func:`script_is_executable` inspect, and
    a home directory containing whitespace is not re-split into several
    arguments.

    Returns a diagnostic dict with the same keys for every outcome
    (``returncode``, ``stdout``, ``stderr``, ``timed_out``,
    ``elapsed_seconds``, ``error``).  ``error`` is a human-readable string
    when the command could not be parsed or started, ``timed_out`` is
    ``True`` when ``spec.timeout`` seconds elapsed first; no exception is
    propagated to the caller.  This is the single place the subprocess is
    actually invoked — both the live callback path
    (:func:`_make_callback`) and the CLI test helper (:func:`run_once`)
    go through it.
    """
    result: Dict[str, Any] = {
        "returncode": None,
        "stdout": "",
        "stderr": "",
        "timed_out": False,
        "elapsed_seconds": 0.0,
        "error": None,
    }
    try:
        # Split first, expand second: expanding the raw string would only
        # ever touch a ``~`` in column 0 and would let a home path with
        # spaces be torn apart by the tokeniser.
        argv = [os.path.expanduser(token) for token in shlex.split(spec.command)]
    except ValueError as exc:
        # shlex raises ValueError for e.g. an unterminated quote.
        result["error"] = f"command {spec.command!r} cannot be parsed: {exc}"
        return result
    if not argv:
        result["error"] = "empty command"
        return result

    t0 = time.monotonic()
    try:
        proc = subprocess.run(
            argv,
            input=stdin_json,
            capture_output=True,
            timeout=spec.timeout,
            text=True,
            shell=False,
        )
    except subprocess.TimeoutExpired:
        result["timed_out"] = True
        result["elapsed_seconds"] = round(time.monotonic() - t0, 3)
        return result
    except FileNotFoundError:
        result["error"] = "command not found"
        return result
    except PermissionError:
        result["error"] = "command not executable"
        return result
    except Exception as exc:  # pragma: no cover — defensive
        result["error"] = str(exc)
        return result

    result["returncode"] = proc.returncode
    result["stdout"] = proc.stdout or ""
    result["stderr"] = proc.stderr or ""
    result["elapsed_seconds"] = round(time.monotonic() - t0, 3)
    return result
419  
420  
def _make_callback(spec: ShellHookSpec) -> Callable[..., Optional[Dict[str, Any]]]:
    """Create the per-hook closure that ``invoke_hook()`` calls on each firing.

    The closure applies the tool-name matcher (tool events only), runs the
    command through :func:`_spawn`, logs failures, and returns whatever
    :func:`_parse_response` makes of the script's stdout.  Spawn errors and
    timeouts yield ``None``.
    """

    def _callback(**kwargs: Any) -> Optional[Dict[str, Any]]:
        event, command = spec.event, spec.command

        # The matcher only applies to tool-scoped events.
        is_tool_event = event in ("pre_tool_call", "post_tool_call")
        if is_tool_event and not spec.matches_tool(kwargs.get("tool_name")):
            return None

        outcome = _spawn(spec, _serialize_payload(event, kwargs))

        failure = outcome["error"]
        if failure:
            logger.warning(
                "shell hook failed (event=%s command=%s): %s",
                event, command, failure,
            )
            return None
        if outcome["timed_out"]:
            logger.warning(
                "shell hook timed out after %.2fs (event=%s command=%s)",
                outcome["elapsed_seconds"], event, command,
            )
            return None

        err_text = outcome["stderr"].strip()
        if err_text:
            logger.debug(
                "shell hook stderr (event=%s command=%s): %s",
                event, command, err_text[:400],
            )

        # A non-zero exit is reported but does not discard stdout: a
        # script may signal failure via its exit code and still emit a
        # block directive.
        exit_code = outcome["returncode"]
        if exit_code != 0:
            logger.warning(
                "shell hook exited %d (event=%s command=%s); stderr=%s",
                exit_code, event, command, err_text[:400],
            )
        return _parse_response(event, outcome["stdout"])

    # Give the closure a descriptive name for logs and introspection.
    label = f"shell_hook[{spec.event}:{spec.command}]"
    _callback.__name__ = label
    _callback.__qualname__ = label
    return _callback
463  
464  
def _serialize_payload(event: str, kwargs: Dict[str, Any]) -> str:
    """Build the JSON document written to the hook script's stdin.

    ``tool_input`` is the ``args`` kwarg when it is a dict, else ``None``;
    ``session_id`` falls back to ``parent_session_id`` and then to ``""``;
    ``cwd`` is the current working directory (``""`` if it cannot be
    determined).  Kwargs outside ``_TOP_LEVEL_PAYLOAD_KEYS`` travel under
    ``extra``.  Values JSON cannot encode are stringified via
    ``default=str`` rather than dropped.
    """
    tool_args = kwargs.get("args")
    try:
        working_dir = str(Path.cwd())
    except OSError:
        working_dir = ""
    return json.dumps(
        {
            "hook_event_name": event,
            "tool_name": kwargs.get("tool_name"),
            "tool_input": tool_args if isinstance(tool_args, dict) else None,
            "session_id": (
                kwargs.get("session_id")
                or kwargs.get("parent_session_id")
                or ""
            ),
            "cwd": working_dir,
            "extra": {
                name: value
                for name, value in kwargs.items()
                if name not in _TOP_LEVEL_PAYLOAD_KEYS
            },
        },
        ensure_ascii=False,
        default=str,
    )
482  
483  
def _parse_response(event: str, stdout: str) -> Optional[Dict[str, Any]]:
    """Convert a hook script's stdout into the Hermes wire-shape dict.

    ``pre_tool_call``: a block directive in either the Hermes shape
    (``{"action": "block", "message": ...}``) or the Claude-Code shape
    (``{"decision": "block", "reason": ...}``) is normalised to
    ``{"action": "block", "message": <text>}`` — the shape expected by
    :func:`hermes_cli.plugins.get_pre_tool_call_block_message`.  Getting
    this translation right is the key invariant of the module: without it
    block directives from shell hooks are silently lost.  A directive
    without a non-empty string message is ignored.

    Any other event: ``{"context": "<non-blank string>"}`` is passed
    through as ``{"context": ...}``.

    Empty output, non-JSON output (logged), non-object JSON and anything
    unrecognised all yield ``None``.
    """
    text = (stdout or "").strip()
    if not text:
        return None

    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        logger.warning(
            "shell hook stdout was not valid JSON (event=%s): %s",
            event, text[:200],
        )
        return None

    if not isinstance(data, dict):
        return None

    if event != "pre_tool_call":
        context = data.get("context")
        if isinstance(context, str) and context.strip():
            return {"context": context}
        return None

    # (selector key, preferred text key, fallback text key) — the Hermes
    # shape is tried first, then the Claude-Code shape.
    shapes = (
        ("action", "message", "reason"),
        ("decision", "reason", "message"),
    )
    for selector, preferred, fallback in shapes:
        if data.get(selector) != "block":
            continue
        message = data.get(preferred) or data.get(fallback) or ""
        if isinstance(message, str) and message:
            return {"action": "block", "message": message}
    return None
532  
533  
534  # ---------------------------------------------------------------------------
535  # Allowlist / consent
536  # ---------------------------------------------------------------------------
537  
def allowlist_path() -> Path:
    """Return the location of the shell-hook allowlist file, i.e.
    ``ALLOWLIST_FILENAME`` inside the Hermes home directory."""
    hermes_home = get_hermes_home()
    return hermes_home / ALLOWLIST_FILENAME
541  
542  
def load_allowlist() -> Dict[str, Any]:
    """Return the parsed allowlist, or an empty skeleton if it is unusable.

    The result is always a dict whose ``"approvals"`` value is a list.
    A missing file is the normal first-run state and silently yields
    ``{"approvals": []}``.  A file that exists but cannot be read,
    decoded or parsed yields the same skeleton *and* logs a warning, so a
    corrupted allowlist does not pass unnoticed.  This function never
    raises for those conditions: an undecodable file must not crash hook
    registration.

    The file is read as UTF-8 (what JSON mandates, and a superset of the
    ASCII-only output written by :func:`save_allowlist`) rather than the
    locale's default encoding.
    """
    path: Optional[Path] = None
    try:
        path = allowlist_path()
        raw = json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return {"approvals": []}
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError *and* UnicodeDecodeError;
        # the latter is not a JSONDecodeError and would otherwise escape.
        logger.warning(
            "shell hook allowlist %s is unreadable (%s); treating it as empty",
            path if path is not None else ALLOWLIST_FILENAME, exc,
        )
        return {"approvals": []}
    if not isinstance(raw, dict):
        return {"approvals": []}
    if not isinstance(raw.get("approvals"), list):
        # Keep any other top-level keys but guarantee the list callers
        # iterate over.
        raw["approvals"] = []
    return raw
555  
556  
def save_allowlist(data: Dict[str, Any]) -> None:
    """Write ``data`` to the allowlist file atomically.

    The JSON is written to a per-process ``mkstemp`` file next to the
    target and then moved into place with ``atomic_replace``.
    Cross-process read-modify-write races are the job of
    :func:`_locked_update_approvals` (``fcntl.flock``), not of this
    function.  An ``OSError`` is logged instead of raised: the hook still
    registers in this process, but the approval will not survive a
    restart.
    """
    target = allowlist_path()
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        handle, scratch = tempfile.mkstemp(
            dir=str(target.parent), prefix=f"{target.name}.", suffix=".tmp",
        )
        try:
            with os.fdopen(handle, "w") as out:
                out.write(json.dumps(data, indent=2, sort_keys=True))
            atomic_replace(scratch, target)
        except Exception:
            # Best-effort removal of the half-written temp file before the
            # original error is re-raised.
            try:
                os.unlink(scratch)
            except OSError:
                pass
            raise
    except OSError as exc:
        logger.warning(
            "Failed to persist shell hook allowlist to %s: %s. "
            "The approval is in-memory for this run, but the next "
            "startup will re-prompt (or skip registration on non-TTY "
            "runs without --accept-hooks / HERMES_ACCEPT_HOOKS).",
            target, exc,
        )
587  
588  
def _is_allowlisted(event: str, command: str) -> bool:
    """Return ``True`` when the allowlist on disk holds an approval whose
    ``event`` and ``command`` both equal the given values."""
    for record in load_allowlist().get("approvals", []):
        if not isinstance(record, dict):
            continue
        if record.get("event") == event and record.get("command") == command:
            return True
    return False
597  
598  
@contextmanager
def _locked_update_approvals() -> Iterator[Dict[str, Any]]:
    """Serialise read-modify-write on the allowlist across processes.

    Holds an exclusive ``flock`` on a sibling lock file for the duration
    of the update so concurrent ``_record_approval``/``revoke`` callers
    cannot clobber each other's changes (the race Codex reproduced with
    20–50 simultaneous writers).  Falls back to an in-process lock on
    platforms without ``fcntl``.

    Usage: ``with _locked_update_approvals() as data:`` — ``data`` is the
    freshly loaded allowlist dict; mutate it in place.  It is written back
    with :func:`save_allowlist` only when the ``with`` body completes
    without raising; if the body raises, the save is skipped and the lock
    is still released.
    """
    p = allowlist_path()
    # The lock file lives next to the allowlist, so its directory must exist.
    p.parent.mkdir(parents=True, exist_ok=True)
    # "<allowlist name>.lock"; opened in append mode below, so it is created
    # on first use, and this function never removes it.
    lock_path = p.with_suffix(p.suffix + ".lock")

    if fcntl is None:  # pragma: no cover — non-POSIX fallback
        # Only serialises threads of this process, not other processes.
        with _allowlist_write_lock:
            data = load_allowlist()
            yield data
            save_allowlist(data)
        return

    with open(lock_path, "a+") as lock_fh:
        # Blocks until no other holder has the exclusive lock.
        fcntl.flock(lock_fh.fileno(), fcntl.LOCK_EX)
        try:
            # Load *after* acquiring the lock so we see the latest contents.
            data = load_allowlist()
            yield data
            save_allowlist(data)
        finally:
            fcntl.flock(lock_fh.fileno(), fcntl.LOCK_UN)
628  
629  
def _prompt_and_record(
    event: str, command: str, *, accept_hooks: bool,
) -> bool:
    """Obtain consent for a not-yet-approved ``(event, command)`` pair.

    With ``accept_hooks`` the pair is recorded straight away.  Otherwise
    the user is asked on the terminal; without a TTY on stdin the answer
    is "no" and nothing is printed.  Returns ``True`` iff approval was
    granted and recorded.
    """
    if accept_hooks:
        _record_approval(event, command)
        logger.info(
            "shell hook auto-approved via --accept-hooks / env / config: "
            "%s -> %s", event, command,
        )
        return True

    # No interactive terminal: we cannot ask, so refuse.
    if not sys.stdin.isatty():
        return False

    banner = (
        f"\n⚠ Hermes is about to register a shell hook that will run a\n"
        f"  command on your behalf.\n\n"
        f"    Event:   {event}\n"
        f"    Command: {command}\n\n"
        f"  Commands run with your full user credentials.  Only approve\n"
        f"  commands you trust."
    )
    print(banner)

    try:
        reply = input("Allow this hook to run? [y/N]: ")
    except (EOFError, KeyboardInterrupt):
        print()  # keep the terminal tidy after ^C
        return False

    # Anything other than an explicit yes is a refusal.
    if reply.strip().lower() not in ("y", "yes"):
        return False

    _record_approval(event, command)
    return True
666  
667  
def _record_approval(event: str, command: str) -> None:
    """Persist an approval for ``(event, command)`` in the allowlist.

    Any earlier record for the same pair is replaced, so the list holds at
    most one entry per pair and the new one goes last.  The record stores
    the approval time and the script's mtime at that moment.
    """
    approved_at = _utc_now_iso()
    script_mtime = script_mtime_iso(command)

    def _same_pair(record: Any) -> bool:
        return (
            isinstance(record, dict)
            and record.get("event") == event
            and record.get("command") == command
        )

    with _locked_update_approvals() as data:
        kept = [rec for rec in data.get("approvals", []) if not _same_pair(rec)]
        kept.append({
            "event": event,
            "command": command,
            "approved_at": approved_at,
            "script_mtime_at_approval": script_mtime,
        })
        data["approvals"] = kept
684  
685  
def _utc_now_iso() -> str:
    """Return the current UTC time as ISO-8601 text with a ``Z`` suffix
    in place of ``+00:00``."""
    stamp = datetime.now(tz=timezone.utc).isoformat()
    return stamp.replace("+00:00", "Z")
688  
689  
def revoke(command: str) -> int:
    """Delete every allowlist approval recorded for ``command``.

    All events are affected.  Returns how many entries were removed.
    Callbacks already live on the plugin manager in the current process
    are not unregistered — restart the CLI / gateway to drop them.
    """
    with _locked_update_approvals() as data:
        existing = data.get("approvals", [])
        survivors = [
            rec for rec in existing
            if not (isinstance(rec, dict) and rec.get("command") == command)
        ]
        data["approvals"] = survivors
        removed = len(existing) - len(survivors)
    return removed
705  
706  
# Lower-case file extensions that mark a command token as "the script".
# ``_command_script_path`` lower-cases each token and returns the first one
# ending in any of these.
_SCRIPT_EXTENSIONS: Tuple[str, ...] = (
    ".sh", ".bash", ".zsh", ".fish",
    ".py", ".pyw",
    ".rb", ".pl", ".lua",
    ".js", ".mjs", ".cjs", ".ts",
)
713  
714  
def _command_script_path(command: str) -> str:
    """Pick the token of ``command`` that names the hook script.

    Used for doctor / drift checks.  Preference order: the first token
    with a known script extension, then the first token that contains
    ``/`` or starts with ``~``, then the first token.  This covers
    ``python3 /path/hook.py``, ``/usr/bin/env bash hook.sh`` and the plain
    bare-path form.  A command that cannot be tokenised, or that has no
    tokens, is returned unchanged.
    """
    try:
        tokens = shlex.split(command)
    except ValueError:
        return command
    if not tokens:
        return command

    with_extension = next(
        (tok for tok in tokens if tok.lower().endswith(_SCRIPT_EXTENSIONS)),
        None,
    )
    if with_extension is not None:
        return with_extension

    path_like = next(
        (tok for tok in tokens if "/" in tok or tok.startswith("~")),
        None,
    )
    return tokens[0] if path_like is None else path_like
736  
737  
738  # ---------------------------------------------------------------------------
739  # Helpers for accept-hooks resolution
740  # ---------------------------------------------------------------------------
741  
def _resolve_effective_accept(
    cfg: Dict[str, Any], accept_hooks_arg: bool,
) -> bool:
    """Merge the three opt-in channels into one boolean.

    Checked in this order; the first truthy source wins:
      1. ``--accept-hooks`` flag (CLI) / explicit argument
      2. ``HERMES_ACCEPT_HOOKS`` env var
      3. ``hooks_auto_accept: true`` in ``cli-config.yaml``

    Text values count as true when, trimmed and lower-cased, they are one
    of ``1``, ``true``, ``yes``, ``on``.  A config value that is neither a
    bool nor a string counts as false.
    """
    affirmative = ("1", "true", "yes", "on")

    if accept_hooks_arg:
        return True

    from_env = os.environ.get("HERMES_ACCEPT_HOOKS", "")
    if from_env.strip().lower() in affirmative:
        return True

    configured = cfg.get("hooks_auto_accept", False)
    if isinstance(configured, bool):
        return configured
    return isinstance(configured, str) and configured.strip().lower() in affirmative
763  
764  
765  # ---------------------------------------------------------------------------
766  # Introspection (used by `hermes hooks` CLI)
767  # ---------------------------------------------------------------------------
768  
def allowlist_entry_for(event: str, command: str) -> Optional[Dict[str, Any]]:
    """Look up the allowlist record for ``(event, command)``.

    Returns the first matching approval dict, or ``None`` when the pair
    has not been approved.
    """
    matching = (
        record
        for record in load_allowlist().get("approvals", [])
        if isinstance(record, dict)
        and record.get("event") == event
        and record.get("command") == command
    )
    return next(matching, None)
779  
780  
def script_mtime_iso(command: str) -> Optional[str]:
    """Return the script's modification time as ISO-8601 UTC text.

    The script is the token chosen by :func:`_command_script_path`, with
    ``~`` expanded.  The result ends in ``Z``; ``None`` is returned when
    no script token is found or the file cannot be stat'ed (for example
    because it is missing).
    """
    script = _command_script_path(command)
    if not script:
        return None
    try:
        mtime = os.path.getmtime(os.path.expanduser(script))
        stamp = datetime.fromtimestamp(mtime, tz=timezone.utc)
    except OSError:
        return None
    return stamp.isoformat().replace("+00:00", "Z")
794  
795  
def script_is_executable(command: str) -> bool:
    """Report whether ``command`` looks runnable as configured.

    When the script is invoked directly (``/path/hook.sh`` — the script is
    the first token) it must carry the execute bit.  When an interpreter
    is named first (``python3 /path/hook.py``, ``/usr/bin/env bash
    hook.sh``) the script only needs to be readable, since the interpreter
    ignores ``X_OK``.  A script that is not an existing file, or a command
    that cannot be tokenised, gives ``False``.
    """
    script = _command_script_path(command)
    if not script:
        return False

    resolved = os.path.expanduser(script)
    if not os.path.isfile(resolved):
        return False

    try:
        argv = shlex.split(command)
    except ValueError:
        return False

    # Direct invocation needs the execute bit; otherwise read access is enough.
    if argv and argv[0] == script:
        needed = os.X_OK
    else:
        needed = os.R_OK
    return os.access(resolved, needed)
817  
818  
def run_once(
    spec: ShellHookSpec, kwargs: Dict[str, Any],
) -> Dict[str, Any]:
    """Run one shell hook with a synthetic payload and report the outcome.

    Backs ``hermes hooks test`` and ``hermes hooks doctor``.

    ``kwargs`` has the same shape :func:`hermes_cli.plugins.invoke_hook`
    passes at runtime and goes through :func:`_serialize_payload`, so the
    script receives exactly the stdin a real firing would produce and
    cannot silently diverge from production behaviour.

    Returns the :func:`_spawn` diagnostic dict extended with a ``parsed``
    key holding the canonical Hermes-wire-shape response (or ``None``).
    """
    outcome = _spawn(spec, _serialize_payload(spec.event, kwargs))
    outcome["parsed"] = _parse_response(spec.event, outcome["stdout"])
    return outcome