# shell_hooks.py
1 """ 2 Shell-script hooks bridge. 3 4 Reads the ``hooks:`` block from ``cli-config.yaml``, prompts the user for 5 consent on first use of each ``(event, command)`` pair, and registers 6 callbacks on the existing plugin hook manager so every existing 7 ``invoke_hook()`` site dispatches to the configured shell scripts — with 8 zero changes to call sites. 9 10 Design notes 11 ------------ 12 * Python plugins and shell hooks compose naturally: both flow through 13 :func:`hermes_cli.plugins.invoke_hook` and its aggregators. Python 14 plugins are registered first (via ``discover_and_load()``) so their 15 block decisions win ties over shell-hook blocks. 16 * Subprocess execution uses ``shlex.split(os.path.expanduser(command))`` 17 with ``shell=False`` — no shell injection footguns. Users that need 18 pipes/redirection wrap their logic in a script. 19 * First-use consent is gated by the allowlist under 20 ``~/.hermes/shell-hooks-allowlist.json``. Non-TTY callers must pass 21 ``accept_hooks=True`` (resolved from ``--accept-hooks``, 22 ``HERMES_ACCEPT_HOOKS``, or ``hooks_auto_accept: true`` in config) 23 for registration to succeed without a prompt. 24 * Registration is idempotent — safe to invoke from both the CLI entry 25 point (``hermes_cli/main.py``) and the gateway entry point 26 (``gateway/run.py``). 
27 28 Wire protocol 29 ------------- 30 **stdin** (JSON, piped to the script):: 31 32 { 33 "hook_event_name": "pre_tool_call", 34 "tool_name": "terminal", 35 "tool_input": {"command": "rm -rf /"}, 36 "session_id": "sess_abc123", 37 "cwd": "/home/user/project", 38 "extra": {...} # event-specific kwargs 39 } 40 41 **stdout** (JSON, optional — anything else is ignored):: 42 43 # Block a pre_tool_call (either shape accepted; normalised internally): 44 {"decision": "block", "reason": "Forbidden command"} # Claude-Code-style 45 {"action": "block", "message": "Forbidden command"} # Hermes-canonical 46 47 # Inject context for pre_llm_call: 48 {"context": "Today is Friday"} 49 50 # Silent no-op: 51 <empty or any non-matching JSON object> 52 """ 53 54 from __future__ import annotations 55 56 import difflib 57 import json 58 import logging 59 import os 60 import re 61 import shlex 62 import subprocess 63 import sys 64 import tempfile 65 import threading 66 import time 67 from contextlib import contextmanager 68 from dataclasses import dataclass, field 69 from datetime import datetime, timezone 70 from pathlib import Path 71 from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple 72 73 try: 74 import fcntl # POSIX only; Windows falls back to best-effort without flock. 75 except ImportError: # pragma: no cover 76 fcntl = None # type: ignore[assignment] 77 78 from hermes_constants import get_hermes_home 79 from utils import atomic_replace 80 81 logger = logging.getLogger(__name__) 82 83 DEFAULT_TIMEOUT_SECONDS = 60 84 MAX_TIMEOUT_SECONDS = 300 85 ALLOWLIST_FILENAME = "shell-hooks-allowlist.json" 86 87 # (event, matcher, command) triples that have been wired to the plugin 88 # manager in the current process. Matcher is part of the key because 89 # the same script can legitimately register for different matchers under 90 # the same event (e.g. one entry per tool the user wants to gate). 
91 # Second registration attempts for the exact same triple become no-ops 92 # so the CLI and gateway can both call register_from_config() safely. 93 _registered: Set[Tuple[str, Optional[str], str]] = set() 94 _registered_lock = threading.Lock() 95 96 # Intra-process lock for allowlist read-modify-write on platforms that 97 # lack ``fcntl`` (non-POSIX). Kept separate from ``_registered_lock`` 98 # because ``register_from_config`` already holds ``_registered_lock`` when 99 # it triggers ``_record_approval`` — reusing it here would self-deadlock 100 # (``threading.Lock`` is non-reentrant). POSIX callers use the sibling 101 # ``.lock`` file via ``fcntl.flock`` and bypass this. 102 _allowlist_write_lock = threading.Lock() 103 104 105 @dataclass 106 class ShellHookSpec: 107 """Parsed and validated representation of a single ``hooks:`` entry.""" 108 109 event: str 110 command: str 111 matcher: Optional[str] = None 112 timeout: int = DEFAULT_TIMEOUT_SECONDS 113 compiled_matcher: Optional[re.Pattern] = field(default=None, repr=False) 114 115 def __post_init__(self) -> None: 116 # Strip whitespace introduced by YAML quirks (e.g. multi-line string 117 # folding) — a matcher of " terminal" would otherwise silently fail 118 # to match "terminal" without any diagnostic. 
119 if isinstance(self.matcher, str): 120 stripped = self.matcher.strip() 121 self.matcher = stripped if stripped else None 122 if self.matcher: 123 try: 124 self.compiled_matcher = re.compile(self.matcher) 125 except re.error as exc: 126 logger.warning( 127 "shell hook matcher %r is invalid (%s) — treating as " 128 "literal equality", self.matcher, exc, 129 ) 130 self.compiled_matcher = None 131 132 def matches_tool(self, tool_name: Optional[str]) -> bool: 133 if not self.matcher: 134 return True 135 if tool_name is None: 136 return False 137 if self.compiled_matcher is not None: 138 return self.compiled_matcher.fullmatch(tool_name) is not None 139 # compiled_matcher is None only when the regex failed to compile, 140 # in which case we already warned and fall back to literal equality. 141 return tool_name == self.matcher 142 143 144 # --------------------------------------------------------------------------- 145 # Public API 146 # --------------------------------------------------------------------------- 147 148 def register_from_config( 149 cfg: Optional[Dict[str, Any]], 150 *, 151 accept_hooks: bool = False, 152 ) -> List[ShellHookSpec]: 153 """Register every configured shell hook on the plugin manager. 154 155 ``cfg`` is the full parsed config dict (``hermes_cli.config.load_config`` 156 output). The ``hooks:`` key is read out of it. Missing, empty, or 157 non-dict ``hooks`` is treated as zero configured hooks. 158 159 ``accept_hooks=True`` skips the TTY consent prompt — the caller is 160 promising that the user has opted in via a flag, env var, or config 161 setting. ``HERMES_ACCEPT_HOOKS=1`` and ``hooks_auto_accept: true`` are 162 also honored inside this function so either CLI or gateway call sites 163 pick them up. 164 165 Returns the list of :class:`ShellHookSpec` entries that ended up wired 166 up on the plugin manager. Skipped entries (unknown events, malformed, 167 not allowlisted, already registered) are logged but not returned. 
168 """ 169 if not isinstance(cfg, dict): 170 return [] 171 172 effective_accept = _resolve_effective_accept(cfg, accept_hooks) 173 174 specs = _parse_hooks_block(cfg.get("hooks")) 175 if not specs: 176 return [] 177 178 registered: List[ShellHookSpec] = [] 179 180 # Import lazily — avoids circular imports at module-load time. 181 from hermes_cli.plugins import get_plugin_manager 182 183 manager = get_plugin_manager() 184 185 # Idempotence + allowlist read happen under the lock; the TTY 186 # prompt runs outside so other threads aren't parked on a blocking 187 # input(). Mutation re-takes the lock with a defensive idempotence 188 # re-check in case two callers ever race through the prompt. 189 for spec in specs: 190 key = (spec.event, spec.matcher, spec.command) 191 with _registered_lock: 192 if key in _registered: 193 continue 194 already_allowlisted = _is_allowlisted(spec.event, spec.command) 195 196 if not already_allowlisted: 197 if not _prompt_and_record( 198 spec.event, spec.command, accept_hooks=effective_accept, 199 ): 200 logger.warning( 201 "shell hook for %s (%s) not allowlisted — skipped. " 202 "Use --accept-hooks / HERMES_ACCEPT_HOOKS=1 / " 203 "hooks_auto_accept: true, or approve at the TTY " 204 "prompt next run.", 205 spec.event, spec.command, 206 ) 207 continue 208 209 with _registered_lock: 210 if key in _registered: 211 continue 212 manager._hooks.setdefault(spec.event, []).append(_make_callback(spec)) 213 _registered.add(key) 214 registered.append(spec) 215 logger.info( 216 "shell hook registered: %s -> %s (matcher=%s, timeout=%ds)", 217 spec.event, spec.command, spec.matcher, spec.timeout, 218 ) 219 220 return registered 221 222 223 def iter_configured_hooks(cfg: Optional[Dict[str, Any]]) -> List[ShellHookSpec]: 224 """Return the parsed ``ShellHookSpec`` entries from config without 225 registering anything. 
Used by ``hermes hooks list`` and ``doctor``.""" 226 if not isinstance(cfg, dict): 227 return [] 228 return _parse_hooks_block(cfg.get("hooks")) 229 230 231 def reset_for_tests() -> None: 232 """Clear the idempotence set. Test-only helper.""" 233 with _registered_lock: 234 _registered.clear() 235 236 237 # --------------------------------------------------------------------------- 238 # Config parsing 239 # --------------------------------------------------------------------------- 240 241 def _parse_hooks_block(hooks_cfg: Any) -> List[ShellHookSpec]: 242 """Normalise the ``hooks:`` dict into a flat list of ``ShellHookSpec``. 243 244 Malformed entries warn-and-skip — we never raise from config parsing 245 because a broken hook must not crash the agent. 246 """ 247 from hermes_cli.plugins import VALID_HOOKS 248 249 if not isinstance(hooks_cfg, dict): 250 return [] 251 252 specs: List[ShellHookSpec] = [] 253 254 for event_name, entries in hooks_cfg.items(): 255 if event_name not in VALID_HOOKS: 256 suggestion = difflib.get_close_matches( 257 str(event_name), VALID_HOOKS, n=1, cutoff=0.6, 258 ) 259 if suggestion: 260 logger.warning( 261 "unknown hook event %r in hooks: config — did you mean %r?", 262 event_name, suggestion[0], 263 ) 264 else: 265 logger.warning( 266 "unknown hook event %r in hooks: config (valid: %s)", 267 event_name, ", ".join(sorted(VALID_HOOKS)), 268 ) 269 continue 270 271 if entries is None: 272 continue 273 274 if not isinstance(entries, list): 275 logger.warning( 276 "hooks.%s must be a list of hook definitions; got %s", 277 event_name, type(entries).__name__, 278 ) 279 continue 280 281 for i, raw in enumerate(entries): 282 spec = _parse_single_entry(event_name, i, raw) 283 if spec is not None: 284 specs.append(spec) 285 286 return specs 287 288 289 def _parse_single_entry( 290 event: str, index: int, raw: Any, 291 ) -> Optional[ShellHookSpec]: 292 if not isinstance(raw, dict): 293 logger.warning( 294 "hooks.%s[%d] must be a mapping with a 
'command' key; got %s", 295 event, index, type(raw).__name__, 296 ) 297 return None 298 299 command = raw.get("command") 300 if not isinstance(command, str) or not command.strip(): 301 logger.warning( 302 "hooks.%s[%d] is missing a non-empty 'command' field", 303 event, index, 304 ) 305 return None 306 307 matcher = raw.get("matcher") 308 if matcher is not None and not isinstance(matcher, str): 309 logger.warning( 310 "hooks.%s[%d].matcher must be a string regex; ignoring", 311 event, index, 312 ) 313 matcher = None 314 315 if matcher is not None and event not in ("pre_tool_call", "post_tool_call"): 316 logger.warning( 317 "hooks.%s[%d].matcher=%r will be ignored at runtime — the " 318 "matcher field is only honored for pre_tool_call / " 319 "post_tool_call. The hook will fire on every %s event.", 320 event, index, matcher, event, 321 ) 322 matcher = None 323 324 timeout_raw = raw.get("timeout", DEFAULT_TIMEOUT_SECONDS) 325 try: 326 timeout = int(timeout_raw) 327 except (TypeError, ValueError): 328 logger.warning( 329 "hooks.%s[%d].timeout must be an int (got %r); using default %ds", 330 event, index, timeout_raw, DEFAULT_TIMEOUT_SECONDS, 331 ) 332 timeout = DEFAULT_TIMEOUT_SECONDS 333 334 if timeout < 1: 335 logger.warning( 336 "hooks.%s[%d].timeout must be >=1; using default %ds", 337 event, index, DEFAULT_TIMEOUT_SECONDS, 338 ) 339 timeout = DEFAULT_TIMEOUT_SECONDS 340 341 if timeout > MAX_TIMEOUT_SECONDS: 342 logger.warning( 343 "hooks.%s[%d].timeout=%ds exceeds max %ds; clamping", 344 event, index, timeout, MAX_TIMEOUT_SECONDS, 345 ) 346 timeout = MAX_TIMEOUT_SECONDS 347 348 return ShellHookSpec( 349 event=event, 350 command=command.strip(), 351 matcher=matcher, 352 timeout=timeout, 353 ) 354 355 356 # --------------------------------------------------------------------------- 357 # Subprocess callback 358 # --------------------------------------------------------------------------- 359 360 _TOP_LEVEL_PAYLOAD_KEYS = {"tool_name", "args", "session_id", 
"parent_session_id"} 361 362 363 def _spawn(spec: ShellHookSpec, stdin_json: str) -> Dict[str, Any]: 364 """Run ``spec.command`` as a subprocess with ``stdin_json`` on stdin. 365 366 Returns a diagnostic dict with the same keys for every outcome 367 (``returncode``, ``stdout``, ``stderr``, ``timed_out``, 368 ``elapsed_seconds``, ``error``). This is the single place the 369 subprocess is actually invoked — both the live callback path 370 (:func:`_make_callback`) and the CLI test helper (:func:`run_once`) 371 go through it. 372 """ 373 result: Dict[str, Any] = { 374 "returncode": None, 375 "stdout": "", 376 "stderr": "", 377 "timed_out": False, 378 "elapsed_seconds": 0.0, 379 "error": None, 380 } 381 try: 382 argv = shlex.split(os.path.expanduser(spec.command)) 383 except ValueError as exc: 384 result["error"] = f"command {spec.command!r} cannot be parsed: {exc}" 385 return result 386 if not argv: 387 result["error"] = "empty command" 388 return result 389 390 t0 = time.monotonic() 391 try: 392 proc = subprocess.run( 393 argv, 394 input=stdin_json, 395 capture_output=True, 396 timeout=spec.timeout, 397 text=True, 398 shell=False, 399 ) 400 except subprocess.TimeoutExpired: 401 result["timed_out"] = True 402 result["elapsed_seconds"] = round(time.monotonic() - t0, 3) 403 return result 404 except FileNotFoundError: 405 result["error"] = "command not found" 406 return result 407 except PermissionError: 408 result["error"] = "command not executable" 409 return result 410 except Exception as exc: # pragma: no cover — defensive 411 result["error"] = str(exc) 412 return result 413 414 result["returncode"] = proc.returncode 415 result["stdout"] = proc.stdout or "" 416 result["stderr"] = proc.stderr or "" 417 result["elapsed_seconds"] = round(time.monotonic() - t0, 3) 418 return result 419 420 421 def _make_callback(spec: ShellHookSpec) -> Callable[..., Optional[Dict[str, Any]]]: 422 """Build the closure that ``invoke_hook()`` will call per firing.""" 423 424 def 
_callback(**kwargs: Any) -> Optional[Dict[str, Any]]: 425 # Matcher gate — only meaningful for tool-scoped events. 426 if spec.event in ("pre_tool_call", "post_tool_call"): 427 if not spec.matches_tool(kwargs.get("tool_name")): 428 return None 429 430 r = _spawn(spec, _serialize_payload(spec.event, kwargs)) 431 432 if r["error"]: 433 logger.warning( 434 "shell hook failed (event=%s command=%s): %s", 435 spec.event, spec.command, r["error"], 436 ) 437 return None 438 if r["timed_out"]: 439 logger.warning( 440 "shell hook timed out after %.2fs (event=%s command=%s)", 441 r["elapsed_seconds"], spec.event, spec.command, 442 ) 443 return None 444 445 stderr = r["stderr"].strip() 446 if stderr: 447 logger.debug( 448 "shell hook stderr (event=%s command=%s): %s", 449 spec.event, spec.command, stderr[:400], 450 ) 451 # Non-zero exits: log but still parse stdout so scripts that 452 # signal failure via exit code can also return a block directive. 453 if r["returncode"] != 0: 454 logger.warning( 455 "shell hook exited %d (event=%s command=%s); stderr=%s", 456 r["returncode"], spec.event, spec.command, stderr[:400], 457 ) 458 return _parse_response(spec.event, r["stdout"]) 459 460 _callback.__name__ = f"shell_hook[{spec.event}:{spec.command}]" 461 _callback.__qualname__ = _callback.__name__ 462 return _callback 463 464 465 def _serialize_payload(event: str, kwargs: Dict[str, Any]) -> str: 466 """Render the stdin JSON payload. 
Unserialisable values are 467 stringified via ``default=str`` rather than dropped.""" 468 extras = {k: v for k, v in kwargs.items() if k not in _TOP_LEVEL_PAYLOAD_KEYS} 469 try: 470 cwd = str(Path.cwd()) 471 except OSError: 472 cwd = "" 473 payload = { 474 "hook_event_name": event, 475 "tool_name": kwargs.get("tool_name"), 476 "tool_input": kwargs.get("args") if isinstance(kwargs.get("args"), dict) else None, 477 "session_id": kwargs.get("session_id") or kwargs.get("parent_session_id") or "", 478 "cwd": cwd, 479 "extra": extras, 480 } 481 return json.dumps(payload, ensure_ascii=False, default=str) 482 483 484 def _parse_response(event: str, stdout: str) -> Optional[Dict[str, Any]]: 485 """Translate stdout JSON into a Hermes wire-shape dict. 486 487 For ``pre_tool_call`` the Claude-Code-style ``{"decision": "block", 488 "reason": "..."}`` payload is translated into the canonical Hermes 489 ``{"action": "block", "message": "..."}`` shape expected by 490 :func:`hermes_cli.plugins.get_pre_tool_call_block_message`. This is 491 the single most important correctness invariant in this module — 492 skipping the translation silently breaks every ``pre_tool_call`` 493 block directive. 494 495 For ``pre_llm_call``, ``{"context": "..."}`` is passed through 496 unchanged to match the existing plugin-hook contract. 497 498 Anything else returns ``None``. 
499 """ 500 stdout = (stdout or "").strip() 501 if not stdout: 502 return None 503 504 try: 505 data = json.loads(stdout) 506 except json.JSONDecodeError: 507 logger.warning( 508 "shell hook stdout was not valid JSON (event=%s): %s", 509 event, stdout[:200], 510 ) 511 return None 512 513 if not isinstance(data, dict): 514 return None 515 516 if event == "pre_tool_call": 517 if data.get("action") == "block": 518 message = data.get("message") or data.get("reason") or "" 519 if isinstance(message, str) and message: 520 return {"action": "block", "message": message} 521 if data.get("decision") == "block": 522 message = data.get("reason") or data.get("message") or "" 523 if isinstance(message, str) and message: 524 return {"action": "block", "message": message} 525 return None 526 527 context = data.get("context") 528 if isinstance(context, str) and context.strip(): 529 return {"context": context} 530 531 return None 532 533 534 # --------------------------------------------------------------------------- 535 # Allowlist / consent 536 # --------------------------------------------------------------------------- 537 538 def allowlist_path() -> Path: 539 """Path to the per-user shell-hook allowlist file.""" 540 return get_hermes_home() / ALLOWLIST_FILENAME 541 542 543 def load_allowlist() -> Dict[str, Any]: 544 """Return the parsed allowlist, or an empty skeleton if absent.""" 545 try: 546 raw = json.loads(allowlist_path().read_text()) 547 except (FileNotFoundError, json.JSONDecodeError, OSError): 548 return {"approvals": []} 549 if not isinstance(raw, dict): 550 return {"approvals": []} 551 approvals = raw.get("approvals") 552 if not isinstance(approvals, list): 553 raw["approvals"] = [] 554 return raw 555 556 557 def save_allowlist(data: Dict[str, Any]) -> None: 558 """Atomically persist the allowlist via per-process ``mkstemp`` + 559 ``os.replace``. Cross-process read-modify-write races are handled 560 by :func:`_locked_update_approvals` (``fcntl.flock``). 
On OSError 561 the failure is logged; the in-process hook still registers but 562 the approval won't survive across runs.""" 563 p = allowlist_path() 564 try: 565 p.parent.mkdir(parents=True, exist_ok=True) 566 fd, tmp_path = tempfile.mkstemp( 567 prefix=f"{p.name}.", suffix=".tmp", dir=str(p.parent), 568 ) 569 try: 570 with os.fdopen(fd, "w") as fh: 571 fh.write(json.dumps(data, indent=2, sort_keys=True)) 572 atomic_replace(tmp_path, p) 573 except Exception: 574 try: 575 os.unlink(tmp_path) 576 except OSError: 577 pass 578 raise 579 except OSError as exc: 580 logger.warning( 581 "Failed to persist shell hook allowlist to %s: %s. " 582 "The approval is in-memory for this run, but the next " 583 "startup will re-prompt (or skip registration on non-TTY " 584 "runs without --accept-hooks / HERMES_ACCEPT_HOOKS).", 585 p, exc, 586 ) 587 588 589 def _is_allowlisted(event: str, command: str) -> bool: 590 data = load_allowlist() 591 return any( 592 isinstance(e, dict) 593 and e.get("event") == event 594 and e.get("command") == command 595 for e in data.get("approvals", []) 596 ) 597 598 599 @contextmanager 600 def _locked_update_approvals() -> Iterator[Dict[str, Any]]: 601 """Serialise read-modify-write on the allowlist across processes. 602 603 Holds an exclusive ``flock`` on a sibling lock file for the duration 604 of the update so concurrent ``_record_approval``/``revoke`` callers 605 cannot clobber each other's changes (the race Codex reproduced with 606 20–50 simultaneous writers). Falls back to an in-process lock on 607 platforms without ``fcntl``. 
608 """ 609 p = allowlist_path() 610 p.parent.mkdir(parents=True, exist_ok=True) 611 lock_path = p.with_suffix(p.suffix + ".lock") 612 613 if fcntl is None: # pragma: no cover — non-POSIX fallback 614 with _allowlist_write_lock: 615 data = load_allowlist() 616 yield data 617 save_allowlist(data) 618 return 619 620 with open(lock_path, "a+") as lock_fh: 621 fcntl.flock(lock_fh.fileno(), fcntl.LOCK_EX) 622 try: 623 data = load_allowlist() 624 yield data 625 save_allowlist(data) 626 finally: 627 fcntl.flock(lock_fh.fileno(), fcntl.LOCK_UN) 628 629 630 def _prompt_and_record( 631 event: str, command: str, *, accept_hooks: bool, 632 ) -> bool: 633 """Decide whether to approve an unseen ``(event, command)`` pair. 634 Returns ``True`` iff the approval was granted and recorded. 635 """ 636 if accept_hooks: 637 _record_approval(event, command) 638 logger.info( 639 "shell hook auto-approved via --accept-hooks / env / config: " 640 "%s -> %s", event, command, 641 ) 642 return True 643 644 if not sys.stdin.isatty(): 645 return False 646 647 print( 648 f"\n⚠ Hermes is about to register a shell hook that will run a\n" 649 f" command on your behalf.\n\n" 650 f" Event: {event}\n" 651 f" Command: {command}\n\n" 652 f" Commands run with your full user credentials. Only approve\n" 653 f" commands you trust." 654 ) 655 try: 656 answer = input("Allow this hook to run? 
[y/N]: ").strip().lower() 657 except (EOFError, KeyboardInterrupt): 658 print() # keep the terminal tidy after ^C 659 return False 660 661 if answer in ("y", "yes"): 662 _record_approval(event, command) 663 return True 664 665 return False 666 667 668 def _record_approval(event: str, command: str) -> None: 669 entry = { 670 "event": event, 671 "command": command, 672 "approved_at": _utc_now_iso(), 673 "script_mtime_at_approval": script_mtime_iso(command), 674 } 675 with _locked_update_approvals() as data: 676 data["approvals"] = [ 677 e for e in data.get("approvals", []) 678 if not ( 679 isinstance(e, dict) 680 and e.get("event") == event 681 and e.get("command") == command 682 ) 683 ] + [entry] 684 685 686 def _utc_now_iso() -> str: 687 return datetime.now(tz=timezone.utc).isoformat().replace("+00:00", "Z") 688 689 690 def revoke(command: str) -> int: 691 """Remove every allowlist entry matching ``command``. 692 693 Returns the number of entries removed. Does not unregister any 694 callbacks that are already live on the plugin manager in the current 695 process — restart the CLI / gateway to drop them. 696 """ 697 with _locked_update_approvals() as data: 698 before = len(data.get("approvals", [])) 699 data["approvals"] = [ 700 e for e in data.get("approvals", []) 701 if not (isinstance(e, dict) and e.get("command") == command) 702 ] 703 after = len(data["approvals"]) 704 return before - after 705 706 707 _SCRIPT_EXTENSIONS: Tuple[str, ...] = ( 708 ".sh", ".bash", ".zsh", ".fish", 709 ".py", ".pyw", 710 ".rb", ".pl", ".lua", 711 ".js", ".mjs", ".cjs", ".ts", 712 ) 713 714 715 def _command_script_path(command: str) -> str: 716 """Return the script path from ``command`` for doctor / drift checks. 717 718 Prefers a token ending in a known script extension, then a token 719 containing ``/`` or leading ``~``, then the first token. Handles 720 ``python3 /path/hook.py``, ``/usr/bin/env bash hook.sh``, and the 721 common bare-path form. 
722 """ 723 try: 724 parts = shlex.split(command) 725 except ValueError: 726 return command 727 if not parts: 728 return command 729 for part in parts: 730 if part.lower().endswith(_SCRIPT_EXTENSIONS): 731 return part 732 for part in parts: 733 if "/" in part or part.startswith("~"): 734 return part 735 return parts[0] 736 737 738 # --------------------------------------------------------------------------- 739 # Helpers for accept-hooks resolution 740 # --------------------------------------------------------------------------- 741 742 def _resolve_effective_accept( 743 cfg: Dict[str, Any], accept_hooks_arg: bool, 744 ) -> bool: 745 """Combine all three opt-in channels into a single boolean. 746 747 Precedence (any truthy source flips us on): 748 1. ``--accept-hooks`` flag (CLI) / explicit argument 749 2. ``HERMES_ACCEPT_HOOKS`` env var 750 3. ``hooks_auto_accept: true`` in ``cli-config.yaml`` 751 """ 752 if accept_hooks_arg: 753 return True 754 env = os.environ.get("HERMES_ACCEPT_HOOKS", "").strip().lower() 755 if env in ("1", "true", "yes", "on"): 756 return True 757 cfg_val = cfg.get("hooks_auto_accept", False) 758 if isinstance(cfg_val, bool): 759 return cfg_val 760 if isinstance(cfg_val, str): 761 return cfg_val.strip().lower() in ("1", "true", "yes", "on") 762 return False 763 764 765 # --------------------------------------------------------------------------- 766 # Introspection (used by `hermes hooks` CLI) 767 # --------------------------------------------------------------------------- 768 769 def allowlist_entry_for(event: str, command: str) -> Optional[Dict[str, Any]]: 770 """Return the allowlist record for this pair, if any.""" 771 for e in load_allowlist().get("approvals", []): 772 if ( 773 isinstance(e, dict) 774 and e.get("event") == event 775 and e.get("command") == command 776 ): 777 return e 778 return None 779 780 781 def script_mtime_iso(command: str) -> Optional[str]: 782 """ISO-8601 mtime of the resolved script path, or ``None`` if the 783 
script is missing.""" 784 path = _command_script_path(command) 785 if not path: 786 return None 787 try: 788 expanded = os.path.expanduser(path) 789 return datetime.fromtimestamp( 790 os.path.getmtime(expanded), tz=timezone.utc, 791 ).isoformat().replace("+00:00", "Z") 792 except OSError: 793 return None 794 795 796 def script_is_executable(command: str) -> bool: 797 """Return ``True`` iff ``command`` is runnable as configured. 798 799 For a bare invocation (``/path/hook.sh``) the script itself must be 800 executable. For interpreter-prefixed commands (``python3 801 /path/hook.py``, ``/usr/bin/env bash hook.sh``) the script just has 802 to be readable — the interpreter doesn't care about the ``X_OK`` 803 bit. Mirrors what ``_spawn`` would actually do at runtime.""" 804 path = _command_script_path(command) 805 if not path: 806 return False 807 expanded = os.path.expanduser(path) 808 if not os.path.isfile(expanded): 809 return False 810 try: 811 argv = shlex.split(command) 812 except ValueError: 813 return False 814 is_bare_invocation = bool(argv) and argv[0] == path 815 required = os.X_OK if is_bare_invocation else os.R_OK 816 return os.access(expanded, required) 817 818 819 def run_once( 820 spec: ShellHookSpec, kwargs: Dict[str, Any], 821 ) -> Dict[str, Any]: 822 """Fire a single shell-hook invocation with a synthetic payload. 823 Used by ``hermes hooks test`` and ``hermes hooks doctor``. 824 825 ``kwargs`` is the same dict that :func:`hermes_cli.plugins.invoke_hook` 826 would pass at runtime. It is routed through :func:`_serialize_payload` 827 so the synthetic stdin exactly matches what a real hook firing would 828 produce — otherwise scripts tested via ``hermes hooks test`` could 829 diverge silently from production behaviour. 
830 831 Returns the :func:`_spawn` diagnostic dict plus a ``parsed`` field 832 holding the canonical Hermes-wire-shape response.""" 833 stdin_json = _serialize_payload(spec.event, kwargs) 834 result = _spawn(spec, stdin_json) 835 result["parsed"] = _parse_response(spec.event, result["stdout"]) 836 return result